Bladeren bron

更新扩展包

v_grhhuang 11 maanden geleden
bovenliggende
commit
413174a816

+ 0 - 92
etc/task-mq.yaml

@@ -1,92 +0,0 @@
-# 模块名称
-Name: task-mq
-# 模块监听的ip和端口
-ListenOn: 0.0.0.0:10001
-# 用户消息消费配置,这里是采用go-zero自带的kafka配置,可以参考https://go-zero.dev/docs/tutorials/message-queue/kafka
-MsgChatTransfer:
-  # service的名称
-  Name: MsgChatTransfer
-  # kafka 的多个 Broker 节点
-  Brokers:
-    - 192.168.50.14:19094
-  # 消费者组
-  Group: kafka
-  # 订阅的 Topic 主题
-  Topic: msgChatTransfer
-  # 从头开始消费
-  Offset: first
-  # go-queue 内部是起多个 goroutine 从 kafka 中获取信息写入进程内的 channel,这个参数是控制此处的 goroutine 数量(⚠️ 并不是真正消费时的并发 goroutine 数量)
-  Consumers: 1
-# 如上的配置参数;此配置用于消息已读
-MsgReadTransfer:
-  Name: MsgReadTransfer
-  Brokers:
-    - 192.168.50.14:19094
-  Group: kafka
-  Topic: msgReadTransfer
-  Offset: first
-  Consumers: 1
-# 如上的配置参数;此配置用于资讯模块的异步写es和缓存
-TweetTransfer:
-  Name: TweetTransfer
-  Brokers:
-    - 192.168.50.14:19094
-  Group: kafka
-  Topic: tweetTransfer
-  Offset: first
-  Consumers: 1
-# 如上的配置参数;此配置用于用户购买附件之后的异步处理
-UserBillTransfer:
-  Name: UserBillTransfer
-  Brokers:
-    - 192.168.50.14:19094
-  Group: kafka
-  Topic: userBillTransfer
-  Offset: first
-  Consumers: 1
-# 消息已读处理的配置
-MsgReadHandler:
-  GroupMsgReadHandler: 1
-  GroupMsgReadRecordDelayTime: 60
-  GroupMsgReadRecordDelayCount: 2
-# redis的配置参数
-Redisx:
-  Host: 192.168.50.14:16379
-  Type: node
-  Pass: easy-chat
-# mongo的配置信息
-Mongo:
-  url: "mongodb://root:123456@192.168.50.14:47017"
-  Db: easy-chat
-# websocket的地址
-Ws:
-  Host: paopao-im-ws-svc:10003
-# 好友模块的地址
-SocialRpc:
-  Target: k8s://paopao-im/paopao-social-rpc-svc:10002
-# 用户模块的地址
-UserRpc:
-  Target: k8s://paopao-im/paopao-user-rpc-svc:10000
-# es的连接配置参数
-ElasticsearchConf:
-  # es ip地址
-  Host: 192.168.50.14
-  # es 的端口
-  Port: 19200
-  # es的用户名
-  Username: elastic
-  # es的账号密码
-  Password: huang123456
-  # es中需要写入的索引
-  TweetIndex: "es_index_tweet"
-
-Log:
-  ServiceName: task-mq
-  Mode: file
-  Encoding: json
-  Path: "/var/logs"
-  Level: "info"
-  Compress: true
-  KeepDays: 30
-  StackCooldownMillis: 100
-  Rotation: daily

+ 31 - 90
go.mod

@@ -1,108 +1,49 @@
-module git.banshen.xyz/huangguangrong/paopao-im-task
+module paopaoqueue
 
 go 1.23.2
 
-require (
-	github.com/elastic/go-elasticsearch/v8 v8.16.0
-	github.com/jinzhu/copier v0.4.0
-	github.com/zeromicro/go-queue v1.2.2
-	github.com/zeromicro/go-zero v1.7.3
-	go.mongodb.org/mongo-driver v1.17.1
-	go.uber.org/zap v1.24.0
-)
-
-require (
-	github.com/golang/snappy v0.0.4 // indirect
-	github.com/gorilla/websocket v1.5.3 // indirect
-	github.com/montanaflynn/stats v0.7.1 // indirect
-	github.com/xdg-go/pbkdf2 v1.0.0 // indirect
-	github.com/xdg-go/scram v1.1.2 // indirect
-	github.com/xdg-go/stringprep v1.0.4 // indirect
-	github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
-	golang.org/x/crypto v0.28.0 // indirect
-	golang.org/x/sync v0.9.0 // indirect
-)
+require github.com/zeromicro/go-queue v1.2.2
 
 require (
 	github.com/beorn7/perks v1.0.1 // indirect
-	github.com/cenkalti/backoff/v4 v4.3.0 // indirect
-	github.com/cespare/xxhash/v2 v2.3.0 // indirect
-	github.com/coreos/go-semver v0.3.1 // indirect
-	github.com/coreos/go-systemd/v22 v22.5.0 // indirect
-	github.com/davecgh/go-spew v1.1.1 // indirect
-	github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
-	github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect
-	github.com/emicklei/go-restful/v3 v3.11.0 // indirect
+	github.com/cenkalti/backoff/v4 v4.2.1 // indirect
+	github.com/cespare/xxhash/v2 v2.2.0 // indirect
 	github.com/fatih/color v1.17.0 // indirect
-	github.com/go-logr/logr v1.4.2 // indirect
+	github.com/go-logr/logr v1.3.0 // indirect
 	github.com/go-logr/stdr v1.2.2 // indirect
-	github.com/go-openapi/jsonpointer v0.19.6 // indirect
-	github.com/go-openapi/jsonreference v0.20.2 // indirect
-	github.com/go-openapi/swag v0.22.4 // indirect
-	github.com/gogo/protobuf v1.3.2 // indirect
-	github.com/golang/mock v1.6.0 // indirect
-	github.com/golang/protobuf v1.5.4 // indirect
-	github.com/google/gnostic-models v0.6.8 // indirect
-	github.com/google/go-cmp v0.6.0 // indirect
-	github.com/google/gofuzz v1.2.0 // indirect
-	github.com/google/uuid v1.6.0 // indirect
-	github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0 // indirect
-	github.com/josharian/intern v1.0.0 // indirect
-	github.com/json-iterator/go v1.1.12 // indirect
+	github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0 // indirect
 	github.com/klauspost/compress v1.17.9 // indirect
-	github.com/mailru/easyjson v0.7.7 // indirect
 	github.com/mattn/go-colorable v0.1.13 // indirect
 	github.com/mattn/go-isatty v0.0.20 // indirect
-	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
-	github.com/modern-go/reflect2 v1.0.2 // indirect
-	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
-	github.com/openzipkin/zipkin-go v0.4.3 // indirect
+	github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
+	github.com/openzipkin/zipkin-go v0.4.2 // indirect
 	github.com/pelletier/go-toml/v2 v2.2.2 // indirect
 	github.com/pierrec/lz4/v4 v4.1.21 // indirect
-	github.com/prometheus/client_golang v1.20.5 // indirect
-	github.com/prometheus/client_model v0.6.1 // indirect
-	github.com/prometheus/common v0.55.0 // indirect
-	github.com/prometheus/procfs v0.15.1 // indirect
-	github.com/redis/go-redis/v9 v9.7.0 // indirect
+	github.com/prometheus/client_golang v1.18.0 // indirect
+	github.com/prometheus/client_model v0.5.0 // indirect
+	github.com/prometheus/common v0.45.0 // indirect
+	github.com/prometheus/procfs v0.12.0 // indirect
 	github.com/segmentio/kafka-go v0.4.47 // indirect
 	github.com/spaolacci/murmur3 v1.1.0 // indirect
-	go.etcd.io/etcd/api/v3 v3.5.15 // indirect
-	go.etcd.io/etcd/client/pkg/v3 v3.5.15 // indirect
-	go.etcd.io/etcd/client/v3 v3.5.15 // indirect
-	go.opentelemetry.io/otel v1.32.0 // indirect
+	github.com/zeromicro/go-zero v1.6.6 // indirect
+	go.opentelemetry.io/otel v1.19.0 // indirect
 	go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect
-	go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0 // indirect
-	go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.32.0 // indirect
-	go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.32.0 // indirect
-	go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.32.0 // indirect
-	go.opentelemetry.io/otel/exporters/zipkin v1.32.0 // indirect
-	go.opentelemetry.io/otel/metric v1.32.0 // indirect
-	go.opentelemetry.io/otel/sdk v1.32.0 // indirect
-	go.opentelemetry.io/otel/trace v1.32.0 // indirect
-	go.opentelemetry.io/proto/otlp v1.3.1 // indirect
-	go.uber.org/atomic v1.10.0 // indirect
-	go.uber.org/automaxprocs v1.6.0 // indirect
-	go.uber.org/multierr v1.9.0 // indirect
-	golang.org/x/net v0.30.0 // indirect
-	golang.org/x/oauth2 v0.23.0 // indirect
-	golang.org/x/sys v0.27.0 // indirect
-	golang.org/x/term v0.25.0 // indirect
-	golang.org/x/text v0.20.0 // indirect
-	golang.org/x/time v0.7.0 // indirect
-	google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 // indirect
-	google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 // indirect
-	google.golang.org/grpc v1.68.0 // indirect
-	google.golang.org/protobuf v1.35.2 // indirect
-	gopkg.in/inf.v0 v0.9.1 // indirect
+	go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 // indirect
+	go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 // indirect
+	go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 // indirect
+	go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.19.0 // indirect
+	go.opentelemetry.io/otel/exporters/zipkin v1.19.0 // indirect
+	go.opentelemetry.io/otel/metric v1.19.0 // indirect
+	go.opentelemetry.io/otel/sdk v1.19.0 // indirect
+	go.opentelemetry.io/otel/trace v1.19.0 // indirect
+	go.opentelemetry.io/proto/otlp v1.0.0 // indirect
+	go.uber.org/automaxprocs v1.5.3 // indirect
+	golang.org/x/net v0.26.0 // indirect
+	golang.org/x/sys v0.21.0 // indirect
+	golang.org/x/text v0.16.0 // indirect
+	google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 // indirect
+	google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect
+	google.golang.org/grpc v1.64.0 // indirect
+	google.golang.org/protobuf v1.34.2 // indirect
 	gopkg.in/yaml.v2 v2.4.0 // indirect
-	gopkg.in/yaml.v3 v3.0.1 // indirect
-	k8s.io/api v0.29.3 // indirect
-	k8s.io/apimachinery v0.29.4 // indirect
-	k8s.io/client-go v0.29.3 // indirect
-	k8s.io/klog/v2 v2.110.1 // indirect
-	k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect
-	k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect
-	sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
-	sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
-	sigs.k8s.io/yaml v1.3.0 // indirect
 )

