msgChatTransfer.go 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. package msgTransfer
  2. import (
  3. "context"
  4. "encoding/json"
  5. "go.mongodb.org/mongo-driver/bson/primitive"
  6. "paopaoimtask/internal/svc"
  7. "paopaoimtask/mq"
  8. "paopaoimtask/pkg/bitmap"
  9. "paopaoimws/immodels"
  10. "paopaoimws/ws"
  11. "strconv"
  12. )
  13. /**
  14. 消息消费
  15. */
  16. type MsgChatTransfer struct {
  17. *baseMsgTransfer
  18. }
  19. func NewMsgChatTransfer(svc *svc.ServiceContext) *MsgChatTransfer {
  20. return &MsgChatTransfer{
  21. NewBaseMsgTransfer(svc),
  22. }
  23. }
  24. // 消息消费的实现方法
  25. func (m *MsgChatTransfer) Consume(ctx context.Context, key, value string) error {
  26. var (
  27. data mq.MsgChatTransfer
  28. msgId = primitive.NewObjectID()
  29. )
  30. if err := json.Unmarshal([]byte(value), &data); err != nil {
  31. return err
  32. }
  33. if err := m.addChatLog(ctx, msgId, &data); err != nil {
  34. return err
  35. }
  36. return m.Transfer(ctx, &ws.Push{
  37. MsgID: msgId.Hex(),
  38. ConversationId: data.ConversationId,
  39. ChatType: data.ChatType,
  40. SendID: data.SendID,
  41. RecvID: data.RecvID,
  42. RecvIDs: data.RecvIDs,
  43. SendTime: data.SendTime,
  44. MType: data.MType,
  45. Content: data.Content,
  46. })
  47. }
  48. // 记录消息到mongodb
  49. func (m *MsgChatTransfer) addChatLog(ctx context.Context, msgId primitive.ObjectID, data *mq.MsgChatTransfer) error {
  50. //记录消息
  51. chatlog := immodels.ChatLog{
  52. ID: msgId,
  53. ConversationId: data.ConversationId,
  54. SendId: strconv.FormatInt(data.SendID, 10),
  55. RecvId: strconv.FormatInt(data.RecvID, 10),
  56. MsgFrom: 0,
  57. MsgType: data.MType,
  58. MsgContent: data.Content,
  59. SendTime: data.SendTime,
  60. ChatType: data.ChatType,
  61. }
  62. //设置自己为已读状态
  63. readRecords := bitmap.NewBitMap(0)
  64. readRecords.Set(data.SendID)
  65. chatlog.ReadRecords = readRecords.Export()
  66. err := m.svcCtx.ChatLogModel.Insert(ctx, &chatlog)
  67. if err != nil {
  68. return err
  69. }
  70. return m.svcCtx.ConversationModel.UpdateMsg(ctx, &chatlog)
  71. }