msgChatTransfer.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package msgTransfer
  2. import (
  3. "context"
  4. "encoding/json"
  5. "slowwild/internal/immodel"
  6. "slowwild/internal/svc"
  7. "slowwild/internal/utils"
  8. "strconv"
  9. msgChat "git.banshen.xyz/huangguangrong/slow_wild_queue/types"
  10. "go.mongodb.org/mongo-driver/bson/primitive"
  11. )
  12. /**
  13. 消息消费
  14. */
  15. type MsgChatTransfer struct {
  16. *BaseMsgTransfer
  17. }
  18. func NewMsgChatTransfer(svc *svc.ServiceContext) *MsgChatTransfer {
  19. return &MsgChatTransfer{
  20. NewBaseMsgTransfer(svc),
  21. }
  22. }
  23. // 消息消费的实现方法
  24. func (m *MsgChatTransfer) Consume(ctx context.Context, key, value string) error {
  25. var (
  26. data msgChat.MsgChatTransfer
  27. msgId = primitive.NewObjectID()
  28. )
  29. if err := json.Unmarshal([]byte(value), &data); err != nil {
  30. return err
  31. }
  32. if err := m.addChatLog(ctx, msgId, &data); err != nil {
  33. return err
  34. }
  35. return m.Transfer(ctx, &msgChat.Push{
  36. MsgID: msgId.Hex(),
  37. ConversationId: data.ConversationId,
  38. ChatType: data.ChatType,
  39. SendID: data.SendID,
  40. RecvID: data.RecvID,
  41. RecvIDs: data.RecvIDs,
  42. SendTime: data.SendTime,
  43. MType: data.MType,
  44. Content: data.Content,
  45. })
  46. }
  47. // 记录消息到mongodb
  48. func (m *MsgChatTransfer) addChatLog(ctx context.Context, msgId primitive.ObjectID, data *msgChat.MsgChatTransfer) error {
  49. //记录消息
  50. chatlog := immodel.ChatLog{
  51. ID: msgId,
  52. ConversationId: data.ConversationId,
  53. SendId: strconv.FormatInt(data.SendID, 10),
  54. RecvId: strconv.FormatInt(data.RecvID, 10),
  55. MsgType: data.MType,
  56. MsgContent: data.Content,
  57. SendTime: data.SendTime,
  58. ChatType: data.ChatType,
  59. }
  60. //设置自己为已读状态
  61. readRecords := utils.NewBitMap(0)
  62. readRecords.Set(data.SendID)
  63. chatlog.ReadRecords = readRecords.Export()
  64. err := m.svcCtx.ChatLogModel.Insert(ctx, &chatlog)
  65. if err != nil {
  66. return err
  67. }
  68. return m.svcCtx.ConversationModel.UpdateMsg(ctx, &chatlog)
  69. }