+ 0 - 44
internal/config/config.go

@@ -1,44 +0,0 @@
-package config
-
-import (
-	"github.com/zeromicro/go-queue/kq"
-	"github.com/zeromicro/go-zero/core/service"
-	"github.com/zeromicro/go-zero/core/stores/redis"
-	"github.com/zeromicro/go-zero/zrpc"
-)
-
-type Config struct {
-	service.ServiceConf
-	ListenOn         string
-	MsgChatTransfer  kq.KqConf
-	MsgReadTransfer  kq.KqConf
-	TweetTransfer    kq.KqConf
-	UserBillTransfer kq.KqConf
-	Redisx           redis.RedisConf
-
-	Mongo struct {
-		Url string
-		Db  string
-	}
-
-	Ws struct {
-		Host string
-	}
-
-	SocialRpc zrpc.RpcClientConf
-	UserRpc   zrpc.RpcClientConf
-
-	MsgReadHandler struct {
-		GroupMsgReadHandler          int32
-		GroupMsgReadRecordDelayTime  int64
-		GroupMsgReadRecordDelayCount int64
-	}
-
-	ElasticSearchConf struct {
-		Host       string
-		Port       int64
-		Username   string
-		Password   string
-		TweetIndex string
-	}
-}

+ 0 - 29
internal/handler/listen.go

@@ -1,29 +0,0 @@
-package handler
-
-import (
-	"github.com/zeromicro/go-queue/kq"
-	"github.com/zeromicro/go-zero/core/service"
-	"paopaoimtask/internal/handler/msgTransfer"
-	"paopaoimtask/internal/handler/tweetTransfer"
-	"paopaoimtask/internal/handler/userBillTransfer"
-	"paopaoimtask/internal/svc"
-)
-
-type Listen struct {
-	svc *svc.ServiceContext
-}
-
-func NewListen(svc *svc.ServiceContext) *Listen {
-	return &Listen{
-		svc,
-	}
-}
-
-func (l *Listen) Services() []service.Service {
-	return []service.Service{
-		kq.MustNewQueue(l.svc.Config.MsgChatTransfer, msgTransfer.NewMsgChatTransfer(l.svc)),
-		kq.MustNewQueue(l.svc.Config.MsgReadTransfer, msgTransfer.NewMsgReadTransfer(l.svc)),
-		kq.MustNewQueue(l.svc.Config.TweetTransfer, tweetTransfer.NewTweet2EsTransfer(l.svc)),
-		kq.MustNewQueue(l.svc.Config.UserBillTransfer, userBillTransfer.NewUserBillTransfer(l.svc)),
-	}
-}

+ 0 - 135
internal/handler/msgTransfer/groupMsgRead.go

@@ -1,135 +0,0 @@
-package msgTransfer
-
-import (
-	"github.com/zeromicro/go-zero/core/logx"
-	"paopaoimtask/pkg/constants"
-	"paopaoimws/ws"
-	"sync"
-	"time"
-)
-
-type groupMsgRead struct {
-	mu             sync.Mutex
-	conversationId string
-	push           *ws.Push      //推送方法
-	pushCh         chan *ws.Push // 通道推送
-
-	count int // 数量
-
-	pushTime time.Time // 上次推送时间
-
-	done chan struct{}
-}
-
-func newGroupMsgRead(push *ws.Push, pushCh chan *ws.Push) *groupMsgRead {
-	g := &groupMsgRead{
-		push:           push,
-		conversationId: push.ConversationId,
-		pushCh:         pushCh,
-		count:          1,
-		pushTime:       time.Time{},
-		done:           make(chan struct{}),
-	}
-	go g.transfer()
-	return g
-}
-
-func (g *groupMsgRead) mergePush(push *ws.Push) {
-	g.mu.Lock()
-	defer g.mu.Unlock()
-	if push == nil {
-		g.push = push
-	}
-	g.count++
-	for msgId, read := range push.ReadRecords {
-		g.push.ReadRecords[msgId] = read
-	}
-}
-
-// 是否是活跃的状态
-func (g *groupMsgRead) IsIdle() bool {
-	g.mu.Lock()
-	defer g.mu.Unlock()
-	return g.isIdle()
-}
-
-func (g *groupMsgRead) isIdle() bool {
-	pushTime := g.pushTime
-	val := GroupMsgReadRecordDelayTime*2 - time.Since(pushTime)
-	if val <= 0 && g.push == nil && g.count == 0 {
-		return true
-	}
-	return false
-}
-
-// 清理内容
-func (g *groupMsgRead) clear() {
-	select {
-	case <-g.done:
-	default:
-		close(g.done)
-	}
-	g.push = nil
-}
-
-func (g *groupMsgRead) transfer() {
-	// 超时发送
-	timer := time.NewTimer(GroupMsgReadRecordDelayTime / 2)
-	defer timer.Stop()
-	for {
-		select {
-		case <-g.done:
-			return
-		case <-timer.C:
-			g.mu.Lock()
-			pushTime := g.pushTime
-			val := GroupMsgReadRecordDelayTime - time.Since(pushTime)
-			push := g.push
-			if val > 0 && g.count < GroupMsgReadRecordDelayCount || push == nil {
-				// 时间和数量都没有达标
-				if val > 0 {
-					timer.Reset(val)
-				}
-				g.mu.Unlock()
-				continue
-			}
-			g.pushTime = time.Now()
-			g.push = nil
-			g.count = 0
-			timer.Reset(val)
-			g.mu.Unlock()
-			// 推送
-			logx.Infof("超过合并的条件推送 %v", push)
-			g.pushCh <- push
-		default:
-			g.mu.Lock()
-			if g.count >= GroupMsgReadRecordDelayCount {
-				push := g.push
-				// 达到了数量的话也要进行推送
-				g.pushTime = time.Now()
-				g.push = nil
-				g.count = 0
-				g.mu.Unlock()
-				// 推送
-				logx.Infof("超过合并的条件推送 %v", push)
-				g.pushCh <- push
-				continue
-			}
-			if g.isIdle() {
-				g.mu.Unlock()
-				g.pushCh <- &ws.Push{
-					ChatType:       constants.GroupChatType,
-					ConversationId: g.conversationId,
-				}
-				continue
-			}
-			g.mu.Unlock()
-			timeDelay := GroupMsgReadRecordDelayTime / 4
-			if timeDelay > time.Second {
-				timeDelay = time.Second
-			}
-			time.Sleep(timeDelay)
-		}
-	}
-	// 超量发送
-}

