Parcourir la source

增加私聊逻辑

v_grhhuang il y a 8 mois
Parent
commit
be01e8ba89

+ 7 - 0
etc/slowwildws-api.yaml

@@ -3,6 +3,13 @@ Host: 0.0.0.0
 Port: 9999
 
 MaxConnectionIdle: 300
+DefaultMaxConnectionIdleTime: 300
+DefaultAckTimeout: 120
+DefaultConcurrency: 10
+
+Auth:
+  AccessSecret: "your-secret-key"
+  AccessExpire: 3600
 
 MsgChatTransfer:
   Topic: msgChatTransfer

+ 3 - 1
internal/logic/connectionlogic.go

@@ -33,17 +33,18 @@ func (c *ConnectionLogic) HandlerConn(conn *server.ConnectionServer) {
 		c.svcCtx.WsServer.Close(conn)
 		return
 	}
-	fmt.Println(fmt.Sprintf("用户id:%d", uid))
 	conn.Uid = uid
 	// 处理任务
 	go c.svcCtx.WsServer.HandleWrite(conn)
 
 	if c.svcCtx.WsServer.IsAck(nil) {
+		fmt.Println("read ack")
 		go c.svcCtx.WsServer.ReadAck(conn)
 	}
 	for {
 		// 获取消息
 		_, msg, err := conn.ReadMessage()
+		fmt.Println("read message")
 		if err != nil {
 			c.Logger.Errorf("websocket conn read message error %v", err)
 			c.svcCtx.WsServer.Close(conn)
@@ -60,6 +61,7 @@ func (c *ConnectionLogic) HandlerConn(conn *server.ConnectionServer) {
 			c.svcCtx.WsServer.Logger.Infof("conn message read ack msg %v", message)
 			conn.AppendMsgMq(&message)
 		} else {
+			fmt.Println("write message")
 			conn.WriteMessageToChan(&message)
 		}
 	}

+ 18 - 0
internal/queue/listen.go

@@ -0,0 +1,18 @@
+package queue
+
+import (
+	"context"
+	"slowwildws/internal/config"
+	"slowwildws/internal/svc"
+
+	"github.com/zeromicro/go-queue/kq"
+	"github.com/zeromicro/go-zero/core/service"
+)
+
+func Consumers(c config.Config, ctx context.Context, svcContext *svc.ServiceContext) []service.Service {
+
+	return []service.Service{
+		kq.MustNewQueue(c.MsgPushToClientTransfer, NewMsgSendConsumer(ctx, svcContext)),
+	}
+
+}

+ 45 - 0
internal/queue/msgsendconsumer.go

@@ -0,0 +1,45 @@
+package queue
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"slowwildws/internal/svc"
+
+	"git.banshen.xyz/huangguangrong/slow_wild_queue/types"
+)
+
+type MsgSendConsumer struct {
+	ctx    context.Context
+	svcCtx *svc.ServiceContext
+	//Logger logx.Logger
+}
+
+func NewMsgSendConsumer(ctx context.Context, svc *svc.ServiceContext) *MsgSendConsumer {
+	return &MsgSendConsumer{
+		ctx:    ctx,
+		svcCtx: svc,
+		//Logger: logx.WithContext(ctx),
+	}
+}
+
+func (m *MsgSendConsumer) Consume(ctx context.Context, key, val string) error {
+	if val == "" {
+		return nil
+	}
+	fmt.Println("接到的消息:", val)
+	var msgData types.MessagePushData
+	err := json.Unmarshal([]byte(val), &msgData)
+	if err != nil {
+		fmt.Println("参数解析失败")
+		return err
+	}
+	// 获取用户的conn
+	conn := m.svcCtx.WsServer.GetConn(msgData.UserID)
+	if conn == nil {
+		fmt.Println("获取用户的链接信息失败")
+		return nil
+	}
+	m.svcCtx.WsServer.Send(msgData, conn)
+	return nil
+}

+ 1 - 2
internal/server/optionsserver.go

