tweet2EsTransfer.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. package tweetTransfer
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/jinzhu/copier"
  7. "github.com/zeromicro/go-zero/core/logx"
  8. "paopaoimrpc/apps/client/paopaouser"
  9. "paopaoimtask/internal/svc"
  10. "paopaoimtask/mq"
  11. "paopaoimtask/pkg/es"
  12. "strings"
  13. "sync"
  14. )
  15. /**
  16. 从kafka中读取数据写入到es中和redis中
  17. */
  18. type Tweet2EsTransfer struct {
  19. svcCtx *svc.ServiceContext
  20. logx.Logger
  21. }
  22. func NewTweet2EsTransfer(svc *svc.ServiceContext) *Tweet2EsTransfer {
  23. return &Tweet2EsTransfer{
  24. svcCtx: svc,
  25. Logger: logx.WithContext(context.Background()),
  26. }
  27. }
  28. func (b *Tweet2EsTransfer) Consume(ctx context.Context, key, value string) error {
  29. var (
  30. data mq.MsgTweetData
  31. cacheData es.TweetEsData
  32. )
  33. if err := json.Unmarshal([]byte(value), &data); err != nil {
  34. return err
  35. }
  36. copier.Copy(&cacheData, data)
  37. tags := make(map[string]int32)
  38. if data.Tags != "" {
  39. for _, s := range strings.Split(data.Tags, ",") {
  40. tags[s] = 1
  41. }
  42. }
  43. cacheData.Tags = tags
  44. // 解析之后写入es 和 redis
  45. wg := sync.WaitGroup{}
  46. wg.Add(2)
  47. var mu sync.Mutex
  48. var errors []error
  49. go func(esData *es.TweetEsData) {
  50. defer wg.Done()
  51. err := b.setEs(&cacheData)
  52. if err != nil {
  53. mu.Lock()
  54. errors = append(errors, err)
  55. mu.Unlock()
  56. }
  57. }(&cacheData)
  58. go func(esData *es.TweetEsData) {
  59. defer wg.Done()
  60. err := b.SetUserPostNum(&cacheData)
  61. if err != nil {
  62. mu.Lock()
  63. errors = append(errors, err)
  64. mu.Unlock()
  65. }
  66. }(&cacheData)
  67. wg.Wait()
  68. if len(errors) > 0 {
  69. return errors[0]
  70. }
  71. return nil
  72. }
  73. func (b *Tweet2EsTransfer) setEs(data *es.TweetEsData) error {
  74. var tweetIndex es.TweetIndex
  75. err := tweetIndex.Create(data, b.svcCtx.EsClient, b.svcCtx.Config.ElasticSearchConf.TweetIndex)
  76. if err != nil {
  77. fmt.Println(err.Error())
  78. }
  79. return err
  80. }
  81. // 给用户加入资讯数量
  82. func (b *Tweet2EsTransfer) SetUserPostNum(data *es.TweetEsData) error {
  83. _, err := b.svcCtx.PaoPaoUser.SetUserPostNum(context.Background(), &paopaouser.SetUserPostNumReq{
  84. PostId: data.Id,
  85. UserId: data.UserId,
  86. SetType: 1,
  87. })
  88. if err != nil {
  89. fmt.Println(err.Error())
  90. }
  91. return err
  92. }