connectionserver.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package server
  2. import (
  3. "context"
  4. "slowwildws/internal/config"
  5. "slowwildws/internal/constants"
  6. "slowwildws/internal/types"
  7. "sync"
  8. "time"
  9. "github.com/gorilla/websocket"
  10. "github.com/zeromicro/go-zero/core/logx"
  11. )
  12. type ConnectionServer struct {
  13. logx.Logger
  14. ctx context.Context
  15. conn *websocket.Conn
  16. idleMu sync.Mutex // 超时锁
  17. idle time.Time // 闲置时间
  18. maxConnectionIdle time.Duration // 最大闲置时间,也就是空闲时间
  19. messageMu sync.Mutex // 消息锁
  20. readMessageList []*types.Message // 读取消息队列
  21. readMessageSeq map[string]*types.Message // 读取消息序列化
  22. done chan struct{} // 结束方法
  23. message chan *types.Message // 消息通道
  24. Uid int64 // 用户id
  25. }
  26. func NewConnectionServer(ctx context.Context, conf config.Config, conn *websocket.Conn) *ConnectionServer {
  27. return &ConnectionServer{
  28. Logger: logx.WithContext(ctx),
  29. ctx: ctx,
  30. conn: conn,
  31. idle: time.Now(),
  32. maxConnectionIdle: time.Duration(conf.MaxConnectionIdle) * time.Second,
  33. done: make(chan struct{}),
  34. readMessageList: make([]*types.Message, 0, 2),
  35. readMessageSeq: make(map[string]*types.Message, 2),
  36. message: make(chan *types.Message, 1), // 给容量为1的话可以确保收发顺序
  37. }
  38. }
  39. // 关闭连接
  40. func (c *ConnectionServer) Close() error {
  41. select {
  42. case <-c.done:
  43. default:
  44. close(c.done)
  45. }
  46. return c.conn.Close()
  47. }
  48. // 读取消息
  49. func (c *ConnectionServer) ReadMessage() (messageType int, p []byte, err error) {
  50. // 这里不能先获取锁,因为会被阻塞住,下面这个获取消息的方法是阻塞的,会导致锁一直得不到释放
  51. messageType, p, err = c.conn.ReadMessage()
  52. c.idleMu.Lock()
  53. defer func() {
  54. c.idleMu.Unlock()
  55. }()
  56. c.idle = time.Now()
  57. return
  58. }
  59. // 写消息
  60. func (c *ConnectionServer) WriteMessage(messageType int, data []byte) error {
  61. c.idleMu.Lock()
  62. defer func() {
  63. c.idleMu.Unlock()
  64. }()
  65. err := c.conn.WriteMessage(messageType, data)
  66. c.idle = time.Now()
  67. return err
  68. }
  69. // 心跳检测
  70. func (c *ConnectionServer) Keepalive() {
  71. idlerTimer := time.NewTimer(c.maxConnectionIdle)
  72. defer idlerTimer.Stop()
  73. for {
  74. select {
  75. case <-idlerTimer.C:
  76. c.idleMu.Lock()
  77. idle := c.idle
  78. if idle.IsZero() {
  79. idlerTimer.Reset(c.maxConnectionIdle)
  80. c.idleMu.Unlock()
  81. continue
  82. }
  83. val := c.maxConnectionIdle - time.Since(idle)
  84. if val <= 0 {
  85. c.idleMu.Unlock()
  86. c.Close()
  87. return
  88. }
  89. idlerTimer.Reset(val)
  90. c.idleMu.Unlock()
  91. case <-c.done:
  92. return
  93. }
  94. }
  95. }
  96. // 添加消息到队列中
  97. func (c *ConnectionServer) AppendMsgMq(msg *types.Message) {
  98. c.messageMu.Lock()
  99. defer c.messageMu.Unlock()
  100. if m, ok := c.readMessageSeq[msg.Id]; ok {
  101. // 已经有消息记录,该消息已经有ack确认
  102. if len(c.readMessageList) == 0 {
  103. // 队列中没有该消息
  104. return
  105. }
  106. if m.AckSeq >= msg.AckSeq {
  107. // 没有进行ack确认,或者重复
  108. return
  109. }
  110. c.readMessageSeq[msg.Id] = msg
  111. return
  112. }
  113. // 还没有进行ack确认,避免客户端重复发送多余的ack消息
  114. if msg.FrameType == constants.FrameAck {
  115. return
  116. }
  117. c.readMessageList = append(c.readMessageList, msg)
  118. c.readMessageSeq[msg.Id] = msg
  119. }
  120. // 写入消息到通道中
  121. func (c *ConnectionServer) WriteMessageToChan(message *types.Message) {
  122. c.message <- message
  123. }