websocketserver.go 7.2 KB


  1. package server
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "slowwildws/internal/config"
  7. "slowwildws/internal/constants"
  8. "slowwildws/internal/service"
  9. "slowwildws/internal/types"
  10. "sync"
  11. "time"
  12. slowWildQueue "git.banshen.xyz/huangguangrong/slow_wild_queue"
  13. "github.com/gorilla/websocket"
  14. "github.com/zeromicro/go-zero/core/logx"
  15. "github.com/zeromicro/go-zero/core/threading"
  16. )
  17. type WebsocketServer struct {
  18. sync.RWMutex // 读写锁
  19. connToUser map[*ConnectionServer]int64
  20. userToConn map[int64]*ConnectionServer
  21. upgrader websocket.Upgrader
  22. logx.Logger
  23. opt *serverOption
  24. ctx context.Context
  25. *threading.TaskRunner
  26. messageServer *MessageServer
  27. slowwildService *service.SlowWildService
  28. msgChatTransferClient slowWildQueue.MsgChatTransferClient
  29. }
  30. func NewWebsockerServer(conf config.Config, opts ...ServerOptions) *WebsocketServer {
  31. opt := newServerOptions(conf, opts...)
  32. return &WebsocketServer{
  33. upgrader: websocket.Upgrader{
  34. CheckOrigin: func(r *http.Request) bool {
  35. return true
  36. },
  37. },
  38. Logger: logx.WithContext(context.Background()),
  39. connToUser: make(map[*ConnectionServer]int64),
  40. userToConn: make(map[int64]*ConnectionServer),
  41. opt: &opt,
  42. TaskRunner: threading.NewTaskRunner(opt.concurrency),
  43. messageServer: NewMessageServer(),
  44. ctx: context.Background(), // 后期考虑使用传递进来的ctx
  45. slowwildService: service.NewSlowWildService(),
  46. msgChatTransferClient: slowWildQueue.NewMsgChatTransferClient(conf.MsgChatTransfer.Addrs, conf.MsgChatTransfer.Topic),
  47. }
  48. }
  49. // 初始化一个连接信息
  50. func (s *WebsocketServer) InitConn(ctx context.Context, conn *websocket.Conn, conf config.Config) *ConnectionServer {
  51. return NewConnectionServer(ctx, conf, conn)
  52. }
  53. // 把连接添加到map中
  54. func (s *WebsocketServer) AddConn(conn *ConnectionServer, ctx context.Context) {
  55. uid := ctx.Value(constants.UserID).(int64)
  56. fmt.Println("获取到的用户id: ", uid)
  57. s.RWMutex.Lock()
  58. defer s.RWMutex.Unlock()
  59. // 验证用户是否登录过
  60. if c := s.userToConn[uid]; c != nil {
  61. c.Close()
  62. }
  63. s.connToUser[conn] = uid
  64. s.userToConn[uid] = conn
  65. // todo 是否应该再这里加上用户在线标识
  66. }
  67. // 获取存储的用户id
  68. func (s *WebsocketServer) GetUserId(conn *ConnectionServer) int64 {
  69. s.RWMutex.RLock()
  70. defer s.RWMutex.RUnlock()
  71. if conn == nil {
  72. return 0
  73. }
  74. return s.connToUser[conn]
  75. }
  76. // 关闭这个连接服务
  77. func (s *WebsocketServer) Close(conn *ConnectionServer) {
  78. s.RWMutex.Lock()
  79. defer s.RWMutex.Unlock()
  80. uid := s.connToUser[conn]
  81. if uid == 0 {
  82. // 已经关闭了
  83. return
  84. }
  85. delete(s.connToUser, conn)
  86. delete(s.userToConn, uid)
  87. conn.Close()
  88. }
  89. // 判断是否需要ack确认
  90. func (s *WebsocketServer) IsAck(message *types.Message) bool {
  91. if message == nil {
  92. return s.opt.ack != constants.NoAck
  93. }
  94. if message.FormID == constants.SYSTEM_ROOT_UID {
  95. //超级管理员不需要这个ack确认机制,直接发
  96. return false
  97. }
  98. return s.opt.ack != constants.NoAck && message.FrameType != constants.FrameNoAck
  99. }
  100. // 读取消息的ack
  101. func (s *WebsocketServer) ReadAck(conn *ConnectionServer) {
  102. for {
  103. select {
  104. case <-conn.done:
  105. s.Infof("close message ack uid %v", conn.Uid)
  106. return
  107. default:
  108. }
  109. conn.messageMu.Lock()
  110. if len(conn.readMessageList) == 0 {
  111. conn.messageMu.Unlock()
  112. // 没有消息的话让其休眠100毫秒再进行下一次判定
  113. time.Sleep(100 * time.Millisecond)
  114. continue
  115. }
  116. // 读取第一条消息
  117. message := conn.readMessageList[0]
  118. //判断ack的方式
  119. switch s.opt.ack {
  120. case constants.OnlyAck:
  121. // 直接给客户端回复
  122. s.messageServer.Send(&types.Message{
  123. FrameType: constants.FrameAck,
  124. Id: message.Id,
  125. AckSeq: message.AckSeq + 1,
  126. Data: "ack 应答",
  127. }, conn)
  128. // 进行业务处理,从队列中移除
  129. conn.readMessageList = conn.readMessageList[1:]
  130. conn.messageMu.Unlock()
  131. conn.message <- message
  132. case constants.RigorAck:
  133. if message.AckSeq == 0 {
  134. // 还没有确认
  135. conn.readMessageList[0].AckSeq++ //记录确认序号加1
  136. conn.readMessageList[0].AckTime = time.Now() // 记录确认时间
  137. s.messageServer.Send(&types.Message{
  138. FrameType: constants.FrameAck,
  139. Id: message.Id,
  140. AckSeq: message.AckSeq + 1,
  141. }, conn)
  142. s.Infof("message ack RigorAck send mid %v, seq %v, time %v", message.Id, message.AckSeq, message.AckTime)
  143. conn.messageMu.Unlock()
  144. continue
  145. }
  146. // 再验证
  147. // 1.客户端返回结果,再一次确认
  148. msgSeq := conn.readMessageSeq[message.Id]
  149. if msgSeq.AckSeq > message.AckSeq {
  150. // 进行业务处理,从队列中移除
  151. conn.readMessageList = conn.readMessageList[1:]
  152. conn.messageMu.Unlock()
  153. conn.message <- message
  154. s.Infof("message ack rigorAck success mid %v", message.Id)
  155. continue
  156. }
  157. // 2. 客户端没有确认,考虑是否再次发送确认消息
  158. val := s.opt.ackTimeout - time.Since(message.AckTime)
  159. fmt.Println("超时时间: ", val)
  160. // 2.1 超过结束确认,抛出错误
  161. if !message.AckTime.IsZero() && val <= 0 {
  162. delete(conn.readMessageSeq, message.Id)
  163. conn.readMessageList = conn.readMessageList[1:]
  164. conn.messageMu.Unlock()
  165. continue
  166. }
  167. // 2.2 未超过,超新发送
  168. conn.messageMu.Unlock()
  169. s.messageServer.Send(&types.Message{
  170. FrameType: constants.FrameAck,
  171. Id: message.Id,
  172. AckSeq: message.AckSeq,
  173. }, conn)
  174. time.Sleep(3 * time.Second)
  175. }
  176. }
  177. }
  178. // 任务处理,读取消息体中的method方法,判断是否需要执行
  179. func (s *WebsocketServer) HandleWrite(conn *ConnectionServer) {
  180. for {
  181. select {
  182. case <-conn.done:
  183. //连接关闭
  184. return
  185. case message := <-conn.message:
  186. switch message.FrameType {
  187. case constants.FramePing:
  188. s.messageServer.Send(&types.Message{FrameType: constants.FramePing}, conn)
  189. case constants.FrameData, constants.FrameNoAck:
  190. fmt.Println("收到消息:", message)
  191. if handler := s.slowwildService.GetMethodHandler(message.Method, s.ctx); handler != nil {
  192. messageData, err := handler.ChatHandler(message, conn.Uid)
  193. if err != nil {
  194. fmt.Println("执行失败:", err)
  195. s.messageServer.Send(s.NewErrMessage(err), conn)
  196. }
  197. // 推送到kafka由其他服务消费
  198. err = s.msgChatTransferClient.Push(messageData)
  199. if err != nil {
  200. fmt.Println("执行失败2:", err)
  201. s.messageServer.Send(s.NewErrMessage(err), conn)
  202. }
  203. } else {
  204. s.messageServer.Send(&types.Message{FrameType: constants.FrameData, Data: []byte(fmt.Sprintf("不存在执行的方法 %v 请检查", message.Method))}, conn)
  205. }
  206. }
  207. if s.opt.ack != constants.NoAck {
  208. // 删除ack消息的序号记录
  209. conn.messageMu.Lock()
  210. delete(conn.readMessageSeq, message.Id)
  211. conn.messageMu.Unlock()
  212. }
  213. }
  214. }
  215. }
  216. func (s *WebsocketServer) Send(msg interface{}, conns ...*ConnectionServer) error {
  217. return s.messageServer.Send(msg, conns...)
  218. }
  219. func (s *WebsocketServer) NewErrMessage(err error) *types.Message {
  220. return &types.Message{
  221. FrameType: constants.FrameData,
  222. Data: err.Error(),
  223. }
  224. }
  225. func (s *WebsocketServer) GetConn(uid int64) *ConnectionServer {
  226. s.RWMutex.RLock()
  227. defer s.RWMutex.RUnlock()
  228. return s.userToConn[uid]
  229. }