package queue

import (
	"context"
	"encoding/json"
	"fmt"

	"git.banshen.xyz/huangguangrong/slow_wild_queue/types"
	"slowwildws/internal/svc"
)

// MsgSendConsumer is a queue consumer that decodes push messages and hands
// them to the WebSocket server for delivery to the addressed user.
type MsgSendConsumer struct {
	ctx    context.Context     // context supplied at construction time
	svcCtx *svc.ServiceContext // shared service dependencies; WsServer is read from here
	//Logger logx.Logger
}

// NewMsgSendConsumer returns a MsgSendConsumer bound to the given context and
// service context.
func NewMsgSendConsumer(ctx context.Context, svc *svc.ServiceContext) *MsgSendConsumer {
	return &MsgSendConsumer{
		ctx:    ctx,
		svcCtx: svc,
		//Logger: logx.WithContext(ctx),
	}
}

// Consume handles one queue message.
//
// val is expected to hold a JSON-encoded types.MessagePushData. An empty val
// is ignored and nil is returned. If val cannot be decoded, the decoding error
// is returned wrapped with context. If the target user has no connection
// registered with the WebSocket server, the message is dropped and nil is
// returned. Otherwise the decoded message is passed to WsServer.Send.
//
// The ctx and key parameters are not used by this implementation.
func (m *MsgSendConsumer) Consume(ctx context.Context, key, val string) error {
	if val == "" {
		return nil
	}
	fmt.Println("接到的消息:", val)

	var msgData types.MessagePushData
	if err := json.Unmarshal([]byte(val), &msgData); err != nil {
		fmt.Println("参数解析失败")
		// Wrap so the caller can see which step failed while still being
		// able to inspect the underlying JSON error via errors.Is/As.
		return fmt.Errorf("unmarshal message push data: %w", err)
	}

	// Look up the connection registered for the target user.
	conn := m.svcCtx.WsServer.GetConn(msgData.UserID)
	if conn == nil {
		// No connection for this user: drop the message without reporting
		// an error to the caller.
		fmt.Println("获取用户的链接信息失败")
		return nil
	}

	// NOTE(review): Send's signature is not visible here; if it returns an
	// error, it is currently discarded — confirm and propagate if so.
	m.svcCtx.WsServer.Send(msgData, conn)
	return nil
}