msgReadTransfer.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. package msgTransfer
  2. import (
  3. "context"
  4. "encoding/base64"
  5. "encoding/json"
  6. "github.com/zeromicro/go-zero/core/stores/cache"
  7. "paopaoimtask/internal/svc"
  8. "paopaoimtask/mq"
  9. "paopaoimtask/pkg/bitmap"
  10. "paopaoimtask/pkg/constants"
  11. "paopaoimws/ws"
  12. "sync"
  13. "time"
  14. )
  15. var (
  16. GroupMsgReadRecordDelayTime = time.Second // 超时时间
  17. GroupMsgReadRecordDelayCount = 10 //最大消息处理数量
  18. )
  19. const (
  20. GroupMsgReadHandlerAtTransfer = iota //是否开启群消息批量处理
  21. GroupMsgReadHandlerDelayTransfer
  22. )
  23. type MsgReadTransfer struct {
  24. *baseMsgTransfer
  25. cache.Cache
  26. mu sync.Mutex
  27. groupMsgs map[string]*groupMsgRead
  28. push chan *ws.Push
  29. }
  30. func NewMsgReadTransfer(svc *svc.ServiceContext) *MsgReadTransfer {
  31. m := &MsgReadTransfer{
  32. baseMsgTransfer: NewBaseMsgTransfer(svc),
  33. groupMsgs: make(map[string]*groupMsgRead, 1),
  34. push: make(chan *ws.Push, 1),
  35. }
  36. if svc.Config.MsgReadHandler.GroupMsgReadHandler != GroupMsgReadHandlerAtTransfer {
  37. // 开启了群消息批量处理
  38. if svc.Config.MsgReadHandler.GroupMsgReadRecordDelayCount > 0 {
  39. GroupMsgReadRecordDelayCount = int(svc.Config.MsgReadHandler.GroupMsgReadRecordDelayCount)
  40. }
  41. if svc.Config.MsgReadHandler.GroupMsgReadRecordDelayTime > 0 {
  42. GroupMsgReadRecordDelayTime = time.Duration(svc.Config.MsgReadHandler.GroupMsgReadRecordDelayTime) * time.Second
  43. }
  44. }
  45. go m.transfer()
  46. return m
  47. }
  48. func (m *MsgReadTransfer) Consume(ctx context.Context, key, value string) error {
  49. var (
  50. data mq.MsgMarkRead
  51. )
  52. if err := json.Unmarshal([]byte(value), &data); err != nil {
  53. return err
  54. }
  55. // 更新已读未读处理
  56. read, err := m.UpdateChatLogRead(ctx, &data)
  57. if err != nil {
  58. return err
  59. }
  60. push := &ws.Push{
  61. ConversationId: data.ConversationId,
  62. ChatType: data.ChatType,
  63. SendID: data.SendID,
  64. RecvID: data.RecvID,
  65. ContentType: constants.ContentMakeRead,
  66. ReadRecords: read,
  67. }
  68. switch data.ChatType {
  69. case constants.SingleChatType:
  70. // 私聊直接推送
  71. m.push <- push
  72. case constants.GroupChatType:
  73. // 判断是否开启合并消息
  74. if m.svcCtx.Config.MsgReadHandler.GroupMsgReadHandler == GroupMsgReadHandlerAtTransfer {
  75. m.push <- push
  76. }
  77. m.mu.Lock()
  78. defer m.mu.Unlock()
  79. push.SendID = 0
  80. if _, ok := m.groupMsgs[push.ConversationId]; ok {
  81. m.Infof("merge push %v", push.ConversationId)
  82. m.groupMsgs[push.ConversationId].mergePush(push)
  83. } else {
  84. m.groupMsgs[push.ConversationId] = newGroupMsgRead(push, m.push)
  85. }
  86. }
  87. return nil
  88. }
  89. func (m *MsgReadTransfer) UpdateChatLogRead(ctx context.Context, data *mq.MsgMarkRead) (map[string]string, error) {
  90. res := make(map[string]string)
  91. chatLogs, err := m.svcCtx.ChatLogModel.ListByMsgIds(ctx, data.MsgIds)
  92. if err != nil {
  93. return nil, err
  94. }
  95. for _, chat := range chatLogs {
  96. switch chat.ChatType {
  97. case constants.SingleChatType:
  98. chat.ReadRecords = []byte{1}
  99. case constants.GroupChatType:
  100. readRecords := bitmap.Load(chat.ReadRecords)
  101. readRecords.Set(data.SendID)
  102. chat.ReadRecords = readRecords.Export()
  103. }
  104. res[chat.ID.Hex()] = base64.StdEncoding.EncodeToString(chat.ReadRecords)
  105. err := m.svcCtx.ChatLogModel.UpdateMakeRead(ctx, chat.ID, chat.ReadRecords)
  106. if err != nil {
  107. return nil, err
  108. }
  109. }
  110. return res, nil
  111. }
  112. func (m *MsgReadTransfer) transfer() {
  113. for push := range m.push {
  114. if push.RecvID > 0 || len(push.RecvIDs) > 0 {
  115. if err := m.Transfer(context.Background(), push); err != nil {
  116. m.Errorf("m transfer err %v push %v", err, push)
  117. }
  118. }
  119. if push.ChatType == constants.SingleChatType {
  120. continue
  121. }
  122. if m.svcCtx.Config.MsgReadHandler.GroupMsgReadHandler == GroupMsgReadHandlerAtTransfer {
  123. continue
  124. }
  125. // 接收者id为空
  126. m.mu.Lock()
  127. if _, ok := m.groupMsgs[push.ConversationId]; ok && m.groupMsgs[push.ConversationId].IsIdle() {
  128. m.groupMsgs[push.ConversationId].clear()
  129. delete(m.groupMsgs, push.ConversationId)
  130. }
  131. }
  132. }