package msgTransfer import ( "context" "encoding/base64" "encoding/json" "github.com/zeromicro/go-zero/core/stores/cache" "paopaoimtask/internal/svc" "paopaoimtask/mq" "paopaoimtask/pkg/bitmap" "paopaoimtask/pkg/constants" "paopaoimws/ws" "sync" "time" ) var ( GroupMsgReadRecordDelayTime = time.Second // 超时时间 GroupMsgReadRecordDelayCount = 10 //最大消息处理数量 ) const ( GroupMsgReadHandlerAtTransfer = iota //是否开启群消息批量处理 GroupMsgReadHandlerDelayTransfer ) type MsgReadTransfer struct { *baseMsgTransfer cache.Cache mu sync.Mutex groupMsgs map[string]*groupMsgRead push chan *ws.Push } func NewMsgReadTransfer(svc *svc.ServiceContext) *MsgReadTransfer { m := &MsgReadTransfer{ baseMsgTransfer: NewBaseMsgTransfer(svc), groupMsgs: make(map[string]*groupMsgRead, 1), push: make(chan *ws.Push, 1), } if svc.Config.MsgReadHandler.GroupMsgReadHandler != GroupMsgReadHandlerAtTransfer { // 开启了群消息批量处理 if svc.Config.MsgReadHandler.GroupMsgReadRecordDelayCount > 0 { GroupMsgReadRecordDelayCount = int(svc.Config.MsgReadHandler.GroupMsgReadRecordDelayCount) } if svc.Config.MsgReadHandler.GroupMsgReadRecordDelayTime > 0 { GroupMsgReadRecordDelayTime = time.Duration(svc.Config.MsgReadHandler.GroupMsgReadRecordDelayTime) * time.Second } } go m.transfer() return m } func (m *MsgReadTransfer) Consume(ctx context.Context, key, value string) error { var ( data mq.MsgMarkRead ) if err := json.Unmarshal([]byte(value), &data); err != nil { return err } // 更新已读未读处理 read, err := m.UpdateChatLogRead(ctx, &data) if err != nil { return err } push := &ws.Push{ ConversationId: data.ConversationId, ChatType: data.ChatType, SendID: data.SendID, RecvID: data.RecvID, ContentType: constants.ContentMakeRead, ReadRecords: read, } switch data.ChatType { case constants.SingleChatType: // 私聊直接推送 m.push <- push case constants.GroupChatType: // 判断是否开启合并消息 if m.svcCtx.Config.MsgReadHandler.GroupMsgReadHandler == GroupMsgReadHandlerAtTransfer { m.push <- push } m.mu.Lock() defer m.mu.Unlock() push.SendID = 0 if _, ok := m.groupMsgs[push.ConversationId]; ok { m.Infof("merge push %v", push.ConversationId) m.groupMsgs[push.ConversationId].mergePush(push) } else { m.groupMsgs[push.ConversationId] = newGroupMsgRead(push, m.push) } } return nil } func (m *MsgReadTransfer) UpdateChatLogRead(ctx context.Context, data *mq.MsgMarkRead) (map[string]string, error) { res := make(map[string]string) chatLogs, err := m.svcCtx.ChatLogModel.ListByMsgIds(ctx, data.MsgIds) if err != nil { return nil, err } for _, chat := range chatLogs { switch chat.ChatType { case constants.SingleChatType: chat.ReadRecords = []byte{1} case constants.GroupChatType: readRecords := bitmap.Load(chat.ReadRecords) readRecords.Set(data.SendID) chat.ReadRecords = readRecords.Export() } res[chat.ID.Hex()] = base64.StdEncoding.EncodeToString(chat.ReadRecords) err := m.svcCtx.ChatLogModel.UpdateMakeRead(ctx, chat.ID, chat.ReadRecords) if err != nil { return nil, err } } return res, nil } func (m *MsgReadTransfer) transfer() { for push := range m.push { if push.RecvID > 0 || len(push.RecvIDs) > 0 { if err := m.Transfer(context.Background(), push); err != nil { m.Errorf("m transfer err %v push %v", err, push) } } if push.ChatType == constants.SingleChatType { continue } if m.svcCtx.Config.MsgReadHandler.GroupMsgReadHandler == GroupMsgReadHandlerAtTransfer { continue } // 接收者id为空 m.mu.Lock() if _, ok := m.groupMsgs[push.ConversationId]; ok && m.groupMsgs[push.ConversationId].IsIdle() { m.groupMsgs[push.ConversationId].clear() delete(m.groupMsgs, push.ConversationId) } } }