| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546 |
- package userBillTransfer
- import (
- "context"
- "encoding/json"
- "github.com/zeromicro/go-zero/core/logx"
- "paopaoimrpc/apps/client/paopaouser"
- "paopaoimtask/internal/svc"
- "paopaoimtask/mq"
- )
- /**
- 从kafka中读取数据写入到es中和redis中
- */
- type UserBillTransfer struct {
- svcCtx *svc.ServiceContext
- logx.Logger
- }
- func NewUserBillTransfer(svc *svc.ServiceContext) *UserBillTransfer {
- return &UserBillTransfer{
- svcCtx: svc,
- Logger: logx.WithContext(context.Background()),
- }
- }
- func (b *UserBillTransfer) Consume(ctx context.Context, key, value string) error {
- var (
- data mq.UserBillContentData
- )
- if err := json.Unmarshal([]byte(value), &data); err != nil {
- return err
- }
- _, err := b.svcCtx.PaoPaoUser.AttachmentProfit(context.Background(), &paopaouser.AttachmentProfitReq{
- TweetId: data.PostID,
- UserId: data.UserID,
- ProfitUserId: data.ProfitUserID,
- Price: data.Price,
- })
- if err != nil {
- // 如果消费失败了回滚用户的
- return err
- }
- return nil
- }
|