| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100 |
- package tweetTransfer
- import (
- "context"
- "encoding/json"
- "fmt"
- "github.com/jinzhu/copier"
- "github.com/zeromicro/go-zero/core/logx"
- "paopaoimrpc/apps/client/paopaouser"
- "paopaoimtask/internal/svc"
- "paopaoimtask/mq"
- "paopaoimtask/pkg/es"
- "strings"
- "sync"
- )
- /**
- 从kafka中读取数据写入到es中和redis中
- */
- type Tweet2EsTransfer struct {
- svcCtx *svc.ServiceContext
- logx.Logger
- }
- func NewTweet2EsTransfer(svc *svc.ServiceContext) *Tweet2EsTransfer {
- return &Tweet2EsTransfer{
- svcCtx: svc,
- Logger: logx.WithContext(context.Background()),
- }
- }
- func (b *Tweet2EsTransfer) Consume(ctx context.Context, key, value string) error {
- var (
- data mq.MsgTweetData
- cacheData es.TweetEsData
- )
- if err := json.Unmarshal([]byte(value), &data); err != nil {
- return err
- }
- copier.Copy(&cacheData, data)
- tags := make(map[string]int32)
- if data.Tags != "" {
- for _, s := range strings.Split(data.Tags, ",") {
- tags[s] = 1
- }
- }
- cacheData.Tags = tags
- // 解析之后写入es 和 redis
- wg := sync.WaitGroup{}
- wg.Add(2)
- var mu sync.Mutex
- var errors []error
- go func(esData *es.TweetEsData) {
- defer wg.Done()
- err := b.setEs(&cacheData)
- if err != nil {
- mu.Lock()
- errors = append(errors, err)
- mu.Unlock()
- }
- }(&cacheData)
- go func(esData *es.TweetEsData) {
- defer wg.Done()
- err := b.SetUserPostNum(&cacheData)
- if err != nil {
- mu.Lock()
- errors = append(errors, err)
- mu.Unlock()
- }
- }(&cacheData)
- wg.Wait()
- if len(errors) > 0 {
- return errors[0]
- }
- return nil
- }
- func (b *Tweet2EsTransfer) setEs(data *es.TweetEsData) error {
- var tweetIndex es.TweetIndex
- err := tweetIndex.Create(data, b.svcCtx.EsClient, b.svcCtx.Config.ElasticSearchConf.TweetIndex)
- if err != nil {
- fmt.Println(err.Error())
- }
- return err
- }
- // 给用户加入资讯数量
- func (b *Tweet2EsTransfer) SetUserPostNum(data *es.TweetEsData) error {
- _, err := b.svcCtx.PaoPaoUser.SetUserPostNum(context.Background(), &paopaouser.SetUserPostNumReq{
- PostId: data.Id,
- UserId: data.UserId,
- SetType: 1,
- })
- if err != nil {
- fmt.Println(err.Error())
- }
- return err
- }
|