package msgTransfer import ( "context" "encoding/json" "go.mongodb.org/mongo-driver/bson/primitive" "paopaoimtask/internal/svc" "paopaoimtask/mq" "paopaoimtask/pkg/bitmap" "paopaoimws/immodels" "paopaoimws/ws" "strconv" ) /** 消息消费 */ type MsgChatTransfer struct { *baseMsgTransfer } func NewMsgChatTransfer(svc *svc.ServiceContext) *MsgChatTransfer { return &MsgChatTransfer{ NewBaseMsgTransfer(svc), } } // 消息消费的实现方法 func (m *MsgChatTransfer) Consume(ctx context.Context, key, value string) error { var ( data mq.MsgChatTransfer msgId = primitive.NewObjectID() ) if err := json.Unmarshal([]byte(value), &data); err != nil { return err } if err := m.addChatLog(ctx, msgId, &data); err != nil { return err } return m.Transfer(ctx, &ws.Push{ MsgID: msgId.Hex(), ConversationId: data.ConversationId, ChatType: data.ChatType, SendID: data.SendID, RecvID: data.RecvID, RecvIDs: data.RecvIDs, SendTime: data.SendTime, MType: data.MType, Content: data.Content, }) } // 记录消息到mongodb func (m *MsgChatTransfer) addChatLog(ctx context.Context, msgId primitive.ObjectID, data *mq.MsgChatTransfer) error { //记录消息 chatlog := immodels.ChatLog{ ID: msgId, ConversationId: data.ConversationId, SendId: strconv.FormatInt(data.SendID, 10), RecvId: strconv.FormatInt(data.RecvID, 10), MsgFrom: 0, MsgType: data.MType, MsgContent: data.Content, SendTime: data.SendTime, ChatType: data.ChatType, } //设置自己为已读状态 readRecords := bitmap.NewBitMap(0) readRecords.Set(data.SendID) chatlog.ReadRecords = readRecords.Export() err := m.svcCtx.ChatLogModel.Insert(ctx, &chatlog) if err != nil { return err } return m.svcCtx.ConversationModel.UpdateMsg(ctx, &chatlog) }