@@ -12,8 +12,7 @@ type serverOption struct {
 	maxConnectionIdle time.Duration     // 连接空闲超时时间
 	ack               constants.AckType // ack类型
 	ackTimeout        time.Duration     // ack等待超时时间
-
-	concurrency int // 并发数
+	concurrency       int               // 并发数
 }
 
 func newServerOptions(conf config.Config, opts ...ServerOptions) serverOption {

+ 36 - 10
internal/server/websocketserver.go

@@ -6,10 +6,12 @@ import (
 	"net/http"
 	"slowwildws/internal/config"
 	"slowwildws/internal/constants"
+	"slowwildws/internal/service"
 	"slowwildws/internal/types"
 	"sync"
 	"time"
 
+	slowWildQueue "git.banshen.xyz/huangguangrong/slow_wild_queue"
 	"github.com/gorilla/websocket"
 	"github.com/zeromicro/go-zero/core/logx"
 	"github.com/zeromicro/go-zero/core/threading"
@@ -22,9 +24,11 @@ type WebsocketServer struct {
 	upgrader     websocket.Upgrader
 	logx.Logger
 	opt *serverOption
+	ctx context.Context
 	*threading.TaskRunner
-	ctx           context.Context
-	messageServer *MessageServer
+	messageServer         *MessageServer
+	slowwildService       *service.SlowWildService
+	msgChatTransferClient slowWildQueue.MsgChatTransferClient
 }
 
 func NewWebsockerServer(conf config.Config, opts ...ServerOptions) *WebsocketServer {
@@ -35,12 +39,15 @@ func NewWebsockerServer(conf config.Config, opts ...ServerOptions) *WebsocketSer
 				return true
 			},
 		},
-		Logger:        logx.WithContext(context.Background()),
-		connToUser:    make(map[*ConnectionServer]int64),
-		userToConn:    make(map[int64]*ConnectionServer),
-		opt:           &opt,
-		TaskRunner:    threading.NewTaskRunner(opt.concurrency),
-		messageServer: NewMessageServer(),
+		Logger:                logx.WithContext(context.Background()),
+		connToUser:            make(map[*ConnectionServer]int64),
+		userToConn:            make(map[int64]*ConnectionServer),
+		opt:                   &opt,
+		TaskRunner:            threading.NewTaskRunner(opt.concurrency),
+		messageServer:         NewMessageServer(),
+		ctx:                   context.Background(), // 后期考虑使用传递进来的ctx
+		slowwildService:       service.NewSlowWildService(),
+		msgChatTransferClient: slowWildQueue.NewMsgChatTransferClient(conf.MsgChatTransfer.Addrs, conf.MsgChatTransfer.Topic),
 	}
 }
 
@@ -132,6 +139,7 @@ func (s *WebsocketServer) ReadAck(conn *ConnectionServer) {
 				FrameType: constants.FrameAck,
 				Id:        message.Id,
 				AckSeq:    message.AckSeq + 1,
+				Data:      "ack 应答",
 			}, conn)
 			// 进行业务处理,从队列中移除
 			conn.readMessageList = conn.readMessageList[1:]