+ 0 - 79
internal/handler/msgTransfer/msgChatTransfer.go

@@ -1,79 +0,0 @@
-package msgTransfer
-
-import (
-	"context"
-	"encoding/json"
-	"go.mongodb.org/mongo-driver/bson/primitive"
-	"paopaoimtask/internal/svc"
-	"paopaoimtask/mq"
-	"paopaoimtask/pkg/bitmap"
-	"paopaoimws/immodels"
-	"paopaoimws/ws"
-	"strconv"
-)
-
-/**
-消息消费
-*/
-
-type MsgChatTransfer struct {
-	*baseMsgTransfer
-}
-
-func NewMsgChatTransfer(svc *svc.ServiceContext) *MsgChatTransfer {
-	return &MsgChatTransfer{
-		NewBaseMsgTransfer(svc),
-	}
-}
-
-// 消息消费的实现方法
-func (m *MsgChatTransfer) Consume(ctx context.Context, key, value string) error {
-	var (
-		data  mq.MsgChatTransfer
-		msgId = primitive.NewObjectID()
-	)
-	if err := json.Unmarshal([]byte(value), &data); err != nil {
-		return err
-	}
-	if err := m.addChatLog(ctx, msgId, &data); err != nil {
-		return err
-	}
-
-	return m.Transfer(ctx, &ws.Push{
-		MsgID:          msgId.Hex(),
-		ConversationId: data.ConversationId,
-		ChatType:       data.ChatType,
-		SendID:         data.SendID,
-		RecvID:         data.RecvID,
-		RecvIDs:        data.RecvIDs,
-		SendTime:       data.SendTime,
-		MType:          data.MType,
-		Content:        data.Content,
-	})
-}
-
-// 记录消息到mongodb
-func (m *MsgChatTransfer) addChatLog(ctx context.Context, msgId primitive.ObjectID, data *mq.MsgChatTransfer) error {
-	//记录消息
-	chatlog := immodels.ChatLog{
-		ID:             msgId,
-		ConversationId: data.ConversationId,
-		SendId:         strconv.FormatInt(data.SendID, 10),
-		RecvId:         strconv.FormatInt(data.RecvID, 10),
-		MsgFrom:        0,
-		MsgType:        data.MType,
-		MsgContent:     data.Content,
-		SendTime:       data.SendTime,
-		ChatType:       data.ChatType,
-	}
-	//设置自己为已读状态
-	readRecords := bitmap.NewBitMap(0)
-	readRecords.Set(data.SendID)
-	chatlog.ReadRecords = readRecords.Export()
-
-	err := m.svcCtx.ChatLogModel.Insert(ctx, &chatlog)
-	if err != nil {
-		return err
-	}
-	return m.svcCtx.ConversationModel.UpdateMsg(ctx, &chatlog)
-}

+ 0 - 144
internal/handler/msgTransfer/msgReadTransfer.go

@@ -1,144 +0,0 @@
-package msgTransfer
-
-import (
-	"context"
-	"encoding/base64"
-	"encoding/json"
-	"github.com/zeromicro/go-zero/core/stores/cache"
-	"paopaoimtask/internal/svc"
-	"paopaoimtask/mq"
-	"paopaoimtask/pkg/bitmap"
-	"paopaoimtask/pkg/constants"
-	"paopaoimws/ws"
-	"sync"
-	"time"
-)
-
-var (
-	GroupMsgReadRecordDelayTime  = time.Second // 超时时间
-	GroupMsgReadRecordDelayCount = 10          //最大消息处理数量
-)
-
-const (
-	GroupMsgReadHandlerAtTransfer = iota //是否开启群消息批量处理
-	GroupMsgReadHandlerDelayTransfer
-)
-
-type MsgReadTransfer struct {
-	*baseMsgTransfer
-	cache.Cache
-	mu        sync.Mutex
-	groupMsgs map[string]*groupMsgRead
-	push      chan *ws.Push
-}
-
-func NewMsgReadTransfer(svc *svc.ServiceContext) *MsgReadTransfer {
-	m := &MsgReadTransfer{
-		baseMsgTransfer: NewBaseMsgTransfer(svc),
-		groupMsgs:       make(map[string]*groupMsgRead, 1),
-		push:            make(chan *ws.Push, 1),
-	}
-	if svc.Config.MsgReadHandler.GroupMsgReadHandler != GroupMsgReadHandlerAtTransfer {
-		// 开启了群消息批量处理
-		if svc.Config.MsgReadHandler.GroupMsgReadRecordDelayCount > 0 {
-			GroupMsgReadRecordDelayCount = int(svc.Config.MsgReadHandler.GroupMsgReadRecordDelayCount)
-		}
-		if svc.Config.MsgReadHandler.GroupMsgReadRecordDelayTime > 0 {
-			GroupMsgReadRecordDelayTime = time.Duration(svc.Config.MsgReadHandler.GroupMsgReadRecordDelayTime) * time.Second
-		}
-	}
-	go m.transfer()
-	return m
-}
-
-func (m *MsgReadTransfer) Consume(ctx context.Context, key, value string) error {
-	var (
-		data mq.MsgMarkRead
-	)
-	if err := json.Unmarshal([]byte(value), &data); err != nil {
-		return err
-	}
-
-	// 更新已读未读处理
-	read, err := m.UpdateChatLogRead(ctx, &data)
-	if err != nil {
-		return err
-	}
-	push := &ws.Push{
-		ConversationId: data.ConversationId,
-		ChatType:       data.ChatType,
-		SendID:         data.SendID,
-		RecvID:         data.RecvID,
-		ContentType:    constants.ContentMakeRead,
-		ReadRecords:    read,
-	}
-	switch data.ChatType {
-	case constants.SingleChatType:
-		// 私聊直接推送
-		m.push <- push
-	case constants.GroupChatType:
-		// 判断是否开启合并消息
-		if m.svcCtx.Config.MsgReadHandler.GroupMsgReadHandler == GroupMsgReadHandlerAtTransfer {
-			m.push <- push
-		}
-		m.mu.Lock()
-		defer m.mu.Unlock()
-		push.SendID = 0
-		if _, ok := m.groupMsgs[push.ConversationId]; ok {
-			m.Infof("merge push %v", push.ConversationId)
-			m.groupMsgs[push.ConversationId].mergePush(push)
-		} else {
-			m.groupMsgs[push.ConversationId] = newGroupMsgRead(push, m.push)
-		}
-	}
-
-	return nil
-}
-
-func (m *MsgReadTransfer) UpdateChatLogRead(ctx context.Context, data *mq.MsgMarkRead) (map[string]string, error) {
-	res := make(map[string]string)
-	chatLogs, err := m.svcCtx.ChatLogModel.ListByMsgIds(ctx, data.MsgIds)
-	if err != nil {
-		return nil, err
-	}
-	for _, chat := range chatLogs {
-		switch chat.ChatType {
-		case constants.SingleChatType:
-			chat.ReadRecords = []byte{1}
-		case constants.GroupChatType:
-			readRecords := bitmap.Load(chat.ReadRecords)
-			readRecords.Set(data.SendID)
-			chat.ReadRecords = readRecords.Export()
-		}
-
-		res[chat.ID.Hex()] = base64.StdEncoding.EncodeToString(chat.ReadRecords)
-		err := m.svcCtx.ChatLogModel.UpdateMakeRead(ctx, chat.ID, chat.ReadRecords)
-		if err != nil {
-			return nil, err
-		}
-
-	}
-	return res, nil
-}
-
-func (m *MsgReadTransfer) transfer() {
-	for push := range m.push {
-		if push.RecvID > 0 || len(push.RecvIDs) > 0 {
-			if err := m.Transfer(context.Background(), push); err != nil {
-				m.Errorf("m transfer err %v push %v", err, push)
-			}
-		}
-		if push.ChatType == constants.SingleChatType {
-			continue
-		}
-		if m.svcCtx.Config.MsgReadHandler.GroupMsgReadHandler == GroupMsgReadHandlerAtTransfer {
-			continue
-		}
-		// 接收者id为空
-		m.mu.Lock()
-		if _, ok := m.groupMsgs[push.ConversationId]; ok && m.groupMsgs[push.ConversationId].IsIdle() {
-			m.groupMsgs[push.ConversationId].clear()
-			delete(m.groupMsgs, push.ConversationId)
-		}
-	}
-}

