package userBillTransfer import ( "context" "encoding/json" "github.com/zeromicro/go-zero/core/logx" "paopaoimrpc/apps/client/paopaouser" "paopaoimtask/internal/svc" "paopaoimtask/mq" ) /** 从kafka中读取数据写入到es中和redis中 */ type UserBillTransfer struct { svcCtx *svc.ServiceContext logx.Logger } func NewUserBillTransfer(svc *svc.ServiceContext) *UserBillTransfer { return &UserBillTransfer{ svcCtx: svc, Logger: logx.WithContext(context.Background()), } } func (b *UserBillTransfer) Consume(ctx context.Context, key, value string) error { var ( data mq.UserBillContentData ) if err := json.Unmarshal([]byte(value), &data); err != nil { return err } _, err := b.svcCtx.PaoPaoUser.AttachmentProfit(context.Background(), &paopaouser.AttachmentProfitReq{ TweetId: data.PostID, UserId: data.UserID, ProfitUserId: data.ProfitUserID, Price: data.Price, }) if err != nil { // 如果消费失败了回滚用户的 return err } return nil }