package logic import ( "context" "slowwildws/internal/constants" "slowwildws/internal/svc" "slowwildws/internal/types" "sync" "time" "github.com/gorilla/websocket" "github.com/zeromicro/go-zero/core/logx" ) type ConnectionLogic struct { logx.Logger ctx context.Context svcCtx *svc.ServiceContext conn *websocket.Conn idleMu sync.Mutex // 超时锁 idle time.Time // 闲置时间 maxConnectionIdle time.Duration // 最大闲置时间,也就是空闲时间 messageMu sync.Mutex // 消息锁 readMessageList []*types.Message // 读取消息队列 readMessageSeq map[string]*types.Message // 读取消息序列化 done chan struct{} // 结束方法 message chan *types.Message //消息通道 } func NewConnectionLogic(ctx context.Context, svcCtx *svc.ServiceContext, conn *websocket.Conn) *ConnectionLogic { return &ConnectionLogic{ Logger: logx.WithContext(ctx), ctx: ctx, svcCtx: svcCtx, conn: conn, idle: time.Now(), maxConnectionIdle: time.Duration(svcCtx.Config.MaxConnectionIdle) * time.Second, done: make(chan struct{}), readMessageList: make([]*types.Message, 0, 2), readMessageSeq: make(map[string]*types.Message, 2), message: make(chan *types.Message, 1), // 给容量为1的话可以确保收发顺序 } } // 关闭连接 func (c *ConnectionLogic) Close() error { select { case <-c.done: default: close(c.done) } return c.conn.Close() } // 读取消息 func (c *ConnectionLogic) ReadMessage() (messageType int, p []byte, err error) { // 这里不能先获取锁,因为会被阻塞住,下面这个获取消息的方法是阻塞的,会导致锁一直得不到释放 messageType, p, err = c.conn.ReadMessage() c.idleMu.Lock() defer func() { c.idleMu.Unlock() }() c.idle = time.Now() return } // 写消息 func (c *ConnectionLogic) WriteMessage(messageType int, data []byte) error { c.idleMu.Lock() defer func() { c.idleMu.Unlock() }() err := c.conn.WriteMessage(messageType, data) c.idle = time.Now() return err } // 心跳检测 func (c *ConnectionLogic) Keepalive() { idlerTimer := time.NewTimer(c.maxConnectionIdle) defer idlerTimer.Stop() for { select { case <-idlerTimer.C: c.idleMu.Lock() idle := c.idle if idle.IsZero() { idlerTimer.Reset(c.maxConnectionIdle) c.idleMu.Unlock() continue } val := c.maxConnectionIdle - time.Since(idle) if val <= 0 { c.idleMu.Unlock() c.Close() return } idlerTimer.Reset(val) c.idleMu.Unlock() case <-c.done: return } } } // 添加消息到队列中 func (c *ConnectionLogic) appendMsgMq(msg *types.Message) { c.messageMu.Lock() defer c.messageMu.Unlock() if m, ok := c.readMessageSeq[msg.Id]; ok { // 已经有消息记录,该消息已经有ack确认 if len(c.readMessageList) == 0 { // 队列中没有该消息 return } if m.AckSeq >= msg.AckSeq { // 没有进行ack确认,或者重复 return } c.readMessageSeq[msg.Id] = msg return } // 还没有进行ack确认,避免客户端重复发送多余的ack消息 if msg.FrameType == constants.FrameAck { return } c.readMessageList = append(c.readMessageList, msg) c.readMessageSeq[msg.Id] = msg }