groupMsgRead.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. package msgTransfer
  2. import (
  3. "sync"
  4. "time"
  5. msgChat "git.banshen.xyz/huangguangrong/slow_wild_queue/types"
  6. "github.com/zeromicro/go-zero/core/logx"
  7. )
  8. type groupMsgRead struct {
  9. mu sync.Mutex // 锁
  10. conversationId string // 会话id
  11. push *msgChat.Push //推送方法
  12. pushCh chan *msgChat.Push // 通道推送
  13. count int // 数量
  14. pushTime time.Time // 上次推送时间
  15. done chan struct{}
  16. }
  17. func newGroupMsgRead(push *msgChat.Push, pushCh chan *msgChat.Push) *groupMsgRead {
  18. g := &groupMsgRead{
  19. push: push,
  20. conversationId: push.ConversationId,
  21. pushCh: pushCh,
  22. count: 1,
  23. pushTime: time.Time{},
  24. done: make(chan struct{}),
  25. }
  26. go g.transfer()
  27. return g
  28. }
  29. func (g *groupMsgRead) mergePush(push *msgChat.Push) {
  30. g.mu.Lock()
  31. defer g.mu.Unlock()
  32. if push == nil {
  33. g.push = push
  34. }
  35. g.count++
  36. for msgId, read := range push.ReadRecords {
  37. g.push.ReadRecords[msgId] = read
  38. }
  39. }
  40. // 是否是活跃的状态
  41. func (g *groupMsgRead) IsIdle() bool {
  42. g.mu.Lock()
  43. defer g.mu.Unlock()
  44. return g.isIdle()
  45. }
  46. func (g *groupMsgRead) isIdle() bool {
  47. pushTime := g.pushTime
  48. val := GroupMsgReadRecordDelayTime*2 - time.Since(pushTime)
  49. if val <= 0 && g.push == nil && g.count == 0 {
  50. return true
  51. }
  52. return false
  53. }
  54. // 清理内容
  55. func (g *groupMsgRead) clear() {
  56. select {
  57. case <-g.done:
  58. default:
  59. close(g.done)
  60. }
  61. g.push = nil
  62. }
  63. func (g *groupMsgRead) transfer() {
  64. // 超时发送
  65. timer := time.NewTimer(GroupMsgReadRecordDelayTime / 2)
  66. defer timer.Stop()
  67. for {
  68. select {
  69. case <-g.done:
  70. return
  71. case <-timer.C:
  72. g.mu.Lock()
  73. pushTime := g.pushTime
  74. val := GroupMsgReadRecordDelayTime - time.Since(pushTime)
  75. push := g.push
  76. if val > 0 && g.count < GroupMsgReadRecordDelayCount || push == nil {
  77. // 时间和数量都没有达标
  78. if val > 0 {
  79. timer.Reset(val)
  80. }
  81. g.mu.Unlock()
  82. continue
  83. }
  84. g.pushTime = time.Now()
  85. g.push = nil
  86. g.count = 0
  87. timer.Reset(val)
  88. g.mu.Unlock()
  89. // 推送
  90. logx.Infof("超过合并的条件推送 %v", push)
  91. g.pushCh <- push
  92. default:
  93. g.mu.Lock()
  94. if g.count >= GroupMsgReadRecordDelayCount {
  95. push := g.push
  96. // 达到了数量的话也要进行推送
  97. g.pushTime = time.Now()
  98. g.push = nil
  99. g.count = 0
  100. g.mu.Unlock()
  101. // 推送
  102. logx.Infof("超过合并的条件推送 %v", push)
  103. g.pushCh <- push
  104. continue
  105. }
  106. if g.isIdle() {
  107. g.mu.Unlock()
  108. g.pushCh <- &msgChat.Push{
  109. ChatType: msgChat.GroupChatType,
  110. ConversationId: g.conversationId,
  111. }
  112. continue
  113. }
  114. g.mu.Unlock()
  115. timeDelay := GroupMsgReadRecordDelayTime / 4
  116. if timeDelay > time.Second {
  117. timeDelay = time.Second
  118. }
  119. time.Sleep(timeDelay)
  120. }
  121. }
  122. // 超量发送
  123. }