package server import ( "context" "slowwildws/internal/config" "slowwildws/internal/constants" "slowwildws/internal/types" "sync" "time" "github.com/gorilla/websocket" "github.com/zeromicro/go-zero/core/logx" ) type ConnectionServer struct { logx.Logger ctx context.Context conn *websocket.Conn idleMu sync.Mutex // 超时锁 idle time.Time // 闲置时间 maxConnectionIdle time.Duration // 最大闲置时间,也就是空闲时间 messageMu sync.Mutex // 消息锁 readMessageList []*types.Message // 读取消息队列 readMessageSeq map[string]*types.Message // 读取消息序列化 done chan struct{} // 结束方法 message chan *types.Message // 消息通道 Uid int64 // 用户id } func NewConnectionServer(ctx context.Context, conf config.Config, conn *websocket.Conn) *ConnectionServer { return &ConnectionServer{ Logger: logx.WithContext(ctx), ctx: ctx, conn: conn, idle: time.Now(), maxConnectionIdle: time.Duration(conf.MaxConnectionIdle) * time.Second, done: make(chan struct{}), readMessageList: make([]*types.Message, 0, 2), readMessageSeq: make(map[string]*types.Message, 2), message: make(chan *types.Message, 1), // 给容量为1的话可以确保收发顺序 } } // 关闭连接 func (c *ConnectionServer) Close() error { select { case <-c.done: default: close(c.done) } return c.conn.Close() } // 读取消息 func (c *ConnectionServer) ReadMessage() (messageType int, p []byte, err error) { // 这里不能先获取锁,因为会被阻塞住,下面这个获取消息的方法是阻塞的,会导致锁一直得不到释放 messageType, p, err = c.conn.ReadMessage() c.idleMu.Lock() defer func() { c.idleMu.Unlock() }() c.idle = time.Now() return } // 写消息 func (c *ConnectionServer) WriteMessage(messageType int, data []byte) error { c.idleMu.Lock() defer func() { c.idleMu.Unlock() }() err := c.conn.WriteMessage(messageType, data) c.idle = time.Now() return err } // 心跳检测 func (c *ConnectionServer) Keepalive() { idlerTimer := time.NewTimer(c.maxConnectionIdle) defer idlerTimer.Stop() for { select { case <-idlerTimer.C: c.idleMu.Lock() idle := c.idle if idle.IsZero() { idlerTimer.Reset(c.maxConnectionIdle) c.idleMu.Unlock() continue } val := c.maxConnectionIdle - time.Since(idle) if val <= 0 { c.idleMu.Unlock() c.Close() return } idlerTimer.Reset(val) c.idleMu.Unlock() case <-c.done: return } } } // 添加消息到队列中 func (c *ConnectionServer) AppendMsgMq(msg *types.Message) { c.messageMu.Lock() defer c.messageMu.Unlock() if m, ok := c.readMessageSeq[msg.Id]; ok { // 已经有消息记录,该消息已经有ack确认 if len(c.readMessageList) == 0 { // 队列中没有该消息 return } if m.AckSeq >= msg.AckSeq { // 没有进行ack确认,或者重复 return } c.readMessageSeq[msg.Id] = msg return } // 还没有进行ack确认,避免客户端重复发送多余的ack消息 if msg.FrameType == constants.FrameAck { return } c.readMessageList = append(c.readMessageList, msg) c.readMessageSeq[msg.Id] = msg } // 写入消息到通道中 func (c *ConnectionServer) WriteMessageToChan(message *types.Message) { c.message <- message }