+ 0 - 72
internal/handler/msgTransfer/msgTransfer.go

@@ -1,72 +0,0 @@
-package msgTransfer
-
-import (
-	"context"
-	"fmt"
-	"github.com/zeromicro/go-zero/core/logx"
-	"paopaoimrpc/apps/client/paopaosocial"
-	"paopaoimtask/internal/svc"
-	"paopaoimtask/pkg/constants"
-	"paopaoimws/websocket"
-	"paopaoimws/ws"
-)
-
-type baseMsgTransfer struct {
-	svcCtx *svc.ServiceContext
-	logx.Logger
-}
-
-func NewBaseMsgTransfer(svc *svc.ServiceContext) *baseMsgTransfer {
-	return &baseMsgTransfer{
-		svcCtx: svc,
-		Logger: logx.WithContext(context.Background()),
-	}
-}
-
-func (b *baseMsgTransfer) Transfer(ctx context.Context, data *ws.Push) error {
-	var err error
-	switch data.ChatType {
-	case constants.SingleChatType:
-		err = b.single(ctx, data)
-	case constants.GroupChatType:
-		err = b.group(ctx, data)
-	}
-	return err
-}
-
-func (b *baseMsgTransfer) single(ctx context.Context, data *ws.Push) error {
-	return b.svcCtx.WsClient.Send(websocket.Message{
-		FrameType: websocket.FrameData,
-		Method:    "push",
-		FormID:    constants.SYSTEM_ROOT_UID,
-		Data:      data,
-		Id:        data.MsgID,
-	})
-}
-
-func (b *baseMsgTransfer) group(ctx context.Context, data *ws.Push) error {
-	// 查询群用户
-	fmt.Println("接收者id: ", data.RecvID)
-	fmt.Println("发送者id: ", data.SendID)
-	members, err := b.svcCtx.PaoPaoSocial.GetGroupMembers(ctx, &paopaosocial.GetGroupMembersReq{
-		GroupId: data.RecvID,
-		UserId:  data.SendID,
-	})
-	if err != nil {
-		return err
-	}
-	data.RecvIDs = make([]int64, 0, members.Count)
-	for _, item := range members.List {
-		if item.UserId == data.SendID {
-			continue
-		}
-		data.RecvIDs = append(data.RecvIDs, item.UserId)
-	}
-	return b.svcCtx.WsClient.Send(websocket.Message{
-		FrameType: websocket.FrameData,
-		Method:    "push",
-		FormID:    constants.SYSTEM_ROOT_UID,
-		Data:      data,
-		Id:        data.MsgID,
-	})
-}

+ 0 - 100
internal/handler/tweetTransfer/tweet2EsTransfer.go

@@ -1,100 +0,0 @@
-package tweetTransfer
-
-import (
-	"context"
-	"encoding/json"
-	"fmt"
-	"github.com/jinzhu/copier"
-	"github.com/zeromicro/go-zero/core/logx"
-	"paopaoimrpc/apps/client/paopaouser"
-	"paopaoimtask/internal/svc"
-	"paopaoimtask/mq"
-	"paopaoimtask/pkg/es"
-	"strings"
-	"sync"
-)
-
-/**
-从kafka中读取数据写入到es中和redis中
-*/
-
-type Tweet2EsTransfer struct {
-	svcCtx *svc.ServiceContext
-	logx.Logger
-}
-
-func NewTweet2EsTransfer(svc *svc.ServiceContext) *Tweet2EsTransfer {
-	return &Tweet2EsTransfer{
-		svcCtx: svc,
-		Logger: logx.WithContext(context.Background()),
-	}
-}
-
-func (b *Tweet2EsTransfer) Consume(ctx context.Context, key, value string) error {
-	var (
-		data      mq.MsgTweetData
-		cacheData es.TweetEsData
-	)
-	if err := json.Unmarshal([]byte(value), &data); err != nil {
-		return err
-	}
-
-	copier.Copy(&cacheData, data)
-	tags := make(map[string]int32)
-	if data.Tags != "" {
-		for _, s := range strings.Split(data.Tags, ",") {
-			tags[s] = 1
-		}
-	}
-	cacheData.Tags = tags
-	// 解析之后写入es 和 redis
-	wg := sync.WaitGroup{}
-	wg.Add(2)
-	var mu sync.Mutex
-	var errors []error
-	go func(esData *es.TweetEsData) {
-		defer wg.Done()
-		err := b.setEs(&cacheData)
-		if err != nil {
-			mu.Lock()
-			errors = append(errors, err)
-			mu.Unlock()
-		}
-	}(&cacheData)
-	go func(esData *es.TweetEsData) {
-		defer wg.Done()
-		err := b.SetUserPostNum(&cacheData)
-		if err != nil {
-			mu.Lock()
-			errors = append(errors, err)
-			mu.Unlock()
-		}
-	}(&cacheData)
-	wg.Wait()
-	if len(errors) > 0 {
-		return errors[0]
-	}
-	return nil
-}
-
-func (b *Tweet2EsTransfer) setEs(data *es.TweetEsData) error {
-	var tweetIndex es.TweetIndex
-	err := tweetIndex.Create(data, b.svcCtx.EsClient, b.svcCtx.Config.ElasticSearchConf.TweetIndex)
-	if err != nil {
-		fmt.Println(err.Error())
-	}
-	return err
-}
-
-// 给用户加入资讯数量
-func (b *Tweet2EsTransfer) SetUserPostNum(data *es.TweetEsData) error {
-	_, err := b.svcCtx.PaoPaoUser.SetUserPostNum(context.Background(), &paopaouser.SetUserPostNumReq{
-		PostId:  data.Id,
-		UserId:  data.UserId,
-		SetType: 1,
-	})
-	if err != nil {
-		fmt.Println(err.Error())
-	}
-	return err
-}

+ 0 - 46
internal/handler/userBillTransfer/userbillTransfer.go

@@ -1,46 +0,0 @@
-package userBillTransfer
-
-import (
-	"context"
-	"encoding/json"
-	"github.com/zeromicro/go-zero/core/logx"
-	"paopaoimrpc/apps/client/paopaouser"
-	"paopaoimtask/internal/svc"
-	"paopaoimtask/mq"
-)
-
-/**
-从kafka中读取数据写入到es中和redis中
-*/
-
-type UserBillTransfer struct {
-	svcCtx *svc.ServiceContext
-	logx.Logger
-}
-
-func NewUserBillTransfer(svc *svc.ServiceContext) *UserBillTransfer {
-	return &UserBillTransfer{
-		svcCtx: svc,
-		Logger: logx.WithContext(context.Background()),
-	}
-}
-
-func (b *UserBillTransfer) Consume(ctx context.Context, key, value string) error {
-	var (
-		data mq.UserBillContentData
-	)
-	if err := json.Unmarshal([]byte(value), &data); err != nil {
-		return err
-	}
-	_, err := b.svcCtx.PaoPaoUser.AttachmentProfit(context.Background(), &paopaouser.AttachmentProfitReq{
-		TweetId:      data.PostID,
-		UserId:       data.UserID,
-		ProfitUserId: data.ProfitUserID,
-		Price:        data.Price,
-	})
-	if err != nil {
-		// 如果消费失败了回滚用户的
-		return err
-	}
-	return nil
-}

+ 0 - 57
internal/svc/servicecontext.go

