package conversation

import (
	"context"
	"time"

	"slowwildws/internal/server"
	"slowwildws/internal/svc"
	"slowwildws/internal/types"
	"slowwildws/internal/utils"

	msgChat "git.banshen.xyz/huangguangrong/slow_wild_queue/types"
	"github.com/mitchellh/mapstructure"
	"github.com/zeromicro/go-zero/core/logx"
)

// ConversationLogic handles chat messages received over a websocket connection.
type ConversationLogic struct {
	logx.Logger
	svcCtx *svc.ServiceContext
}

// NewConversationLogic builds a ConversationLogic whose logger is bound to ctx.
// svcCtx is received by value, so the logic keeps a pointer to its own copy.
func NewConversationLogic(ctx context.Context, svcCtx svc.ServiceContext) ConversationLogic {
	return ConversationLogic{
		Logger: logx.WithContext(ctx),
		svcCtx: &svcCtx,
	}
}

// ConversationChat decodes an incoming chat message, derives its conversation
// ID, and pushes it to Kafka. Any failure is reported back to the sender.
func (l *ConversationLogic) ConversationChat(srv *server.WebsocketServer, conn *server.ConnectionServer, msg *types.Message) {
	var data msgChat.Chat
	if err := mapstructure.Decode(msg.Data, &data); err != nil {
		srv.Send(srv.NewErrMessage(err), conn)
		return
	}

	// Derive the conversation ID from the chat type. Any other chat type
	// keeps the ConversationId decoded from the message.
	switch data.ChatType {
	case msgChat.SingleChatType:
		data.ConversationId = utils.CombineUserID(conn.Uid, data.RecvID)
	case msgChat.GroupChatType:
		data.ConversationId = utils.UserIdToHex(data.RecvID)
	}

	// Push the message to Kafka.
	err := l.svcCtx.MsgChatTransferClient.Push(&msgChat.MsgChatTransfer{
		MsgId:          msg.Id,
		ConversationId: data.ConversationId,
		ChatType:       msgChat.ChatType(data.ChatType),
		SendID:         conn.Uid,
		RecvID:         data.RecvID,
		SendTime:       time.Now().Unix(),
		MType:          msgChat.MType(data.MType),
		Content:        data.Content,
	})
	if err != nil {
		// Log through the context-bound logger rather than stdout.
		l.Errorf("failed to push message: %v", err)
		srv.Send(srv.NewErrMessage(err), conn)
		return
	}
}