| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657 |
- package conversation
- import (
- "context"
- "fmt"
- "slowwildws/internal/server"
- "slowwildws/internal/svc"
- "slowwildws/internal/types"
- "slowwildws/internal/utils"
- "time"
- msgChat "git.banshen.xyz/huangguangrong/slow_wild_queue/types"
- "github.com/mitchellh/mapstructure"
- "github.com/zeromicro/go-zero/core/logx"
- )
- type ConversationLogic struct {
- logx.Logger
- svcCtx *svc.ServiceContext
- }
- func NewConversationLogic(ctx context.Context, svcCtx svc.ServiceContext) ConversationLogic {
- return ConversationLogic{
- Logger: logx.WithContext(ctx),
- svcCtx: &svcCtx,
- }
- }
- func (l *ConversationLogic) ConversationChat(srv *server.WebsocketServer, conn *server.ConnectionServer, msg *types.Message) {
- var data msgChat.Chat
- if err := mapstructure.Decode(msg.Data, &data); err != nil {
- srv.Send(srv.NewErrMessage(err), conn)
- return
- }
- switch data.ChatType {
- case msgChat.SingleChatType:
- data.ConversationId = utils.CombineUserID(conn.Uid, data.RecvID)
- case msgChat.GroupChatType:
- data.ConversationId = utils.UserIdToHex(data.RecvID)
- }
- // 推送到kafka
- err := l.svcCtx.MsgChatTransferClient.Push(&msgChat.MsgChatTransfer{
- MsgId: msg.Id,
- ConversationId: data.ConversationId,
- ChatType: msgChat.ChatType(data.ChatType),
- SendID: conn.Uid,
- RecvID: data.RecvID,
- SendTime: time.Now().Unix(),
- MType: msgChat.MType(data.MType),
- Content: data.Content,
- })
- if err != nil {
- fmt.Println("消息推送失败")
- srv.Send(srv.NewErrMessage(err), conn)
- return
- }
- }
|