| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- package msgTransfer
- import (
- "context"
- "encoding/base64"
- "encoding/json"
- "slowwild/internal/svc"
- "slowwild/internal/utils"
- "sync"
- "time"
- msgChat "git.banshen.xyz/huangguangrong/slow_wild_queue/types"
- "github.com/zeromicro/go-zero/core/stores/cache"
- )
// Tunables for batched group read receipts. Both values are package-level
// defaults that NewMsgReadTransfer overwrites from the service configuration
// when batching is enabled and the configured value is positive.
var (
	// GroupMsgReadRecordDelayTime is the timeout of a batch; it defaults to
	// one second.
	GroupMsgReadRecordDelayTime = time.Second

	// GroupMsgReadRecordDelayCount is the maximum number of messages handled
	// in a batch; it defaults to 10.
	GroupMsgReadRecordDelayCount = 10
)
// Values of the MsgReadHandler.GroupMsgReadHandler configuration field, which
// selects whether group read receipts are batched.
const (
	// GroupMsgReadHandlerAtTransfer (0) pushes every group read receipt
	// immediately, without batching.
	GroupMsgReadHandlerAtTransfer = iota

	// GroupMsgReadHandlerDelayTransfer (1) enables batching of group read
	// receipts.
	GroupMsgReadHandlerDelayTransfer
)
- type MsgReadTransfer struct {
- *BaseMsgTransfer
- cache.Cache
- mu sync.Mutex
- groupMsgs map[string]*groupMsgRead
- push chan *msgChat.Push
- }
- func NewMsgReadTransfer(svc *svc.ServiceContext) *MsgReadTransfer {
- m := &MsgReadTransfer{
- BaseMsgTransfer: NewBaseMsgTransfer(svc),
- groupMsgs: make(map[string]*groupMsgRead, 1),
- push: make(chan *msgChat.Push, 1),
- }
- if svc.Config.MsgReadHandler.GroupMsgReadHandler != GroupMsgReadHandlerAtTransfer {
- // 开启了群消息批量处理
- if svc.Config.MsgReadHandler.GroupMsgReadRecordDelayCount > 0 {
- GroupMsgReadRecordDelayCount = int(svc.Config.MsgReadHandler.GroupMsgReadRecordDelayCount)
- }
- if svc.Config.MsgReadHandler.GroupMsgReadRecordDelayTime > 0 {
- GroupMsgReadRecordDelayTime = time.Duration(svc.Config.MsgReadHandler.GroupMsgReadRecordDelayTime) * time.Second
- }
- }
- go m.transfer()
- return m
- }
- func (m *MsgReadTransfer) Consume(ctx context.Context, key, value string) error {
- var (
- data msgChat.MsgMarkRead
- )
- if err := json.Unmarshal([]byte(value), &data); err != nil {
- return err
- }
- // 更新已读未读处理
- read, err := m.UpdateChatLogRead(ctx, &data)
- if err != nil {
- return err
- }
- push := &msgChat.Push{
- ConversationId: data.ConversationId,
- ChatType: data.ChatType,
- SendID: data.SendID,
- RecvID: data.RecvID,
- ContentType: msgChat.ContentMakeRead,
- ReadRecords: read,
- }
- switch data.ChatType {
- case msgChat.SingleChatType:
- // 私聊直接推送
- m.push <- push
- case msgChat.GroupChatType:
- // 判断是否开启合并消息
- if m.svcCtx.Config.MsgReadHandler.GroupMsgReadHandler == GroupMsgReadHandlerAtTransfer {
- m.push <- push
- }
- m.mu.Lock()
- defer m.mu.Unlock()
- push.SendID = 0
- if _, ok := m.groupMsgs[push.ConversationId]; ok {
- m.Infof("merge push %v", push.ConversationId)
- m.groupMsgs[push.ConversationId].mergePush(push)
- } else {
- m.groupMsgs[push.ConversationId] = newGroupMsgRead(push, m.push)
- }
- }
- return nil
- }
- func (m *MsgReadTransfer) UpdateChatLogRead(ctx context.Context, data *msgChat.MsgMarkRead) (map[string]string, error) {
- res := make(map[string]string)
- chatLogs, err := m.svcCtx.ChatLogModel.ListByMsgIds(ctx, data.MsgIds)
- if err != nil {
- return nil, err
- }
- for _, chat := range chatLogs {
- switch chat.ChatType {
- case msgChat.SingleChatType:
- chat.ReadRecords = []byte{1}
- case msgChat.GroupChatType:
- readRecords := utils.Load(chat.ReadRecords)
- readRecords.Set(data.SendID)
- chat.ReadRecords = readRecords.Export()
- }
- res[chat.ID.Hex()] = base64.StdEncoding.EncodeToString(chat.ReadRecords)
- err := m.svcCtx.ChatLogModel.UpdateMakeRead(ctx, chat.ID, chat.ReadRecords)
- if err != nil {
- return nil, err
- }
- }
- return res, nil
- }
- func (m *MsgReadTransfer) transfer() {
- for push := range m.push {
- if push.RecvID > 0 || len(push.RecvIDs) > 0 {
- if err := m.Transfer(context.Background(), push); err != nil {
- m.Errorf("m transfer err %v push %v", err, push)
- }
- }
- if push.ChatType == msgChat.SingleChatType {
- continue
- }
- if m.svcCtx.Config.MsgReadHandler.GroupMsgReadHandler == GroupMsgReadHandlerAtTransfer {
- continue
- }
- // 接收者id为空
- m.mu.Lock()
- if _, ok := m.groupMsgs[push.ConversationId]; ok && m.groupMsgs[push.ConversationId].IsIdle() {
- m.groupMsgs[push.ConversationId].clear()
- delete(m.groupMsgs, push.ConversationId)
- }
- }
- }
|