package tweetTransfer import ( "context" "encoding/json" "fmt" "github.com/jinzhu/copier" "github.com/zeromicro/go-zero/core/logx" "paopaoimrpc/apps/client/paopaouser" "paopaoimtask/internal/svc" "paopaoimtask/mq" "paopaoimtask/pkg/es" "strings" "sync" ) /** 从kafka中读取数据写入到es中和redis中 */ type Tweet2EsTransfer struct { svcCtx *svc.ServiceContext logx.Logger } func NewTweet2EsTransfer(svc *svc.ServiceContext) *Tweet2EsTransfer { return &Tweet2EsTransfer{ svcCtx: svc, Logger: logx.WithContext(context.Background()), } } func (b *Tweet2EsTransfer) Consume(ctx context.Context, key, value string) error { var ( data mq.MsgTweetData cacheData es.TweetEsData ) if err := json.Unmarshal([]byte(value), &data); err != nil { return err } copier.Copy(&cacheData, data) tags := make(map[string]int32) if data.Tags != "" { for _, s := range strings.Split(data.Tags, ",") { tags[s] = 1 } } cacheData.Tags = tags // 解析之后写入es 和 redis wg := sync.WaitGroup{} wg.Add(2) var mu sync.Mutex var errors []error go func(esData *es.TweetEsData) { defer wg.Done() err := b.setEs(&cacheData) if err != nil { mu.Lock() errors = append(errors, err) mu.Unlock() } }(&cacheData) go func(esData *es.TweetEsData) { defer wg.Done() err := b.SetUserPostNum(&cacheData) if err != nil { mu.Lock() errors = append(errors, err) mu.Unlock() } }(&cacheData) wg.Wait() if len(errors) > 0 { return errors[0] } return nil } func (b *Tweet2EsTransfer) setEs(data *es.TweetEsData) error { var tweetIndex es.TweetIndex err := tweetIndex.Create(data, b.svcCtx.EsClient, b.svcCtx.Config.ElasticSearchConf.TweetIndex) if err != nil { fmt.Println(err.Error()) } return err } // 给用户加入资讯数量 func (b *Tweet2EsTransfer) SetUserPostNum(data *es.TweetEsData) error { _, err := b.svcCtx.PaoPaoUser.SetUserPostNum(context.Background(), &paopaouser.SetUserPostNumReq{ PostId: data.Id, UserId: data.UserId, SetType: 1, }) if err != nil { fmt.Println(err.Error()) } return err }