conversationlogic.go 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. package conversation
  2. import (
  3. "context"
  4. "fmt"
  5. "slowwildws/internal/server"
  6. "slowwildws/internal/svc"
  7. "slowwildws/internal/types"
  8. "slowwildws/internal/utils"
  9. "time"
  10. msgChat "git.banshen.xyz/huangguangrong/slow_wild_queue/types"
  11. "github.com/mitchellh/mapstructure"
  12. "github.com/zeromicro/go-zero/core/logx"
  13. )
  14. type ConversationLogic struct {
  15. logx.Logger
  16. svcCtx *svc.ServiceContext
  17. }
  18. func NewConversationLogic(ctx context.Context, svcCtx svc.ServiceContext) ConversationLogic {
  19. return ConversationLogic{
  20. Logger: logx.WithContext(ctx),
  21. svcCtx: &svcCtx,
  22. }
  23. }
  24. func (l *ConversationLogic) ConversationChat(srv *server.WebsocketServer, conn *server.ConnectionServer, msg *types.Message) {
  25. var data msgChat.Chat
  26. if err := mapstructure.Decode(msg.Data, &data); err != nil {
  27. srv.Send(srv.NewErrMessage(err), conn)
  28. return
  29. }
  30. switch data.ChatType {
  31. case msgChat.SingleChatType:
  32. data.ConversationId = utils.CombineUserID(conn.Uid, data.RecvID)
  33. case msgChat.GroupChatType:
  34. data.ConversationId = utils.UserIdToHex(data.RecvID)
  35. }
  36. // 推送到kafka
  37. err := l.svcCtx.MsgChatTransferClient.Push(&msgChat.MsgChatTransfer{
  38. MsgId: msg.Id,
  39. ConversationId: data.ConversationId,
  40. ChatType: msgChat.ChatType(data.ChatType),
  41. SendID: conn.Uid,
  42. RecvID: data.RecvID,
  43. SendTime: time.Now().Unix(),
  44. MType: msgChat.MType(data.MType),
  45. Content: data.Content,
  46. })
  47. if err != nil {
  48. fmt.Println("消息推送失败")
  49. srv.Send(srv.NewErrMessage(err), conn)
  50. return
  51. }
  52. }