package msgTransfer import ( "sync" "time" msgChat "git.banshen.xyz/huangguangrong/slow_wild_queue/types" "github.com/zeromicro/go-zero/core/logx" ) type groupMsgRead struct { mu sync.Mutex // 锁 conversationId string // 会话id push *msgChat.Push //推送方法 pushCh chan *msgChat.Push // 通道推送 count int // 数量 pushTime time.Time // 上次推送时间 done chan struct{} } func newGroupMsgRead(push *msgChat.Push, pushCh chan *msgChat.Push) *groupMsgRead { g := &groupMsgRead{ push: push, conversationId: push.ConversationId, pushCh: pushCh, count: 1, pushTime: time.Time{}, done: make(chan struct{}), } go g.transfer() return g } func (g *groupMsgRead) mergePush(push *msgChat.Push) { g.mu.Lock() defer g.mu.Unlock() if push == nil { g.push = push } g.count++ for msgId, read := range push.ReadRecords { g.push.ReadRecords[msgId] = read } } // 是否是活跃的状态 func (g *groupMsgRead) IsIdle() bool { g.mu.Lock() defer g.mu.Unlock() return g.isIdle() } func (g *groupMsgRead) isIdle() bool { pushTime := g.pushTime val := GroupMsgReadRecordDelayTime*2 - time.Since(pushTime) if val <= 0 && g.push == nil && g.count == 0 { return true } return false } // 清理内容 func (g *groupMsgRead) clear() { select { case <-g.done: default: close(g.done) } g.push = nil } func (g *groupMsgRead) transfer() { // 超时发送 timer := time.NewTimer(GroupMsgReadRecordDelayTime / 2) defer timer.Stop() for { select { case <-g.done: return case <-timer.C: g.mu.Lock() pushTime := g.pushTime val := GroupMsgReadRecordDelayTime - time.Since(pushTime) push := g.push if val > 0 && g.count < GroupMsgReadRecordDelayCount || push == nil { // 时间和数量都没有达标 if val > 0 { timer.Reset(val) } g.mu.Unlock() continue } g.pushTime = time.Now() g.push = nil g.count = 0 timer.Reset(val) g.mu.Unlock() // 推送 logx.Infof("超过合并的条件推送 %v", push) g.pushCh <- push default: g.mu.Lock() if g.count >= GroupMsgReadRecordDelayCount { push := g.push // 达到了数量的话也要进行推送 g.pushTime = time.Now() g.push = nil g.count = 0 g.mu.Unlock() // 推送 logx.Infof("超过合并的条件推送 %v", push) g.pushCh <- push continue } if g.isIdle() { g.mu.Unlock() g.pushCh <- &msgChat.Push{ ChatType: msgChat.GroupChatType, ConversationId: g.conversationId, } continue } g.mu.Unlock() timeDelay := GroupMsgReadRecordDelayTime / 4 if timeDelay > time.Second { timeDelay = time.Second } time.Sleep(timeDelay) } } // 超量发送 }