@@ -1,57 +0,0 @@
-package svc
-
-import (
-	"github.com/elastic/go-elasticsearch/v8"
-	"github.com/zeromicro/go-zero/core/stores/redis"
-	"github.com/zeromicro/go-zero/zrpc"
-	"net/http"
-	socialRpc "paopaoimrpc/apps/client/paopaosocial"
-	userRpc "paopaoimrpc/apps/client/paopaouser"
-	"paopaoimrpc/apps/pkg/constants"
-	"paopaoimtask/internal/config"
-	"paopaoimtask/pkg/es"
-	"paopaoimws/immodels"
-	"paopaoimws/websocket"
-)
-
-type ServiceContext struct {
-	Config   config.Config
-	WsClient websocket.Client
-	*redis.Redis
-	immodels.ChatLogModel
-	immodels.ConversationModel
-	EsClient *elasticsearch.Client
-	userRpc.PaoPaoUser
-	socialRpc.PaoPaoSocial
-}
-
-func NewServiceContext(c config.Config) *ServiceContext {
-	esClient, err := es.GetEsClient(c.ElasticSearchConf.Host, c.ElasticSearchConf.Port, c.ElasticSearchConf.Username, c.ElasticSearchConf.Password)
-	if err != nil {
-		panic(err)
-	}
-	userrpc, _ := zrpc.NewClient(c.UserRpc)
-	socialrpc, _ := zrpc.NewClient(c.SocialRpc)
-	svc := &ServiceContext{
-		Config:            c,
-		Redis:             redis.MustNewRedis(c.Redisx),
-		ChatLogModel:      immodels.MustChatLogModel(c.Mongo.Url, c.Mongo.Db),
-		ConversationModel: immodels.MustConversationModel(c.Mongo.Url, c.Mongo.Db),
-		PaoPaoUser:        userRpc.NewPaoPaoUser(userrpc),
-		PaoPaoSocial:      socialRpc.NewPaoPaoSocial(socialrpc),
-		EsClient:          esClient,
-	}
-	token, err := svc.GetSystemToken()
-	if err != nil {
-		panic(err)
-	}
-	header := http.Header{}
-	header.Set("Authorization", token)
-	svc.WsClient = websocket.NewClient(c.Ws.Host, websocket.WithClientHeader(header))
-	return svc
-}
-
-func (s *ServiceContext) GetSystemToken() (string, error) {
-	get, err := s.Redis.Get(constants.REDIS_SYSTEM_ROOT_TOKEN)
-	return get, err
-}

+ 0 - 65
mq/mq.go

@@ -1,65 +0,0 @@
-package mq
-
-import (
-	"paopaoimtask/pkg/constants"
-)
-
-type MsgChatTransfer struct {
-	MsgId              string `mapstructure:"msgId"`
-	ConversationId     string `json:"conversationId"`
-	constants.ChatType `json:"chatType"`
-	SendID             int64   `json:"sendId"`  //发送者
-	RecvID             int64   `json:"recvId"`  //接收者
-	RecvIDs            []int64 `json:"recvIds"` //多个接收者
-	SendTime           int64   `json:"sendTime"`
-
-	constants.MType `json:"mType"`
-	Content         string `json:"content"`
-}
-
-type MsgMarkRead struct {
-	ConversationId     string `json:"conversationId"`
-	constants.ChatType `json:"chatType"`
-	SendID             int64    `json:"sendId"` //发送者
-	RecvID             int64    `json:"recvId"` //接收者
-	MsgIds             []string `json:"msgIds"`
-}
-
-type MsgTweetData struct {
-	ID              int64                  `json:"id"`
-	UserID          int64                  `json:"user_id"`
-	CommentCount    int64                  `json:"comment_count"`
-	CollectionCount int64                  `json:"collection_count"`
-	ShareCount      int64                  `json:"share_count"`
-	UpvoteCount     int64                  `json:"upvote_count"`
-	Visibility      constants.PostVisibleT `json:"visibility"`
-	IsTop           int                    `json:"is_top"`
-	IsEssence       int                    `json:"is_essence"`
-	IsLock          int                    `json:"is_lock"`
-	LatestRepliedOn int64                  `json:"latest_replied_on"`
-	Tags            string                 `json:"tags"`
-	AttachmentPrice int64                  `json:"attachment_price"`
-	IP              string                 `json:"ip"`
-	IPLoc           string                 `json:"ip_loc"`
-	Contents        []MsgTweetContentData  `json:"contents"`
-	CreatedOn       int64                  `json:"created_on"`
-	ModifiedOn      int64                  `json:"modified_on"`
-	DeletedOn       int64                  `json:"deleted_on"`
-	IsDel           int64                  `json:"is_del"`
-}
-
-type MsgTweetContentData struct {
-	ID      int64                  `json:"id"`
-	PostID  int64                  `json:"post_id"`
-	UserID  int64                  `json:"user_id"`
-	Content string                 `json:"content"`
-	Type    constants.PostContentT `json:"type"`
-	Sort    int64                  `json:"sort"`
-}
-
-type UserBillContentData struct {
-	PostID       int64 `json:"post_id"`        // 咨询id
-	UserID       int64 `json:"user_id"`        // 用户id
-	ProfitUserID int64 `json:"profit_user_id"` // 受益者id
-	Price        int64 `json:"price"`          // 购买价格
-}

+ 6 - 6
mqClient/msgtransfer.go → msgtransfer.go

@@ -1,14 +1,14 @@
-package mqClient
+package paopaoqueue
 
 import (
 	"context"
 	"encoding/json"
 	"github.com/zeromicro/go-queue/kq"
-	"paopaoimtask/mq"
+	"paopaoqueue/types"
 )
 
 type MsgChatTransferClient interface {
-	Push(msg *mq.MsgChatTransfer) error
+	Push(msg *types.MsgChatTransfer) error
 }
 
 type msgChatTransferClient struct {
@@ -19,7 +19,7 @@ func NewMsgChatTransferClient(addr []string, topic string, opts ...kq.Pusher) Ms
 	return &msgChatTransferClient{pusher: kq.NewPusher(addr, topic)}
 }
 
