msgReadTransfer.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. package msgTransfer
  2. import (
  3. "context"
  4. "encoding/base64"
  5. "encoding/json"
  6. "slowwild/internal/svc"
  7. "slowwild/internal/utils"
  8. "sync"
  9. "time"
  10. msgChat "git.banshen.xyz/huangguangrong/slow_wild_queue/types"
  11. "github.com/zeromicro/go-zero/core/stores/cache"
  12. )
  // Tunables for delayed (batched) handling of group read receipts. They are
  // variables rather than constants because NewMsgReadTransfer overwrites them
  // from configuration when delayed group handling is enabled.
  var (
  	// GroupMsgReadRecordDelayTime is the timeout used for delayed group
  	// read-receipt handling. Defaults to one second.
  	GroupMsgReadRecordDelayTime = time.Second
  	// GroupMsgReadRecordDelayCount is the maximum number of messages handled
  	// in one batch. Defaults to 10.
  	GroupMsgReadRecordDelayCount = 10
  )
  // Handling modes for group-chat read receipts. They are compared against the
  // MsgReadHandler.GroupMsgReadHandler configuration value.
  const (
  	// GroupMsgReadHandlerAtTransfer (0) means group read receipts are pushed
  	// as soon as they are consumed; batching is not enabled.
  	GroupMsgReadHandlerAtTransfer = iota
  	// GroupMsgReadHandlerDelayTransfer (1) is not referenced in this file;
  	// presumably it names the delayed/batched mode. Any configured value
  	// other than GroupMsgReadHandlerAtTransfer is treated as batching enabled.
  	GroupMsgReadHandlerDelayTransfer
  )
  21. type MsgReadTransfer struct {
  22. *BaseMsgTransfer
  23. cache.Cache
  24. mu sync.Mutex
  25. groupMsgs map[string]*groupMsgRead
  26. push chan *msgChat.Push
  27. }
  28. func NewMsgReadTransfer(svc *svc.ServiceContext) *MsgReadTransfer {
  29. m := &MsgReadTransfer{
  30. BaseMsgTransfer: NewBaseMsgTransfer(svc),
  31. groupMsgs: make(map[string]*groupMsgRead, 1),
  32. push: make(chan *msgChat.Push, 1),
  33. }
  34. if svc.Config.MsgReadHandler.GroupMsgReadHandler != GroupMsgReadHandlerAtTransfer {
  35. // 开启了群消息批量处理
  36. if svc.Config.MsgReadHandler.GroupMsgReadRecordDelayCount > 0 {
  37. GroupMsgReadRecordDelayCount = int(svc.Config.MsgReadHandler.GroupMsgReadRecordDelayCount)
  38. }
  39. if svc.Config.MsgReadHandler.GroupMsgReadRecordDelayTime > 0 {
  40. GroupMsgReadRecordDelayTime = time.Duration(svc.Config.MsgReadHandler.GroupMsgReadRecordDelayTime) * time.Second
  41. }
  42. }
  43. go m.transfer()
  44. return m
  45. }
  46. func (m *MsgReadTransfer) Consume(ctx context.Context, key, value string) error {
  47. var (
  48. data msgChat.MsgMarkRead
  49. )
  50. if err := json.Unmarshal([]byte(value), &data); err != nil {
  51. return err
  52. }
  53. // 更新已读未读处理
  54. read, err := m.UpdateChatLogRead(ctx, &data)
  55. if err != nil {
  56. return err
  57. }
  58. push := &msgChat.Push{
  59. ConversationId: data.ConversationId,
  60. ChatType: data.ChatType,
  61. SendID: data.SendID,
  62. RecvID: data.RecvID,
  63. ContentType: msgChat.ContentMakeRead,
  64. ReadRecords: read,
  65. }
  66. switch data.ChatType {
  67. case msgChat.SingleChatType:
  68. // 私聊直接推送
  69. m.push <- push
  70. case msgChat.GroupChatType:
  71. // 判断是否开启合并消息
  72. if m.svcCtx.Config.MsgReadHandler.GroupMsgReadHandler == GroupMsgReadHandlerAtTransfer {
  73. m.push <- push
  74. }
  75. m.mu.Lock()
  76. defer m.mu.Unlock()
  77. push.SendID = 0
  78. if _, ok := m.groupMsgs[push.ConversationId]; ok {
  79. m.Infof("merge push %v", push.ConversationId)
  80. m.groupMsgs[push.ConversationId].mergePush(push)
  81. } else {
  82. m.groupMsgs[push.ConversationId] = newGroupMsgRead(push, m.push)
  83. }
  84. }
  85. return nil
  86. }
  87. func (m *MsgReadTransfer) UpdateChatLogRead(ctx context.Context, data *msgChat.MsgMarkRead) (map[string]string, error) {
  88. res := make(map[string]string)
  89. chatLogs, err := m.svcCtx.ChatLogModel.ListByMsgIds(ctx, data.MsgIds)
  90. if err != nil {
  91. return nil, err
  92. }
  93. for _, chat := range chatLogs {
  94. switch chat.ChatType {
  95. case msgChat.SingleChatType:
  96. chat.ReadRecords = []byte{1}
  97. case msgChat.GroupChatType:
  98. readRecords := utils.Load(chat.ReadRecords)
  99. readRecords.Set(data.SendID)
  100. chat.ReadRecords = readRecords.Export()
  101. }
  102. res[chat.ID.Hex()] = base64.StdEncoding.EncodeToString(chat.ReadRecords)
  103. err := m.svcCtx.ChatLogModel.UpdateMakeRead(ctx, chat.ID, chat.ReadRecords)
  104. if err != nil {
  105. return nil, err
  106. }
  107. }
  108. return res, nil
  109. }
  110. func (m *MsgReadTransfer) transfer() {
  111. for push := range m.push {
  112. if push.RecvID > 0 || len(push.RecvIDs) > 0 {
  113. if err := m.Transfer(context.Background(), push); err != nil {
  114. m.Errorf("m transfer err %v push %v", err, push)
  115. }
  116. }
  117. if push.ChatType == msgChat.SingleChatType {
  118. continue
  119. }
  120. if m.svcCtx.Config.MsgReadHandler.GroupMsgReadHandler == GroupMsgReadHandlerAtTransfer {
  121. continue
  122. }
  123. // 接收者id为空
  124. m.mu.Lock()
  125. if _, ok := m.groupMsgs[push.ConversationId]; ok && m.groupMsgs[push.ConversationId].IsIdle() {
  126. m.groupMsgs[push.ConversationId].clear()
  127. delete(m.groupMsgs, push.ConversationId)
  128. }
  129. }
  130. }