| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- package msgTransfer
- import (
- "github.com/zeromicro/go-zero/core/logx"
- "paopaoimtask/pkg/constants"
- "paopaoimws/ws"
- "sync"
- "time"
- )
- type groupMsgRead struct {
- mu sync.Mutex
- conversationId string
- push *ws.Push //推送方法
- pushCh chan *ws.Push // 通道推送
- count int // 数量
- pushTime time.Time // 上次推送时间
- done chan struct{}
- }
- func newGroupMsgRead(push *ws.Push, pushCh chan *ws.Push) *groupMsgRead {
- g := &groupMsgRead{
- push: push,
- conversationId: push.ConversationId,
- pushCh: pushCh,
- count: 1,
- pushTime: time.Time{},
- done: make(chan struct{}),
- }
- go g.transfer()
- return g
- }
- func (g *groupMsgRead) mergePush(push *ws.Push) {
- g.mu.Lock()
- defer g.mu.Unlock()
- if push == nil {
- g.push = push
- }
- g.count++
- for msgId, read := range push.ReadRecords {
- g.push.ReadRecords[msgId] = read
- }
- }
- // 是否是活跃的状态
- func (g *groupMsgRead) IsIdle() bool {
- g.mu.Lock()
- defer g.mu.Unlock()
- return g.isIdle()
- }
- func (g *groupMsgRead) isIdle() bool {
- pushTime := g.pushTime
- val := GroupMsgReadRecordDelayTime*2 - time.Since(pushTime)
- if val <= 0 && g.push == nil && g.count == 0 {
- return true
- }
- return false
- }
- // 清理内容
- func (g *groupMsgRead) clear() {
- select {
- case <-g.done:
- default:
- close(g.done)
- }
- g.push = nil
- }
- func (g *groupMsgRead) transfer() {
- // 超时发送
- timer := time.NewTimer(GroupMsgReadRecordDelayTime / 2)
- defer timer.Stop()
- for {
- select {
- case <-g.done:
- return
- case <-timer.C:
- g.mu.Lock()
- pushTime := g.pushTime
- val := GroupMsgReadRecordDelayTime - time.Since(pushTime)
- push := g.push
- if val > 0 && g.count < GroupMsgReadRecordDelayCount || push == nil {
- // 时间和数量都没有达标
- if val > 0 {
- timer.Reset(val)
- }
- g.mu.Unlock()
- continue
- }
- g.pushTime = time.Now()
- g.push = nil
- g.count = 0
- timer.Reset(val)
- g.mu.Unlock()
- // 推送
- logx.Infof("超过合并的条件推送 %v", push)
- g.pushCh <- push
- default:
- g.mu.Lock()
- if g.count >= GroupMsgReadRecordDelayCount {
- push := g.push
- // 达到了数量的话也要进行推送
- g.pushTime = time.Now()
- g.push = nil
- g.count = 0
- g.mu.Unlock()
- // 推送
- logx.Infof("超过合并的条件推送 %v", push)
- g.pushCh <- push
- continue
- }
- if g.isIdle() {
- g.mu.Unlock()
- g.pushCh <- &ws.Push{
- ChatType: constants.GroupChatType,
- ConversationId: g.conversationId,
- }
- continue
- }
- g.mu.Unlock()
- timeDelay := GroupMsgReadRecordDelayTime / 4
- if timeDelay > time.Second {
- timeDelay = time.Second
- }
- time.Sleep(timeDelay)
- }
- }
- // 超量发送
- }
|