| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- package logic
- import (
- "context"
- "slowwildws/internal/constants"
- "slowwildws/internal/svc"
- "slowwildws/internal/types"
- "sync"
- "time"
- "github.com/gorilla/websocket"
- "github.com/zeromicro/go-zero/core/logx"
- )
- type ConnectionLogic struct {
- logx.Logger
- ctx context.Context
- svcCtx *svc.ServiceContext
- conn *websocket.Conn
- idleMu sync.Mutex // 超时锁
- idle time.Time // 闲置时间
- maxConnectionIdle time.Duration // 最大闲置时间,也就是空闲时间
- messageMu sync.Mutex // 消息锁
- readMessageList []*types.Message // 读取消息队列
- readMessageSeq map[string]*types.Message // 读取消息序列化
- done chan struct{} // 结束方法
- message chan *types.Message //消息通道
- }
- func NewConnectionLogic(ctx context.Context, svcCtx *svc.ServiceContext, conn *websocket.Conn) *ConnectionLogic {
- return &ConnectionLogic{
- Logger: logx.WithContext(ctx),
- ctx: ctx,
- svcCtx: svcCtx,
- conn: conn,
- idle: time.Now(),
- maxConnectionIdle: time.Duration(svcCtx.Config.MaxConnectionIdle) * time.Second,
- done: make(chan struct{}),
- readMessageList: make([]*types.Message, 0, 2),
- readMessageSeq: make(map[string]*types.Message, 2),
- message: make(chan *types.Message, 1), // 给容量为1的话可以确保收发顺序
- }
- }
- // 关闭连接
- func (c *ConnectionLogic) Close() error {
- select {
- case <-c.done:
- default:
- close(c.done)
- }
- return c.conn.Close()
- }
- // 读取消息
- func (c *ConnectionLogic) ReadMessage() (messageType int, p []byte, err error) {
- // 这里不能先获取锁,因为会被阻塞住,下面这个获取消息的方法是阻塞的,会导致锁一直得不到释放
- messageType, p, err = c.conn.ReadMessage()
- c.idleMu.Lock()
- defer func() {
- c.idleMu.Unlock()
- }()
- c.idle = time.Now()
- return
- }
- // 写消息
- func (c *ConnectionLogic) WriteMessage(messageType int, data []byte) error {
- c.idleMu.Lock()
- defer func() {
- c.idleMu.Unlock()
- }()
- err := c.conn.WriteMessage(messageType, data)
- c.idle = time.Now()
- return err
- }
- // 心跳检测
- func (c *ConnectionLogic) Keepalive() {
- idlerTimer := time.NewTimer(c.maxConnectionIdle)
- defer idlerTimer.Stop()
- for {
- select {
- case <-idlerTimer.C:
- c.idleMu.Lock()
- idle := c.idle
- if idle.IsZero() {
- idlerTimer.Reset(c.maxConnectionIdle)
- c.idleMu.Unlock()
- continue
- }
- val := c.maxConnectionIdle - time.Since(idle)
- if val <= 0 {
- c.idleMu.Unlock()
- c.Close()
- return
- }
- idlerTimer.Reset(val)
- c.idleMu.Unlock()
- case <-c.done:
- return
- }
- }
- }
- // 添加消息到队列中
- func (c *ConnectionLogic) appendMsgMq(msg *types.Message) {
- c.messageMu.Lock()
- defer c.messageMu.Unlock()
- if m, ok := c.readMessageSeq[msg.Id]; ok {
- // 已经有消息记录,该消息已经有ack确认
- if len(c.readMessageList) == 0 {
- // 队列中没有该消息
- return
- }
- if m.AckSeq >= msg.AckSeq {
- // 没有进行ack确认,或者重复
- return
- }
- c.readMessageSeq[msg.Id] = msg
- return
- }
- // 还没有进行ack确认,避免客户端重复发送多余的ack消息
- if msg.FrameType == constants.FrameAck {
- return
- }
- c.readMessageList = append(c.readMessageList, msg)
- c.readMessageSeq[msg.Id] = msg
- }
|