@@ -199,8 +207,19 @@ func (s *WebsocketServer) HandleWrite(conn *ConnectionServer) {
 			case constants.FramePing:
 				s.messageServer.Send(&types.Message{FrameType: constants.FramePing}, conn)
 			case constants.FrameData, constants.FrameNoAck:
-				if handler, ok := message.Method; ok {
-					handler(s, conn, message)
+				fmt.Println("收到消息:", message)
+				if handler := s.slowwildService.GetMethodHandler(message.Method, s.ctx); handler != nil {
+					messageData, err := handler.ChatHandler(message, conn.Uid)
+					if err != nil {
+						fmt.Println("执行失败:", err)
+						s.messageServer.Send(s.NewErrMessage(err), conn)
+					}
+					// 推送到kafka由其他服务消费
+					err = s.msgChatTransferClient.Push(messageData)
+					if err != nil {
+						fmt.Println("执行失败2:", err)
+						s.messageServer.Send(s.NewErrMessage(err), conn)
+					}
 				} else {
 					s.messageServer.Send(&types.Message{FrameType: constants.FrameData, Data: []byte(fmt.Sprintf("不存在执行的方法 %v 请检查", message.Method))}, conn)
 				}
@@ -226,3 +245,10 @@ func (s *WebsocketServer) NewErrMessage(err error) *types.Message {
 		Data:      err.Error(),
 	}
 }
+
+func (s *WebsocketServer) GetConn(uid int64) *ConnectionServer {
+	s.RWMutex.RLock()
+	defer s.RWMutex.RUnlock()
+
+	return s.userToConn[uid]
+}

+ 16 - 19
internal/logic/conversation/conversationlogic.go → internal/service/conversation/conversationlogic.go

@@ -2,9 +2,6 @@ package conversation
 
 import (
 	"context"
-	"fmt"
-	"slowwildws/internal/server"
-	"slowwildws/internal/svc"
 	"slowwildws/internal/types"
 	"slowwildws/internal/utils"
 	"time"
@@ -16,42 +13,42 @@ import (
 
 type ConversationLogic struct {
 	logx.Logger
-	svcCtx *svc.ServiceContext
 }
 
-func NewConversationLogic(ctx context.Context, svcCtx svc.ServiceContext) ConversationLogic {
-	return ConversationLogic{
+func NewConversationLogic(ctx context.Context) *ConversationLogic {
+	return &ConversationLogic{
 		Logger: logx.WithContext(ctx),
-		svcCtx: &svcCtx,
 	}
 }
 
-func (l *ConversationLogic) ConversationChat(srv *server.WebsocketServer, conn *server.ConnectionServer, msg *types.Message) {
+func (l *ConversationLogic) ChatHandler(msg *types.Message, uid int64) (*msgChat.MsgChatTransfer, error) {
 	var data msgChat.Chat
 	if err := mapstructure.Decode(msg.Data, &data); err != nil {
-		srv.Send(srv.NewErrMessage(err), conn)
-		return
+		return nil, err
 	}
+	// 设置发送者id
+	data.SendID = msg.UserID
+	// 设置接收者id
+	data.RecvID = msg.FormID
+	// 设置消息id
+	data.Msg.MsgID = msg.Id
+	// 设置消息发送时间
+	data.SendTime = time.Now().Unix()
 	switch data.ChatType {
 	case msgChat.SingleChatType:
-		data.ConversationId = utils.CombineUserID(conn.Uid, data.RecvID)
+		data.ConversationId = utils.CombineUserID(uid, data.RecvID)
 	case msgChat.GroupChatType:
 		data.ConversationId = utils.UserIdToHex(data.RecvID)
 	}
-	// 推送到kafka
-	err := l.svcCtx.MsgChatTransferClient.Push(&msgChat.MsgChatTransfer{
+	msgData := &msgChat.MsgChatTransfer{
 		MsgId:          msg.Id,
 		ConversationId: data.ConversationId,
 		ChatType:       msgChat.ChatType(data.ChatType),
-		SendID:         conn.Uid,
+		SendID:         uid,
 		RecvID:         data.RecvID,
 		SendTime:       time.Now().Unix(),
 		MType:          msgChat.MType(data.MType),
 		Content:        data.Content,
-	})
-	if err != nil {
-		fmt.Println("消息推送失败")
-		srv.Send(srv.NewErrMessage(err), conn)
-		return
 	}
+	return msgData, nil
 }

+ 29 - 0
internal/service/service.go

@@ -0,0 +1,29 @@
+package service
+
+import (
+	"context"
+	"slowwildws/internal/service/conversation"
+	"slowwildws/internal/types"
+
+	msgChat "git.banshen.xyz/huangguangrong/slow_wild_queue/types"
+)
+
+type MessageService interface {
+	ChatHandler(msg *types.Message, uid int64) (*msgChat.MsgChatTransfer, error)
+}
+
+type SlowWildService struct {
+}
+
+func NewSlowWildService() *SlowWildService {
+	return &SlowWildService{}
+}
+
+func (s SlowWildService) GetMethodHandler(method string, ctx context.Context) MessageService {
+	switch method {
+	case "chat":
+		return conversation.NewConversationLogic(ctx)
+	default:
+		return nil
+	}
+}

+ 0 - 2
internal/svc/servicecontext.go

@@ -16,7 +16,6 @@ type ServiceContext struct {
 	WsServer              *server.WebsocketServer
 	UserMiddleware        rest.Middleware
 	MsgChatTransferClient slowWildQueue.MsgChatTransferClient
-	MsgReadTransferClient slowWildQueue.MsgReadTransferClient
 }
 
 func NewServiceContext(c config.Config) *ServiceContext {
@@ -25,6 +24,5 @@ func NewServiceContext(c config.Config) *ServiceContext {
 		WsServer:              server.NewWebsockerServer(c, server.WithServerMaxConnectionIdle(time.Duration(c.MaxConnectionIdle)*time.Second), server.WithServerAck(constants.NoAck)),
 		UserMiddleware:        middleware.NewJwtMiddleware(c.Auth.AccessSecret).Handle,
 		MsgChatTransferClient: slowWildQueue.NewMsgChatTransferClient(c.MsgChatTransfer.Addrs, c.MsgChatTransfer.Topic),
-		MsgReadTransferClient: slowWildQueue.NewMsgReadTransferClient(c.MsgReadTransfer.Addrs, c.MsgReadTransfer.Topic),
 	}
 }

+ 16 - 0
slowwildws.go

@@ -1,14 +1,17 @@
 package main
 
 import (
+	"context"
 	"flag"
 	"fmt"
 
 	"slowwildws/internal/config"
 	"slowwildws/internal/handler"
+	"slowwildws/internal/queue"
 	"slowwildws/internal/svc"
 
 	"github.com/zeromicro/go-zero/core/conf"
+	"github.com/zeromicro/go-zero/core/service"
 	"github.com/zeromicro/go-zero/rest"
 )
 
@@ -26,6 +29,19 @@ func main() {
 	ctx := svc.NewServiceContext(c)
 	handler.RegisterHandlers(server, ctx)
 
+	go func(svc *svc.ServiceContext, conf config.Config) {
+		// 创建消息消费队列
+		serviceGroup := service.NewServiceGroup()
+		services := queue.Consumers(conf, context.Background(), svc)
+		defer serviceGroup.Stop()
+
+		for _, server := range services {
+			serviceGroup.Add(server)
+		}
+		fmt.Printf("starting websocket consumer queue server....\n")
+		serviceGroup.Start()
+	}(ctx, c)
+
 	fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
 	server.Start()
 }