package msgTransfer import ( "context" "encoding/json" "slowwild/internal/immodel" "slowwild/internal/svc" "slowwild/internal/utils" "strconv" msgChat "git.banshen.xyz/huangguangrong/slow_wild_queue/types" "go.mongodb.org/mongo-driver/bson/primitive" ) /** 消息消费 */ type MsgChatTransfer struct { *BaseMsgTransfer } func NewMsgChatTransfer(svc *svc.ServiceContext) *MsgChatTransfer { return &MsgChatTransfer{ NewBaseMsgTransfer(svc), } } // 消息消费的实现方法 func (m *MsgChatTransfer) Consume(ctx context.Context, key, value string) error { var ( data msgChat.MsgChatTransfer msgId = primitive.NewObjectID() ) if err := json.Unmarshal([]byte(value), &data); err != nil { return err } if err := m.addChatLog(ctx, msgId, &data); err != nil { return err } return m.Transfer(ctx, &msgChat.Push{ MsgID: msgId.Hex(), ConversationId: data.ConversationId, ChatType: data.ChatType, SendID: data.SendID, RecvID: data.RecvID, RecvIDs: data.RecvIDs, SendTime: data.SendTime, MType: data.MType, Content: data.Content, }) } // 记录消息到mongodb func (m *MsgChatTransfer) addChatLog(ctx context.Context, msgId primitive.ObjectID, data *msgChat.MsgChatTransfer) error { //记录消息 chatlog := immodel.ChatLog{ ID: msgId, ConversationId: data.ConversationId, SendId: strconv.FormatInt(data.SendID, 10), RecvId: strconv.FormatInt(data.RecvID, 10), MsgType: data.MType, MsgContent: data.Content, SendTime: data.SendTime, ChatType: data.ChatType, } //设置自己为已读状态 readRecords := utils.NewBitMap(0) readRecords.Set(data.SendID) chatlog.ReadRecords = readRecords.Export() err := m.svcCtx.ChatLogModel.Insert(ctx, &chatlog) if err != nil { return err } return m.svcCtx.ConversationModel.UpdateMsg(ctx, &chatlog) }