groupMsgRead.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. package msgTransfer
  2. import (
  3. "github.com/zeromicro/go-zero/core/logx"
  4. "paopaoimtask/pkg/constants"
  5. "paopaoimws/ws"
  6. "sync"
  7. "time"
  8. )
  9. type groupMsgRead struct {
  10. mu sync.Mutex
  11. conversationId string
  12. push *ws.Push //推送方法
  13. pushCh chan *ws.Push // 通道推送
  14. count int // 数量
  15. pushTime time.Time // 上次推送时间
  16. done chan struct{}
  17. }
  18. func newGroupMsgRead(push *ws.Push, pushCh chan *ws.Push) *groupMsgRead {
  19. g := &groupMsgRead{
  20. push: push,
  21. conversationId: push.ConversationId,
  22. pushCh: pushCh,
  23. count: 1,
  24. pushTime: time.Time{},
  25. done: make(chan struct{}),
  26. }
  27. go g.transfer()
  28. return g
  29. }
  30. func (g *groupMsgRead) mergePush(push *ws.Push) {
  31. g.mu.Lock()
  32. defer g.mu.Unlock()
  33. if push == nil {
  34. g.push = push
  35. }
  36. g.count++
  37. for msgId, read := range push.ReadRecords {
  38. g.push.ReadRecords[msgId] = read
  39. }
  40. }
  41. // 是否是活跃的状态
  42. func (g *groupMsgRead) IsIdle() bool {
  43. g.mu.Lock()
  44. defer g.mu.Unlock()
  45. return g.isIdle()
  46. }
  47. func (g *groupMsgRead) isIdle() bool {
  48. pushTime := g.pushTime
  49. val := GroupMsgReadRecordDelayTime*2 - time.Since(pushTime)
  50. if val <= 0 && g.push == nil && g.count == 0 {
  51. return true
  52. }
  53. return false
  54. }
  55. // 清理内容
  56. func (g *groupMsgRead) clear() {
  57. select {
  58. case <-g.done:
  59. default:
  60. close(g.done)
  61. }
  62. g.push = nil
  63. }
  64. func (g *groupMsgRead) transfer() {
  65. // 超时发送
  66. timer := time.NewTimer(GroupMsgReadRecordDelayTime / 2)
  67. defer timer.Stop()
  68. for {
  69. select {
  70. case <-g.done:
  71. return
  72. case <-timer.C:
  73. g.mu.Lock()
  74. pushTime := g.pushTime
  75. val := GroupMsgReadRecordDelayTime - time.Since(pushTime)
  76. push := g.push
  77. if val > 0 && g.count < GroupMsgReadRecordDelayCount || push == nil {
  78. // 时间和数量都没有达标
  79. if val > 0 {
  80. timer.Reset(val)
  81. }
  82. g.mu.Unlock()
  83. continue
  84. }
  85. g.pushTime = time.Now()
  86. g.push = nil
  87. g.count = 0
  88. timer.Reset(val)
  89. g.mu.Unlock()
  90. // 推送
  91. logx.Infof("超过合并的条件推送 %v", push)
  92. g.pushCh <- push
  93. default:
  94. g.mu.Lock()
  95. if g.count >= GroupMsgReadRecordDelayCount {
  96. push := g.push
  97. // 达到了数量的话也要进行推送
  98. g.pushTime = time.Now()
  99. g.push = nil
  100. g.count = 0
  101. g.mu.Unlock()
  102. // 推送
  103. logx.Infof("超过合并的条件推送 %v", push)
  104. g.pushCh <- push
  105. continue
  106. }
  107. if g.isIdle() {
  108. g.mu.Unlock()
  109. g.pushCh <- &ws.Push{
  110. ChatType: constants.GroupChatType,
  111. ConversationId: g.conversationId,
  112. }
  113. continue
  114. }
  115. g.mu.Unlock()
  116. timeDelay := GroupMsgReadRecordDelayTime / 4
  117. if timeDelay > time.Second {
  118. timeDelay = time.Second
  119. }
  120. time.Sleep(timeDelay)
  121. }
  122. }
  123. // 超量发送
  124. }