-func (m *msgChatTransferClient) Push(msg *mq.MsgChatTransfer) error {
+func (m *msgChatTransferClient) Push(msg *types.MsgChatTransfer) error {
 	data, err := json.Marshal(msg)
 	if err != nil {
 		return err
@@ -28,7 +28,7 @@ func (m *msgChatTransferClient) Push(msg *mq.MsgChatTransfer) error {
 }
 
 type MsgReadTransferClient interface {
-	Push(msg *mq.MsgMarkRead) error
+	Push(msg *types.MsgMarkRead) error
 }
 
 type msgReadTransferClient struct {
@@ -39,7 +39,7 @@ func NewMsgReadTransferClient(addr []string, topic string, opts ...kq.Pusher) Ms
 	return &msgReadTransferClient{pusher: kq.NewPusher(addr, topic)}
 }
 
-func (m *msgReadTransferClient) Push(msg *mq.MsgMarkRead) error {
+func (m *msgReadTransferClient) Push(msg *types.MsgMarkRead) error {
 	data, err := json.Marshal(msg)
 	if err != nil {
 		return err

+ 0 - 66
pkg/bitmap/bitmap.go

@@ -1,66 +0,0 @@
-package bitmap
-
-type BitMap struct {
-	bits []byte
-	size int
-}
-
-func NewBitMap(size int) *BitMap {
-	if size == 0 {
-		size = 250
-	}
-
-	return &BitMap{
-		bits: make([]byte, size), // 字节的大小,每个字节中存8个bit
-		size: size * 8,           // 整个bitmap的大小
-	}
-}
-
-func (b *BitMap) Set(id int64) {
-	// 计算id在哪个bit
-	idx := hash(id) % b.size
-	// 计算在哪个byte
-	byteIdx := idx / 8
-	// 计算byte中的bit位置
-	bitIdx := idx % 8
-
-	b.bits[byteIdx] |= 1 << bitIdx
-}
-
-func (b *BitMap) IsSet(id int64) bool {
-	// 计算id在哪个bit
-	idx := hash(id) % b.size
-	// 计算在哪个byte
-	byteIdx := idx / 8
-	// 计算byte中的bit位置
-	bitIdx := idx % 8
-
-	return (b.bits[byteIdx] & (1 << bitIdx)) != 0
-}
-
-func (b *BitMap) Export() []byte {
-	return b.bits
-}
-
-func Load(bits []byte) *BitMap {
-	if len(bits) == 0 {
-		return NewBitMap(0)
-	}
-	return &BitMap{
-		bits: bits,
-		size: len(bits) * 8,
-	}
-}
-
-func hash(id int64) int {
-	// 使用BKDR哈希算法
-	seed := 131313 // 31 131 1313 13131 131313, etc
-	hash := 0
-
-	// 将整数id拆分为每个字节,并使用BKDR哈希算法计算哈希值
-	for i := 0; i < 8; i++ {
-		hash = hash*seed + int((id>>(i*8))&0xFF)
-	}
-
-	return hash & 0x7FFFFFFF
-}

+ 0 - 20
pkg/constants/es.go

@@ -1,20 +0,0 @@
-package constants
-
-type SearchType int32
-
-// 1、按照用户id查询2、按照用户推文权限搜索3、搜索全文
-const (
-	SearchByUserID SearchType = iota + 1
-	SearchByVisit
-	SearchByContent
-)
-
-type EsSortType int32
-
-// 1-按照用户热度排序2-按照用户创建时间排序3-按照用户最后回复时间排序4-用户置顶排序
-const (
-	SortByUserScore EsSortType = iota + 1
-	SortByUserCreateTime
-	SortByUserLastReply
-	SortByUserTop
-)

+ 0 - 31
pkg/es/esclient.go

@@ -1,31 +0,0 @@
-package es
-
-import (
-	"crypto/tls"
-	"fmt"
-	"net"
-	"net/http"
-	"time"
-
-	"github.com/elastic/go-elasticsearch/v8"
-)
-
-func GetEsClient(esHost string, esPort int64, esUsername, esPassword string) (*elasticsearch.Client, error) {
-	cfg := elasticsearch.Config{
-		Addresses: []string{
-			fmt.Sprintf("http://%s:%d", esHost, esPort),
-		},
-		Username: esUsername,
-		Password: esPassword,
-		Transport: &http.Transport{
-			MaxIdleConnsPerHost:   10,
-			ResponseHeaderTimeout: time.Second,
-			DialContext:           (&net.Dialer{Timeout: time.Second}).DialContext,
-			TLSClientConfig: &tls.Config{
-				MinVersion: tls.VersionTLS12,
-			},
-		},
-	}
-	return elasticsearch.NewClient(cfg)
-
-}

+ 0 - 358
pkg/es/tweet_es.go

@@ -1,358 +0,0 @@
-package es
-
-import (
-	"bytes"
-	"context"
-	"encoding/json"
-	"fmt"
-	"github.com/elastic/go-elasticsearch/v8"
-	"net/http"
-	"paopaoimtask/pkg/constants"
-	"strconv"
-	"strings"
-)
-
-type TweetIndex struct {
-}
-
-const __tweetMapping = `{
-	"mappings": {
-	  "properties": {
-		"id": {
-		  "type": "integer"
-		},
-		"user_id": {
-		  "type": "integer"
-		},
-		"contents": {
-		  "type": "nested",
-		  "properties": {
-			"id": {
-			  "type": "integer"
-			},
-			"post_id": {
-			  "type": "integer"
-			},
-			"content": {
-			  "type": "text",
-			  "fields": {
-				"keyword": {
-				  "type": "keyword",
-				  "ignore_above": 256
-				}
-			  }
-			},
-			"type": {
-			  "type": "integer"
-			},
-			"sort": {
-			  "type": "integer"
-			}
-		  }
-		},
-		"comment_count": {
-		  "type": "integer"
-		},
-		"collection_count": {
-		  "type": "integer"
-		},
-		"share_count": {
-		  "type": "integer"
-		},
-		"upvote_count": {
-		  "type": "integer"
-		},
-		"visibility": {
-		  "type": "integer"
-		},
-		"is_top": {
-		  "type": "integer"
-		},
-		"is_essence": {
-		  "type": "integer"
-		},
-		"is_lock": {
-		  "type": "integer"
-		},
-		"latest_replied_on": {
-		  "type": "integer"
-		},
-		"created_on": {
-		  "type": "integer"
-		},
-		"modified_on": {
-		  "type": "integer"
-		},
-		"tags": {
-		  "type": "object",
-		  "dynamic": true,
-		  "properties": {
-			"*": {
-			  "type": "integer"
-			}
-		  }
-		},
-		"attachment_price": {
-		  "type": "integer"
-		},
-		"ip_loc": {
-		  "type": "text"
-		}
-	  }
-	}
-}`
-
-type TweetEsData struct {
-	Id       int64 `json:"id"`
-	UserId   int64 `json:"user_id"`
-	Contents []struct {
-		ID      int64  `json:"id"`
-		PostID  int64  `json:"post_id"`
-		Content string `json:"content"`
-		Type    int32  `json:"type"`
-		Sort    int32  `json:"sort"`
-	} `json:"contents"`
-	CommentCount    int64            `json:"comment_count"`
-	CollectionCount int64            `json:"collection_count"`
-	ShareCount      int64            `json:"share_count"`
-	UpvoteCount     int64            `json:"upvote_count"`
-	Visibility      int32            `json:"visibility"`
-	IsTop           int32            `json:"is_top"`
-	IsEssence       int32            `json:"is_essence"`
-	IsLock          int32            `json:"is_lock"`
-	LatestRepliedOn int64            `json:"latest_replied_on"`
-	CreatedOn       int64            `json:"created_on"`
-	ModifiedOn      int64            `json:"modified_on"`
-	Tags            map[string]int32 `json:"tags"`
-	AttachmentPrice int64            `json:"attachment_price"`
-	IpLoc           string           `json:"ip_loc"`
-}
-
-//func (t *TweetEsData) MarshalJSON() ([]byte, error) {
-//	return json.Marshal(t)
-//}
-
-func (t *TweetIndex) Create(tsData *TweetEsData, esClient *elasticsearch.Client, esTweetIndex string) error {
-	// 判断是否有索引
-	indexBody, err := esClient.Indices.Exists([]string{esTweetIndex})
-	if err != nil || indexBody.StatusCode == http.StatusNotFound {
-		fmt.Println("aaaaaaaaaa:内庭院")
-		cBody, cErr := esClient.Indices.Create(esTweetIndex, esClient.Indices.Create.WithBody(strings.NewReader(__tweetMapping)))
-		if cErr != nil {
-			return cErr
-		}
-		defer cBody.Body.Close()
-	}
-	defer indexBody.Body.Close()
-	docId := tsData.Id
-	doc, _ := json.Marshal(tsData)
-	reader := bytes.NewReader(doc)
-	indexRes, iErr := esClient.Index(esTweetIndex, reader, esClient.Index.WithDocumentID(strconv.FormatInt(docId, 10)))
-	if iErr != nil {
-		return iErr
-	}
-	defer indexRes.Body.Close()
-	return nil
-}
-
-// Get tweet es 查询,1、按照用户id查询2、按照用户推文权限搜索3、搜索全文, 排序:1-按照用户热度排序2-获取最新发布的文章3-按照用户最后回复时间排序4-用户置顶排序
-func (t *TweetIndex) Get(esClient *elasticsearch.Client, esTweetIndex string, searchType []constants.SearchType, sortType []constants.EsSortType, userId int64, userVisit int32, tweetContent string, limit int64, offset int64) (tweetInfos []*TweetEsData, total int64, err error) {
-	var filters = make([]map[string]interface{}, 0)
-	tweetInfos = make([]*TweetEsData, 0)
-	total = 0
-	for _, st := range searchType {
-		switch st {
-		case constants.SearchByUserID:
-			filters = append(filters, map[string]interface{}{
-				"term": map[string]interface{}{
-					"user_id": userId,
-				},
-			})
-			break
-		case constants.SearchByVisit:
-			filters = append(filters, map[string]interface{}{
-				"term": map[string]interface{}{
-					"visibility": userVisit,
-				},
-			})
-			break
-		case constants.SearchByContent:
-			filters = append(filters, map[string]interface{}{
-				"nested": map[string]interface{}{
-					"path": "contents",
-					"query": map[string]interface{}{
-						"query_string": map[string]interface{}{
-							"query":  "*" + tweetContent + "*",
-							"fields": []string{"contents.content", "contents.content.keyword"},
-						},
-					},
-				},
-			})
-			break
-		default:
-			break
-		}
-	}
-
-	var sorts = make([]map[string]interface{}, 0)
-	for _, esSortType := range sortType {
-		switch esSortType {
-		case constants.SortByUserScore:
-			fallthrough
-		case constants.SortByUserCreateTime:
-			sorts = append(sorts, map[string]interface{}{
-				"created_on": map[string]interface{}{
-					"order": "desc",
-				},
-			})
-			break
-		case constants.SortByUserLastReply:
-			sorts = append(sorts, map[string]interface{}{
-				"latest_replied_on": map[string]interface{}{
-					"order": "desc",
-				},
-			})
-			break
-		case constants.SortByUserTop:
-			sorts = append(sorts, map[string]interface{}{
-				"is_top": map[string]interface{}{
-					"order": "desc",
-				},
-			})
-			break
-		default:
-			break
-		}
-	}
-	var query map[string]interface{}
-	var from, size int64 = offset, limit
-	if limit <= 0 {
-		size = 10
-	}
-	if offset <= 0 {
-		from = 0
-	}
-	if len(filters) == 0 {
-		query = map[string]interface{}{
-			"query": map[string]interface{}{
-				"match_all": map[string]interface{}{},
-			},
-			"sort": sorts,
-			"from": 0,
-			"size": 10,
-		}
-	} else {
-		query = map[string]interface{}{
-			"query": map[string]interface{}{
-				"bool": map[string]interface{}{
-					"filter": filters,
-				},
-			},
-			"sort": sorts,
-			"from": from,
-			"size": size,
-		}
-	}
-	fmt.Println(query)
-	marshal, _ := json.Marshal(query)
-	result, err := esClient.Search(
-		esClient.Search.WithContext(context.Background()),
-		esClient.Search.WithIndex(esTweetIndex),
-		esClient.Search.WithBody(bytes.NewReader(marshal)),
-	)
-	if err != nil {
-		fmt.Println("errr: ", err.Error())
-		return
-	}
-	defer result.Body.Close()
-	if result.IsError() {
-		fmt.Println("搜索报错了: ", result.String())
-		return
-	}
-	var rows map[string]interface{}
-	if err = json.NewDecoder(result.Body).Decode(&rows); err != nil {
-		return
-	}
-	if count, ok := rows["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"]; ok {
-		total = int64(count.(float64))
-	}
-	hits := rows["hits"].(map[string]interface{})["hits"].([]interface{})
-	for _, item := range hits {
-		if row, ok := item.(map[string]interface{})["_source"]; ok {
-			sourceByte, _ := json.Marshal(row)
-			var doc TweetEsData
-			err = json.Unmarshal(sourceByte, &doc)
-			if err != nil {
-				return
-			}
-			tweetInfos = append(tweetInfos, &doc)
-		}
-	}
-	return
-}
-
-func (t *TweetIndex) Delete(id int64, esClient *elasticsearch.Client, esTweetIndex string) error {
-	exists, err := esClient.Indices.Exists([]string{esTweetIndex})
-	if err != nil || exists.StatusCode == http.StatusNotFound {
-		// 没有找到对应的索引,返回错误
-		return err
-	}
-	defer exists.Body.Close()
-	response, err := esClient.Delete(esTweetIndex, strconv.FormatInt(id, 10))
-	if err != nil || response.StatusCode == http.StatusNotFound {
-		return err
-	}
-	defer response.Body.Close()
-	return nil
-}
-
-func (t *TweetIndex) Update(id int64, esClient *elasticsearch.Client, esTweetIndex string, values map[string]interface{}) error {
-	exists, err := esClient.Indices.Exists([]string{esTweetIndex})
-	if err != nil || exists.StatusCode == http.StatusNotFound {
-		// 没有找到对应的索引,返回错误
-		return err
-	}
-	defer exists.Body.Close()
-	marshal, _ := json.Marshal(map[string]interface{}{"doc": values})
-	update, err := esClient.Update(esTweetIndex, strconv.FormatInt(id, 10), bytes.NewReader(marshal))
-	if err != nil || update.StatusCode == http.StatusNotFound {
-		return err
-	}
-	defer update.Body.Close()
-	return nil
-}
-
-func (t *TweetIndex) GetRow(id int64, esClient *elasticsearch.Client, esTweetIndex string) (tweetInfos *TweetEsData, err error) {
-	exists, err := esClient.Indices.Exists([]string{esTweetIndex})
-	if err != nil || exists.StatusCode == http.StatusNotFound {
-		// 没有找到对应的索引,返回错误
-		return nil, err
-	}
-	defer exists.Body.Close()
-	res, err := esClient.Get(esTweetIndex, strconv.FormatInt(id, 10))
-	if err != nil {
-		return nil, err
-	}
-	if res.IsError() {
-		return nil, err
-	}
-	fmt.Println(res)
-	var row map[string]interface{}
-	if err = json.NewDecoder(res.Body).Decode(&row); err != nil {
-		return nil, err
-	}
-	if found, ok := row["found"]; ok && !found.(bool) {
-		return nil, nil
-	}
-	if source, ok := row["_source"]; ok {
-		// 有数据
-		marshal, _ := json.Marshal(source)
-		var doc TweetEsData
-		err = json.Unmarshal(marshal, &doc)
-		if err != nil {
-			return nil, err
-		}
-		return &doc, nil
-	}
-	return nil, nil
-}

+ 0 - 67
pkg/log/zaplog.go

@@ -1,67 +0,0 @@
-package log
-
-import (
-	"fmt"
-	"github.com/zeromicro/go-zero/core/logx"
-	"go.uber.org/zap"
-)
-
-const callerSkipOffset = 3
-
-type ZapWriter struct {
-	logger *zap.Logger
-}
-
-func NewZapWriter(opts ...zap.Option) logx.Writer {
-	opts = append(opts, zap.AddCallerSkip(callerSkipOffset))
-	logger, _ := zap.NewProduction(opts...)
-	defer logger.Sync()
-
-	return &ZapWriter{
-		logger: logger,
-	}
-}
-
-func (w *ZapWriter) Alert(v interface{}) {
-	w.logger.Error(fmt.Sprint(v))
-}
-
-func (w *ZapWriter) Close() error {
-	return w.logger.Sync()
-}
-
-func (w *ZapWriter) Debug(v interface{}, fields ...logx.LogField) {
-	w.logger.Debug(fmt.Sprint(v), toZapFields(fields...)...)
-}
-
-func (w *ZapWriter) Error(v interface{}, fields ...logx.LogField) {
-	w.logger.Error(fmt.Sprint(v), toZapFields(fields...)...)
-}
-
-func (w *ZapWriter) Info(v interface{}, fields ...logx.LogField) {
-	w.logger.Info(fmt.Sprint(v), toZapFields(fields...)...)
-}
-
-func (w *ZapWriter) Severe(v interface{}) {
-	w.logger.Fatal(fmt.Sprint(v))
-}
-
-func (w *ZapWriter) Slow(v interface{}, fields ...logx.LogField) {
-	w.logger.Warn(fmt.Sprint(v), toZapFields(fields...)...)
-}
-
-func (w *ZapWriter) Stack(v interface{}) {
-	w.logger.Error(fmt.Sprint(v), zap.Stack("stack"))
-}
-
-func (w *ZapWriter) Stat(v interface{}, fields ...logx.LogField) {
-	w.logger.Info(fmt.Sprint(v), toZapFields(fields...)...)
-}
-
-func toZapFields(fields ...logx.LogField) []zap.Field {
-	zapFields := make([]zap.Field, 0, len(fields))
-	for _, f := range fields {
-		zapFields = append(zapFields, zap.Any(f.Key, f.Value))
-	}
-	return zapFields
-}

+ 0 - 41
task.go

@@ -1,41 +0,0 @@
-package main
-
-import (
-	"flag"
-	"fmt"
-	"github.com/zeromicro/go-zero/core/conf"
-	"github.com/zeromicro/go-zero/core/logx"
-	"github.com/zeromicro/go-zero/core/service"
-	"paopaoimtask/internal/config"
-	"paopaoimtask/internal/handler"
-	"paopaoimtask/internal/svc"
-	"paopaoimtask/pkg/log"
-)
-
-var configFile = flag.String("f", "etc/task-mq.yaml", "the config file")
-
-func main() {
-	flag.Parse()
-
-	var c config.Config
-	conf.MustLoad(*configFile, &c)
-
-	if err := c.SetUp(); err != nil {
-		panic(err)
-	}
-
-	ctx := svc.NewServiceContext(c)
-
-	writer := log.NewZapWriter()
-	logx.SetWriter(writer)
-
-	listen := handler.NewListen(ctx)
-
-	serviceGroup := service.NewServiceGroup()
-	for _, s := range listen.Services() {
-		serviceGroup.Add(s)
-	}
-
-	fmt.Printf("starting task server at %v....\n", c.ListenOn)
-	serviceGroup.Start()
-}

+ 4 - 4
mqClient/tweetTransfer.go → tweetTransfer.go

@@ -1,14 +1,14 @@
-package mqClient
+package paopaoqueue
 
 import (
 	"context"
 	"encoding/json"
 	"github.com/zeromicro/go-queue/kq"
-	"paopaoimtask/mq"
+	"paopaoqueue/types"
 )
 
 type TweetTransferClient interface {
-	Push(msg *mq.MsgTweetData) error
+	Push(msg *types.MsgTweetData) error
 }
 
 type tweetTransferClient struct {
@@ -19,7 +19,7 @@ func NewTweetTransferClient(addr []string, topic string, opts ...kq.Pusher) Twee
 	return &tweetTransferClient{pusher: kq.NewPusher(addr, topic)}
 }
 
-func (m *tweetTransferClient) Push(msg *mq.MsgTweetData) error {
+func (m *tweetTransferClient) Push(msg *types.MsgTweetData) error {
 	data, err := json.Marshal(msg)
 	if err != nil {
 		return err

+ 1 - 32
pkg/constants/task.go → types/constant.go

@@ -1,4 +1,4 @@
-package constants
+package types
 
 type MType int
 
@@ -20,10 +20,6 @@ const (
 	ContentMakeRead
 )
 
-const (
-	SYSTEM_ROOT_UID = -1000000
-)
-
 type PostVisibleT uint8
 
 const (
@@ -33,33 +29,6 @@ const (
 	PostVisitFollowing PostVisibleT = 60
 )
 
-type TweetVisibleType int8
-
-const (
-	// 推文可见性
-	TweetVisitPublic TweetVisibleType = iota
-	TweetVisitPrivate
-	TweetVisitFriend
-	TweetVisitFollowing
-	TweetVisitInvalid
-)
-
-// ToVisibleValue 转换权限
-func ToVisibleValue(visitType TweetVisibleType) PostVisibleT {
-	switch visitType {
-	case TweetVisitPublic:
-		return PostVisitPublic
-	case TweetVisitPrivate:
-		return PostVisitPrivate
-	case TweetVisitFriend:
-		return PostVisitFriend
-	case TweetVisitFollowing:
-		return PostVisitFollowing
-	default:
-		return PostVisitPrivate
-	}
-}
-
 type PostContentT int
 
 const (

+ 41 - 0
types/message.go

@@ -0,0 +1,41 @@
+package types
+
+type (
+	Msg struct {
+		MType       `mapstructure:"mType"`
+		Content     string            `mapstructure:"content"`     //内容
+		MsgID       string            `mapstructure:"msgId"`       // 消息id
+		ReadRecords map[string]string `mapstructure:"readRecords"` // 阅读消息记录
+	}
+
+	Chat struct {
+		ConversationId string                    `mapstructure:"conversationId"` // 会话id
+		ChatType       `mapstructure:"chatType"` // 会话类型
+		SendID         int64                     `mapstructure:"sendId"` //发送者
+		RecvID         int64                     `mapstructure:"recvId"` //接收者
+		Msg            `mapstructure:"msg"`      // 消息接口
+		SendTime       int64                     `mapstructure:"sendTime"` // 消息发送时间
+	}
+
+	Push struct {
+		ConversationId string                    `mapstructure:"conversationId"` // 会话id
+		ChatType       `mapstructure:"chatType"` // 会话类型
+		SendID         int64                     `mapstructure:"sendId"`      //发送者
+		RecvID         int64                     `mapstructure:"recvId"`      //接收者
+		RecvIDs        []int64                   `mapstructure:"recvIds"`     //多个接收者
+		SendTime       int64                     `mapstructure:"sendTime"`    //消息发送时间
+		MsgID          string                    `mapstructure:"msgId"`       // 消息id
+		ReadRecords    map[string]string         `mapstructure:"readRecords"` //消息阅读记录
+		ContentType    ContentType               `mapstructure:"contentType"` // 消息内容类型
+		MType          `mapstructure:"mType"`
+		Content        string `mapstructure:"content"` // 消息内容
+		ID             string `json:"id"`              // 消息id
+	}
+
+	MarkRead struct {
+		ChatType       `mapstructure:"chatType"`
+		RecvID         int64    `mapstructure:"recvId"` //接收者
+		ConversationId string   `mapstructure:"conversationId"`
+		MsgIds         []string `mapstructure:"msgIds"`
+	}
+)

+ 61 - 0
types/mqdata.go

@@ -0,0 +1,61 @@
+package types
+
+type MsgChatTransfer struct {
+	MsgId          string `mapstructure:"msgId"`
+	ConversationId string `json:"conversationId"`
+	ChatType       `json:"chatType"`
+	SendID         int64   `json:"sendId"`  //发送者
+	RecvID         int64   `json:"recvId"`  //接收者
+	RecvIDs        []int64 `json:"recvIds"` //多个接收者
+	SendTime       int64   `json:"sendTime"`
+
+	MType   `json:"mType"`
+	Content string `json:"content"`
+}
+
+type MsgMarkRead struct {
+	ConversationId string `json:"conversationId"`
+	ChatType       `json:"chatType"`
+	SendID         int64    `json:"sendId"` //发送者
+	RecvID         int64    `json:"recvId"` //接收者
+	MsgIds         []string `json:"msgIds"`
+}
+
+type MsgTweetData struct {
+	ID              int64                 `json:"id"`
+	UserID          int64                 `json:"user_id"`
+	CommentCount    int64                 `json:"comment_count"`
+	CollectionCount int64                 `json:"collection_count"`
+	ShareCount      int64                 `json:"share_count"`
+	UpvoteCount     int64                 `json:"upvote_count"`
+	Visibility      PostVisibleT          `json:"visibility"`
+	IsTop           int                   `json:"is_top"`
+	IsEssence       int                   `json:"is_essence"`
+	IsLock          int                   `json:"is_lock"`
+	LatestRepliedOn int64                 `json:"latest_replied_on"`
+	Tags            string                `json:"tags"`
+	AttachmentPrice int64                 `json:"attachment_price"`
+	IP              string                `json:"ip"`
+	IPLoc           string                `json:"ip_loc"`
+	Contents        []MsgTweetContentData `json:"contents"`
+	CreatedOn       int64                 `json:"created_on"`
+	ModifiedOn      int64                 `json:"modified_on"`
+	DeletedOn       int64                 `json:"deleted_on"`
+	IsDel           int64                 `json:"is_del"`
+}
+
+type MsgTweetContentData struct {
+	ID      int64        `json:"id"`
+	PostID  int64        `json:"post_id"`
+	UserID  int64        `json:"user_id"`
+	Content string       `json:"content"`
+	Type    PostContentT `json:"type"`
+	Sort    int64        `json:"sort"`
+}
+
+type UserBillContentData struct {
+	PostID       int64 `json:"post_id"`        // 咨询id
+	UserID       int64 `json:"user_id"`        // 用户id
+	ProfitUserID int64 `json:"profit_user_id"` // 受益者id
+	Price        int64 `json:"price"`          // 购买价格
+}

+ 4 - 4
mqClient/userbilltransfer.go → userbilltransfer.go

@@ -1,15 +1,15 @@
-package mqClient
+package paopaoqueue
 
 import (
 	"context"
 	"encoding/json"
 	"fmt"
 	"github.com/zeromicro/go-queue/kq"
-	"paopaoimtask/mq"
+	"paopaoqueue/types"
 )
 
 type UserBillTransferClient interface {
-	Push(msg *mq.UserBillContentData) error
+	Push(msg *types.UserBillContentData) error
 }
 
 type userBillTransferClient struct {
@@ -21,7 +21,7 @@ func NewUserBillTransferClient(addr []string, topic string, opts ...kq.Pusher) U
 	return &userBillTransferClient{pusher: kq.NewPusher(addr, topic)}
 }
 
-func (m *userBillTransferClient) Push(msg *mq.UserBillContentData) error {
+func (m *userBillTransferClient) Push(msg *types.UserBillContentData) error {
 	data, err := json.Marshal(msg)
 	if err != nil {
 		return err