| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778 |
- package msgTransfer
- import (
- "context"
- "encoding/json"
- "slowwild/internal/immodel"
- "slowwild/internal/svc"
- "slowwild/internal/utils"
- "strconv"
- msgChat "git.banshen.xyz/huangguangrong/slow_wild_queue/types"
- "go.mongodb.org/mongo-driver/bson/primitive"
- )
- /**
- 消息消费
- */
- type MsgChatTransfer struct {
- *BaseMsgTransfer
- }
- func NewMsgChatTransfer(svc *svc.ServiceContext) *MsgChatTransfer {
- return &MsgChatTransfer{
- NewBaseMsgTransfer(svc),
- }
- }
- // 消息消费的实现方法
- func (m *MsgChatTransfer) Consume(ctx context.Context, key, value string) error {
- var (
- data msgChat.MsgChatTransfer
- msgId = primitive.NewObjectID()
- )
- if err := json.Unmarshal([]byte(value), &data); err != nil {
- return err
- }
- if err := m.addChatLog(ctx, msgId, &data); err != nil {
- return err
- }
- return m.Transfer(ctx, &msgChat.Push{
- MsgID: msgId.Hex(),
- ConversationId: data.ConversationId,
- ChatType: data.ChatType,
- SendID: data.SendID,
- RecvID: data.RecvID,
- RecvIDs: data.RecvIDs,
- SendTime: data.SendTime,
- MType: data.MType,
- Content: data.Content,
- })
- }
- // 记录消息到mongodb
- func (m *MsgChatTransfer) addChatLog(ctx context.Context, msgId primitive.ObjectID, data *msgChat.MsgChatTransfer) error {
- //记录消息
- chatlog := immodel.ChatLog{
- ID: msgId,
- ConversationId: data.ConversationId,
- SendId: strconv.FormatInt(data.SendID, 10),
- RecvId: strconv.FormatInt(data.RecvID, 10),
- MsgType: data.MType,
- MsgContent: data.Content,
- SendTime: data.SendTime,
- ChatType: data.ChatType,
- }
- //设置自己为已读状态
- readRecords := utils.NewBitMap(0)
- readRecords.Set(data.SendID)
- chatlog.ReadRecords = readRecords.Export()
- err := m.svcCtx.ChatLogModel.Insert(ctx, &chatlog)
- if err != nil {
- return err
- }
- return m.svcCtx.ConversationModel.UpdateMsg(ctx, &chatlog)
- }
|