v_grhhuang 1 年間 前
コミット
aef23f17d3

+ 92 - 0
etc/task-mq.yaml

@@ -0,0 +1,92 @@
+# 模块名称
+Name: task-mq
+# 模块监听的ip和端口
+ListenOn: 0.0.0.0:10001
+# 用户消息消费配置,这里是采用go-zero自带的kafka配置,可以参考https://go-zero.dev/docs/tutorials/message-queue/kafka
+MsgChatTransfer:
+  # service的名称
+  Name: MsgChatTransfer
+  # kafka 的多个 Broker 节点
+  Brokers:
+    - 192.168.50.14:19094
+  # 消费者组
+  Group: kafka
+  # 订阅的 Topic 主题
+  Topic: msgChatTransfer
+  # 从头开始消费
+  Offset: first
+  # go-queue 内部是起多个 goroutine 从 kafka 中获取信息写入进程内的 channel,这个参数是控制此处的 goroutine 数量(⚠️ 并不是真正消费时的并发 goroutine 数量)
+  Consumers: 1
+# 如上的配置参数;此配置用于消息已读
+MsgReadTransfer:
+  Name: MsgReadTransfer
+  Brokers:
+    - 192.168.50.14:19094
+  Group: kafka
+  Topic: msgReadTransfer
+  Offset: first
+  Consumers: 1
+# 如上的配置参数;此配置用于资讯模块的异步写es和缓存
+TweetTransfer:
+  Name: TweetTransfer
+  Brokers:
+    - 192.168.50.14:19094
+  Group: kafka
+  Topic: tweetTransfer
+  Offset: first
+  Consumers: 1
+# 如上的配置参数;此配置用于用户购买附件之后的异步处理
+UserBillTransfer:
+  Name: UserBillTransfer
+  Brokers:
+    - 192.168.50.14:19094
+  Group: kafka
+  Topic: userBillTransfer
+  Offset: first
+  Consumers: 1
+# 消息已读处理的配置
+MsgReadHandler:
+  GroupMsgReadHandler: 1
+  GroupMsgReadRecordDelayTime: 60
+  GroupMsgReadRecordDelayCount: 2
+# redis的配置参数
+Redisx:
+  Host: 192.168.50.14:16379
+  Type: node
+  Pass: easy-chat
+# mongo的配置信息
+Mongo:
+  url: "mongodb://root:123456@192.168.50.14:47017"
+  Db: easy-chat
+# websocket的地址
+Ws:
+  Host: paopao-im-ws-svc:10003
+# 好友模块的地址
+SocialRpc:
+  Target: k8s://paopao-im/paopao-social-rpc-svc:10002
+# 用户模块的地址
+UserRpc:
+  Target: k8s://paopao-im/paopao-user-rpc-svc:10000
+# es的连接配置参数
+ElasticsearchConf:
+  # es ip地址
+  Host: 192.168.50.14
+  # es 的端口
+  Port: 19200
+  # es的用户名
+  Username: elastic
+  # es的账号密码
+  Password: huang123456
+  # es中需要写入的索引
+  TweetIndex: "es_index_tweet"
+
+Log:
+  ServiceName: task-mq
+  Mode: file
+  Encoding: json
+  Path: "/var/logs"
+  Level: "info"
+  Compress: true
+  KeepDays: 30
+  StackCooldownMillis: 100
+  Rotation: daily

+ 110 - 0
go.mod

@@ -0,0 +1,110 @@
+module paopaoimtask
+
+go 1.23.2
+
+require (
+	github.com/elastic/go-elasticsearch/v8 v8.16.0
+	github.com/jinzhu/copier v0.4.0
+	github.com/zeromicro/go-queue v1.2.2
+	github.com/zeromicro/go-zero v1.7.3
+	go.mongodb.org/mongo-driver v1.17.1
+	go.uber.org/zap v1.24.0
+)
+
+require (
+	github.com/golang/snappy v0.0.4 // indirect
+	github.com/gorilla/websocket v1.5.3 // indirect
+	github.com/montanaflynn/stats v0.7.1 // indirect
+	github.com/xdg-go/pbkdf2 v1.0.0 // indirect
+	github.com/xdg-go/scram v1.1.2 // indirect
+	github.com/xdg-go/stringprep v1.0.4 // indirect
+	github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
+	golang.org/x/crypto v0.28.0 // indirect
+	golang.org/x/sync v0.9.0 // indirect
+)
+
+require (
+	github.com/beorn7/perks v1.0.1 // indirect
+	github.com/cenkalti/backoff/v4 v4.3.0 // indirect
+	github.com/cespare/xxhash/v2 v2.3.0 // indirect
+	github.com/coreos/go-semver v0.3.1 // indirect
+	github.com/coreos/go-systemd/v22 v22.5.0 // indirect
+	github.com/davecgh/go-spew v1.1.1 // indirect
+	github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
+	github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect
+	github.com/emicklei/go-restful/v3 v3.11.0 // indirect
+	github.com/fatih/color v1.17.0 // indirect
+	github.com/go-logr/logr v1.4.2 // indirect
+	github.com/go-logr/stdr v1.2.2 // indirect
+	github.com/go-openapi/jsonpointer v0.19.6 // indirect
+	github.com/go-openapi/jsonreference v0.20.2 // indirect
+	github.com/go-openapi/swag v0.22.4 // indirect
+	github.com/gogo/protobuf v1.3.2 // indirect
+	github.com/golang/mock v1.6.0 // indirect
+	github.com/golang/protobuf v1.5.4 // indirect
+	github.com/google/gnostic-models v0.6.8 // indirect
+	github.com/google/go-cmp v0.6.0 // indirect
+	github.com/google/gofuzz v1.2.0 // indirect
+	github.com/google/uuid v1.6.0 // indirect
+	github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0 // indirect
+	github.com/josharian/intern v1.0.0 // indirect
+	github.com/json-iterator/go v1.1.12 // indirect
+	github.com/klauspost/compress v1.17.9 // indirect
+	github.com/mailru/easyjson v0.7.7 // indirect
+	github.com/mattn/go-colorable v0.1.13 // indirect
+	github.com/mattn/go-isatty v0.0.20 // indirect
+	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
+	github.com/modern-go/reflect2 v1.0.2 // indirect
+	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
+	github.com/openzipkin/zipkin-go v0.4.3 // indirect
+	github.com/pelletier/go-toml/v2 v2.2.2 // indirect
+	github.com/pierrec/lz4/v4 v4.1.21 // indirect
+	github.com/prometheus/client_golang v1.20.5 // indirect
+	github.com/prometheus/client_model v0.6.1 // indirect
+	github.com/prometheus/common v0.55.0 // indirect
+	github.com/prometheus/procfs v0.15.1 // indirect
+	github.com/redis/go-redis/v9 v9.7.0 // indirect
+	github.com/segmentio/kafka-go v0.4.47 // indirect
+	github.com/spaolacci/murmur3 v1.1.0 // indirect
+	go.etcd.io/etcd/api/v3 v3.5.15 // indirect
+	go.etcd.io/etcd/client/pkg/v3 v3.5.15 // indirect
+	go.etcd.io/etcd/client/v3 v3.5.15 // indirect
+	go.opentelemetry.io/otel v1.32.0 // indirect
+	go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect
+	go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0 // indirect
+	go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.32.0 // indirect
+	go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.32.0 // indirect
+	go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.32.0 // indirect
+	go.opentelemetry.io/otel/exporters/zipkin v1.32.0 // indirect
+	go.opentelemetry.io/otel/metric v1.32.0 // indirect
+	go.opentelemetry.io/otel/sdk v1.32.0 // indirect
+	go.opentelemetry.io/otel/trace v1.32.0 // indirect
+	go.opentelemetry.io/proto/otlp v1.3.1 // indirect
+	go.uber.org/atomic v1.10.0 // indirect
+	go.uber.org/automaxprocs v1.6.0 // indirect
+	go.uber.org/multierr v1.9.0 // indirect
+	golang.org/x/net v0.30.0 // indirect
+	golang.org/x/oauth2 v0.23.0 // indirect
+	golang.org/x/sys v0.27.0 // indirect
+	golang.org/x/term v0.25.0 // indirect
+	golang.org/x/text v0.20.0 // indirect
+	golang.org/x/time v0.7.0 // indirect
+	google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 // indirect
+	google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 // indirect
+	google.golang.org/grpc v1.68.0 // indirect
+	google.golang.org/protobuf v1.35.2 // indirect
+	gopkg.in/inf.v0 v0.9.1 // indirect
+	gopkg.in/yaml.v2 v2.4.0 // indirect
+	gopkg.in/yaml.v3 v3.0.1 // indirect
+	k8s.io/api v0.29.3 // indirect
+	k8s.io/apimachinery v0.29.4 // indirect
+	k8s.io/client-go v0.29.3 // indirect
+	k8s.io/klog/v2 v2.110.1 // indirect
+	k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect
+	k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect
+	paopaoimrpc v1.0.0
+	paopaoimws v1.0.0
+	sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
+	sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
+	sigs.k8s.io/yaml v1.3.0 // indirect
+)

+ 44 - 0
internal/config/config.go

@@ -0,0 +1,44 @@
+package config
+
+import (
+	"github.com/zeromicro/go-queue/kq"
+	"github.com/zeromicro/go-zero/core/service"
+	"github.com/zeromicro/go-zero/core/stores/redis"
+	"github.com/zeromicro/go-zero/zrpc"
+)
+
+type Config struct {
+	service.ServiceConf
+	ListenOn         string
+	MsgChatTransfer  kq.KqConf
+	MsgReadTransfer  kq.KqConf
+	TweetTransfer    kq.KqConf
+	UserBillTransfer kq.KqConf
+	Redisx           redis.RedisConf
+
+	Mongo struct {
+		Url string
+		Db  string
+	}
+
+	Ws struct {
+		Host string
+	}
+
+	SocialRpc zrpc.RpcClientConf
+	UserRpc   zrpc.RpcClientConf
+
+	MsgReadHandler struct {
+		GroupMsgReadHandler          int32
+		GroupMsgReadRecordDelayTime  int64
+		GroupMsgReadRecordDelayCount int64
+	}
+
+	ElasticSearchConf struct {
+		Host       string
+		Port       int64
+		Username   string
+		Password   string
+		TweetIndex string
+	}
+}

+ 29 - 0
internal/handler/listen.go

@@ -0,0 +1,29 @@
+package handler
+
+import (
+	"github.com/zeromicro/go-queue/kq"
+	"github.com/zeromicro/go-zero/core/service"
+	"paopaoimtask/internal/handler/msgTransfer"
+	"paopaoimtask/internal/handler/tweetTransfer"
+	"paopaoimtask/internal/handler/userBillTransfer"
+	"paopaoimtask/internal/svc"
+)
+
+type Listen struct {
+	svc *svc.ServiceContext
+}
+
+func NewListen(svc *svc.ServiceContext) *Listen {
+	return &Listen{
+		svc,
+	}
+}
+
+func (l *Listen) Services() []service.Service {
+	return []service.Service{
+		kq.MustNewQueue(l.svc.Config.MsgChatTransfer, msgTransfer.NewMsgChatTransfer(l.svc)),
+		kq.MustNewQueue(l.svc.Config.MsgReadTransfer, msgTransfer.NewMsgReadTransfer(l.svc)),
+		kq.MustNewQueue(l.svc.Config.TweetTransfer, tweetTransfer.NewTweet2EsTransfer(l.svc)),
+		kq.MustNewQueue(l.svc.Config.UserBillTransfer, userBillTransfer.NewUserBillTransfer(l.svc)),
+	}
+}

+ 135 - 0
internal/handler/msgTransfer/groupMsgRead.go

@@ -0,0 +1,135 @@
+package msgTransfer
+
+import (
+	"github.com/zeromicro/go-zero/core/logx"
+	"paopaoimtask/pkg/constants"
+	"paopaoimws/ws"
+	"sync"
+	"time"
+)
+
+type groupMsgRead struct {
+	mu             sync.Mutex
+	conversationId string
+	push           *ws.Push      //推送方法
+	pushCh         chan *ws.Push // 通道推送
+
+	count int // 数量
+
+	pushTime time.Time // 上次推送时间
+
+	done chan struct{}
+}
+
+func newGroupMsgRead(push *ws.Push, pushCh chan *ws.Push) *groupMsgRead {
+	g := &groupMsgRead{
+		push:           push,
+		conversationId: push.ConversationId,
+		pushCh:         pushCh,
+		count:          1,
+		pushTime:       time.Time{},
+		done:           make(chan struct{}),
+	}
+	go g.transfer()
+	return g
+}
+
+func (g *groupMsgRead) mergePush(push *ws.Push) {
+	g.mu.Lock()
+	defer g.mu.Unlock()
+	if push == nil {
+		g.push = push
+	}
+	g.count++
+	for msgId, read := range push.ReadRecords {
+		g.push.ReadRecords[msgId] = read
+	}
+}
+
+// 是否是活跃的状态
+func (g *groupMsgRead) IsIdle() bool {
+	g.mu.Lock()
+	defer g.mu.Unlock()
+	return g.isIdle()
+}
+
+func (g *groupMsgRead) isIdle() bool {
+	pushTime := g.pushTime
+	val := GroupMsgReadRecordDelayTime*2 - time.Since(pushTime)
+	if val <= 0 && g.push == nil && g.count == 0 {
+		return true
+	}
+	return false
+}
+
+// 清理内容
+func (g *groupMsgRead) clear() {
+	select {
+	case <-g.done:
+	default:
+		close(g.done)
+	}
+	g.push = nil
+}
+
+func (g *groupMsgRead) transfer() {
+	// 超时发送
+	timer := time.NewTimer(GroupMsgReadRecordDelayTime / 2)
+	defer timer.Stop()
+	for {
+		select {
+		case <-g.done:
+			return
+		case <-timer.C:
+			g.mu.Lock()
+			pushTime := g.pushTime
+			val := GroupMsgReadRecordDelayTime - time.Since(pushTime)
+			push := g.push
+			if val > 0 && g.count < GroupMsgReadRecordDelayCount || push == nil {
+				// 时间和数量都没有达标
+				if val > 0 {
+					timer.Reset(val)
+				}
+				g.mu.Unlock()
+				continue
+			}
+			g.pushTime = time.Now()
+			g.push = nil
+			g.count = 0
+			timer.Reset(val)
+			g.mu.Unlock()
+			// 推送
+			logx.Infof("超过合并的条件推送 %v", push)
+			g.pushCh <- push
+		default:
+			g.mu.Lock()
+			if g.count >= GroupMsgReadRecordDelayCount {
+				push := g.push
+				// 达到了数量的话也要进行推送
+				g.pushTime = time.Now()
+				g.push = nil
+				g.count = 0
+				g.mu.Unlock()
+				// 推送
+				logx.Infof("超过合并的条件推送 %v", push)
+				g.pushCh <- push
+				continue
+			}
+			if g.isIdle() {
+				g.mu.Unlock()
+				g.pushCh <- &ws.Push{
+					ChatType:       constants.GroupChatType,
+					ConversationId: g.conversationId,
+				}
+				continue
+			}
+			g.mu.Unlock()
+			timeDelay := GroupMsgReadRecordDelayTime / 4
+			if timeDelay > time.Second {
+				timeDelay = time.Second
+			}
+			time.Sleep(timeDelay)
+		}
+	}
+	// 超量发送
+}

+ 79 - 0
internal/handler/msgTransfer/msgChatTransfer.go

@@ -0,0 +1,79 @@
+package msgTransfer
+
+import (
+	"context"
+	"encoding/json"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"paopaoimtask/internal/svc"
+	"paopaoimtask/mq"
+	"paopaoimtask/pkg/bitmap"
+	"paopaoimws/immodels"
+	"paopaoimws/ws"
+	"strconv"
+)
+
+/**
+消息消费
+*/
+
+type MsgChatTransfer struct {
+	*baseMsgTransfer
+}
+
+func NewMsgChatTransfer(svc *svc.ServiceContext) *MsgChatTransfer {
+	return &MsgChatTransfer{
+		NewBaseMsgTransfer(svc),
+	}
+}
+
+// 消息消费的实现方法
+func (m *MsgChatTransfer) Consume(ctx context.Context, key, value string) error {
+	var (
+		data  mq.MsgChatTransfer
+		msgId = primitive.NewObjectID()
+	)
+	if err := json.Unmarshal([]byte(value), &data); err != nil {
+		return err
+	}
+	if err := m.addChatLog(ctx, msgId, &data); err != nil {
+		return err
+	}
+
+	return m.Transfer(ctx, &ws.Push{
+		MsgID:          msgId.Hex(),
+		ConversationId: data.ConversationId,
+		ChatType:       data.ChatType,
+		SendID:         data.SendID,
+		RecvID:         data.RecvID,
+		RecvIDs:        data.RecvIDs,
+		SendTime:       data.SendTime,
+		MType:          data.MType,
+		Content:        data.Content,
+	})
+}
+
+// 记录消息到mongodb
+func (m *MsgChatTransfer) addChatLog(ctx context.Context, msgId primitive.ObjectID, data *mq.MsgChatTransfer) error {
+	//记录消息
+	chatlog := immodels.ChatLog{
+		ID:             msgId,
+		ConversationId: data.ConversationId,
+		SendId:         strconv.FormatInt(data.SendID, 10),
+		RecvId:         strconv.FormatInt(data.RecvID, 10),
+		MsgFrom:        0,
+		MsgType:        data.MType,
+		MsgContent:     data.Content,
+		SendTime:       data.SendTime,
+		ChatType:       data.ChatType,
+	}
+	//设置自己为已读状态
+	readRecords := bitmap.NewBitMap(0)
+	readRecords.Set(data.SendID)
+	chatlog.ReadRecords = readRecords.Export()
+
+	err := m.svcCtx.ChatLogModel.Insert(ctx, &chatlog)
+	if err != nil {
+		return err
+	}
+	return m.svcCtx.ConversationModel.UpdateMsg(ctx, &chatlog)
+}

+ 144 - 0
internal/handler/msgTransfer/msgReadTransfer.go

@@ -0,0 +1,144 @@
+package msgTransfer
+
+import (
+	"context"
+	"encoding/base64"
+	"encoding/json"
+	"github.com/zeromicro/go-zero/core/stores/cache"
+	"paopaoimtask/internal/svc"
+	"paopaoimtask/mq"
+	"paopaoimtask/pkg/bitmap"
+	"paopaoimtask/pkg/constants"
+	"paopaoimws/ws"
+	"sync"
+	"time"
+)
+
+var (
+	GroupMsgReadRecordDelayTime  = time.Second // 超时时间
+	GroupMsgReadRecordDelayCount = 10          //最大消息处理数量
+)
+
+const (
+	GroupMsgReadHandlerAtTransfer = iota //是否开启群消息批量处理
+	GroupMsgReadHandlerDelayTransfer
+)
+
+type MsgReadTransfer struct {
+	*baseMsgTransfer
+	cache.Cache
+	mu        sync.Mutex
+	groupMsgs map[string]*groupMsgRead
+	push      chan *ws.Push
+}
+
+func NewMsgReadTransfer(svc *svc.ServiceContext) *MsgReadTransfer {
+	m := &MsgReadTransfer{
+		baseMsgTransfer: NewBaseMsgTransfer(svc),
+		groupMsgs:       make(map[string]*groupMsgRead, 1),
+		push:            make(chan *ws.Push, 1),
+	}
+	if svc.Config.MsgReadHandler.GroupMsgReadHandler != GroupMsgReadHandlerAtTransfer {
+		// 开启了群消息批量处理
+		if svc.Config.MsgReadHandler.GroupMsgReadRecordDelayCount > 0 {
+			GroupMsgReadRecordDelayCount = int(svc.Config.MsgReadHandler.GroupMsgReadRecordDelayCount)
+		}
+		if svc.Config.MsgReadHandler.GroupMsgReadRecordDelayTime > 0 {
+			GroupMsgReadRecordDelayTime = time.Duration(svc.Config.MsgReadHandler.GroupMsgReadRecordDelayTime) * time.Second
+		}
+	}
+	go m.transfer()
+	return m
+}
+
+func (m *MsgReadTransfer) Consume(ctx context.Context, key, value string) error {
+	var (
+		data mq.MsgMarkRead
+	)
+	if err := json.Unmarshal([]byte(value), &data); err != nil {
+		return err
+	}
+
+	// 更新已读未读处理
+	read, err := m.UpdateChatLogRead(ctx, &data)
+	if err != nil {
+		return err
+	}
+	push := &ws.Push{
+		ConversationId: data.ConversationId,
+		ChatType:       data.ChatType,
+		SendID:         data.SendID,
+		RecvID:         data.RecvID,
+		ContentType:    constants.ContentMakeRead,
+		ReadRecords:    read,
+	}
+	switch data.ChatType {
+	case constants.SingleChatType:
+		// 私聊直接推送
+		m.push <- push
+	case constants.GroupChatType:
+		// 判断是否开启合并消息
+		if m.svcCtx.Config.MsgReadHandler.GroupMsgReadHandler == GroupMsgReadHandlerAtTransfer {
+			m.push <- push
+		}
+		m.mu.Lock()
+		defer m.mu.Unlock()
+		push.SendID = 0
+		if _, ok := m.groupMsgs[push.ConversationId]; ok {
+			m.Infof("merge push %v", push.ConversationId)
+			m.groupMsgs[push.ConversationId].mergePush(push)
+		} else {
+			m.groupMsgs[push.ConversationId] = newGroupMsgRead(push, m.push)
+		}
+	}
+
+	return nil
+}
+
+func (m *MsgReadTransfer) UpdateChatLogRead(ctx context.Context, data *mq.MsgMarkRead) (map[string]string, error) {
+	res := make(map[string]string)
+	chatLogs, err := m.svcCtx.ChatLogModel.ListByMsgIds(ctx, data.MsgIds)
+	if err != nil {
+		return nil, err
+	}
+	for _, chat := range chatLogs {
+		switch chat.ChatType {
+		case constants.SingleChatType:
+			chat.ReadRecords = []byte{1}
+		case constants.GroupChatType:
+			readRecords := bitmap.Load(chat.ReadRecords)
+			readRecords.Set(data.SendID)
+			chat.ReadRecords = readRecords.Export()
+		}
+
+		res[chat.ID.Hex()] = base64.StdEncoding.EncodeToString(chat.ReadRecords)
+		err := m.svcCtx.ChatLogModel.UpdateMakeRead(ctx, chat.ID, chat.ReadRecords)
+		if err != nil {
+			return nil, err
+		}
+
+	}
+	return res, nil
+}
+
+func (m *MsgReadTransfer) transfer() {
+	for push := range m.push {
+		if push.RecvID > 0 || len(push.RecvIDs) > 0 {
+			if err := m.Transfer(context.Background(), push); err != nil {
+				m.Errorf("m transfer err %v push %v", err, push)
+			}
+		}
+		if push.ChatType == constants.SingleChatType {
+			continue
+		}
+		if m.svcCtx.Config.MsgReadHandler.GroupMsgReadHandler == GroupMsgReadHandlerAtTransfer {
+			continue
+		}
+		// 接收者id为空
+		m.mu.Lock()
+		if _, ok := m.groupMsgs[push.ConversationId]; ok && m.groupMsgs[push.ConversationId].IsIdle() {
+			m.groupMsgs[push.ConversationId].clear()
+			delete(m.groupMsgs, push.ConversationId)
+		}
+	}
+}

+ 72 - 0
internal/handler/msgTransfer/msgTransfer.go

@@ -0,0 +1,72 @@
+package msgTransfer
+
+import (
+	"context"
+	"fmt"
+	"github.com/zeromicro/go-zero/core/logx"
+	"paopaoimrpc/apps/client/paopaosocial"
+	"paopaoimtask/internal/svc"
+	"paopaoimtask/pkg/constants"
+	"paopaoimws/websocket"
+	"paopaoimws/ws"
+)
+
+type baseMsgTransfer struct {
+	svcCtx *svc.ServiceContext
+	logx.Logger
+}
+
+func NewBaseMsgTransfer(svc *svc.ServiceContext) *baseMsgTransfer {
+	return &baseMsgTransfer{
+		svcCtx: svc,
+		Logger: logx.WithContext(context.Background()),
+	}
+}
+
+func (b *baseMsgTransfer) Transfer(ctx context.Context, data *ws.Push) error {
+	var err error
+	switch data.ChatType {
+	case constants.SingleChatType:
+		err = b.single(ctx, data)
+	case constants.GroupChatType:
+		err = b.group(ctx, data)
+	}
+	return err
+}
+
+func (b *baseMsgTransfer) single(ctx context.Context, data *ws.Push) error {
+	return b.svcCtx.WsClient.Send(websocket.Message{
+		FrameType: websocket.FrameData,
+		Method:    "push",
+		FormID:    constants.SYSTEM_ROOT_UID,
+		Data:      data,
+		Id:        data.MsgID,
+	})
+}
+
+func (b *baseMsgTransfer) group(ctx context.Context, data *ws.Push) error {
+	// 查询群用户
+	fmt.Println("接收者id: ", data.RecvID)
+	fmt.Println("发送者id: ", data.SendID)
+	members, err := b.svcCtx.PaoPaoSocial.GetGroupMembers(ctx, &paopaosocial.GetGroupMembersReq{
+		GroupId: data.RecvID,
+		UserId:  data.SendID,
+	})
+	if err != nil {
+		return err
+	}
+	data.RecvIDs = make([]int64, 0, members.Count)
+	for _, item := range members.List {
+		if item.UserId == data.SendID {
+			continue
+		}
+		data.RecvIDs = append(data.RecvIDs, item.UserId)
+	}
+	return b.svcCtx.WsClient.Send(websocket.Message{
+		FrameType: websocket.FrameData,
+		Method:    "push",
+		FormID:    constants.SYSTEM_ROOT_UID,
+		Data:      data,
+		Id:        data.MsgID,
+	})
+}

+ 100 - 0
internal/handler/tweetTransfer/tweet2EsTransfer.go

@@ -0,0 +1,100 @@
+package tweetTransfer
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"github.com/jinzhu/copier"
+	"github.com/zeromicro/go-zero/core/logx"
+	"paopaoimrpc/apps/client/paopaouser"
+	"paopaoimtask/internal/svc"
+	"paopaoimtask/mq"
+	"paopaoimtask/pkg/es"
+	"strings"
+	"sync"
+)
+
+/**
+从kafka中读取数据写入到es中和redis中
+*/
+
+type Tweet2EsTransfer struct {
+	svcCtx *svc.ServiceContext
+	logx.Logger
+}
+
+func NewTweet2EsTransfer(svc *svc.ServiceContext) *Tweet2EsTransfer {
+	return &Tweet2EsTransfer{
+		svcCtx: svc,
+		Logger: logx.WithContext(context.Background()),
+	}
+}
+
+func (b *Tweet2EsTransfer) Consume(ctx context.Context, key, value string) error {
+	var (
+		data      mq.MsgTweetData
+		cacheData es.TweetEsData
+	)
+	if err := json.Unmarshal([]byte(value), &data); err != nil {
+		return err
+	}
+
+	copier.Copy(&cacheData, data)
+	tags := make(map[string]int32)
+	if data.Tags != "" {
+		for _, s := range strings.Split(data.Tags, ",") {
+			tags[s] = 1
+		}
+	}
+	cacheData.Tags = tags
+	// 解析之后写入es 和 redis
+	wg := sync.WaitGroup{}
+	wg.Add(2)
+	var mu sync.Mutex
+	var errors []error
+	go func(esData *es.TweetEsData) {
+		defer wg.Done()
+		err := b.setEs(&cacheData)
+		if err != nil {
+			mu.Lock()
+			errors = append(errors, err)
+			mu.Unlock()
+		}
+	}(&cacheData)
+	go func(esData *es.TweetEsData) {
+		defer wg.Done()
+		err := b.SetUserPostNum(&cacheData)
+		if err != nil {
+			mu.Lock()
+			errors = append(errors, err)
+			mu.Unlock()
+		}
+	}(&cacheData)
+	wg.Wait()
+	if len(errors) > 0 {
+		return errors[0]
+	}
+	return nil
+}
+
+func (b *Tweet2EsTransfer) setEs(data *es.TweetEsData) error {
+	var tweetIndex es.TweetIndex
+	err := tweetIndex.Create(data, b.svcCtx.EsClient, b.svcCtx.Config.ElasticSearchConf.TweetIndex)
+	if err != nil {
+		fmt.Println(err.Error())
+	}
+	return err
+}
+
+// 给用户加入资讯数量
+func (b *Tweet2EsTransfer) SetUserPostNum(data *es.TweetEsData) error {
+	_, err := b.svcCtx.PaoPaoUser.SetUserPostNum(context.Background(), &paopaouser.SetUserPostNumReq{
+		PostId:  data.Id,
+		UserId:  data.UserId,
+		SetType: 1,
+	})
+	if err != nil {
+		fmt.Println(err.Error())
+	}
+	return err
+}

+ 46 - 0
internal/handler/userBillTransfer/userbillTransfer.go

@@ -0,0 +1,46 @@
+package userBillTransfer
+
+import (
+	"context"
+	"encoding/json"
+	"github.com/zeromicro/go-zero/core/logx"
+	"paopaoimrpc/apps/client/paopaouser"
+	"paopaoimtask/internal/svc"
+	"paopaoimtask/mq"
+)
+
+/**
+从kafka中读取数据写入到es中和redis中
+*/
+
+type UserBillTransfer struct {
+	svcCtx *svc.ServiceContext
+	logx.Logger
+}
+
+func NewUserBillTransfer(svc *svc.ServiceContext) *UserBillTransfer {
+	return &UserBillTransfer{
+		svcCtx: svc,
+		Logger: logx.WithContext(context.Background()),
+	}
+}
+
+func (b *UserBillTransfer) Consume(ctx context.Context, key, value string) error {
+	var (
+		data mq.UserBillContentData
+	)
+	if err := json.Unmarshal([]byte(value), &data); err != nil {
+		return err
+	}
+	_, err := b.svcCtx.PaoPaoUser.AttachmentProfit(context.Background(), &paopaouser.AttachmentProfitReq{
+		TweetId:      data.PostID,
+		UserId:       data.UserID,
+		ProfitUserId: data.ProfitUserID,
+		Price:        data.Price,
+	})
+	if err != nil {
+		// 如果消费失败了回滚用户的
+		return err
+	}
+	return nil
+}

+ 57 - 0
internal/svc/servicecontext.go

@@ -0,0 +1,57 @@
+package svc
+
+import (
+	"github.com/elastic/go-elasticsearch/v8"
+	"github.com/zeromicro/go-zero/core/stores/redis"
+	"github.com/zeromicro/go-zero/zrpc"
+	"net/http"
+	socialRpc "paopaoimrpc/apps/client/paopaosocial"
+	userRpc "paopaoimrpc/apps/client/paopaouser"
+	"paopaoimrpc/apps/pkg/constants"
+	"paopaoimtask/internal/config"
+	"paopaoimtask/pkg/es"
+	"paopaoimws/immodels"
+	"paopaoimws/websocket"
+)
+
+type ServiceContext struct {
+	Config   config.Config
+	WsClient websocket.Client
+	*redis.Redis
+	immodels.ChatLogModel
+	immodels.ConversationModel
+	EsClient *elasticsearch.Client
+	userRpc.PaoPaoUser
+	socialRpc.PaoPaoSocial
+}
+
+func NewServiceContext(c config.Config) *ServiceContext {
+	esClient, err := es.GetEsClient(c.ElasticSearchConf.Host, c.ElasticSearchConf.Port, c.ElasticSearchConf.Username, c.ElasticSearchConf.Password)
+	if err != nil {
+		panic(err)
+	}
+	userrpc, _ := zrpc.NewClient(c.UserRpc)
+	socialrpc, _ := zrpc.NewClient(c.SocialRpc)
+	svc := &ServiceContext{
+		Config:            c,
+		Redis:             redis.MustNewRedis(c.Redisx),
+		ChatLogModel:      immodels.MustChatLogModel(c.Mongo.Url, c.Mongo.Db),
+		ConversationModel: immodels.MustConversationModel(c.Mongo.Url, c.Mongo.Db),
+		PaoPaoUser:        userRpc.NewPaoPaoUser(userrpc),
+		PaoPaoSocial:      socialRpc.NewPaoPaoSocial(socialrpc),
+		EsClient:          esClient,
+	}
+	token, err := svc.GetSystemToken()
+	if err != nil {
+		panic(err)
+	}
+	header := http.Header{}
+	header.Set("Authorization", token)
+	svc.WsClient = websocket.NewClient(c.Ws.Host, websocket.WithClientHeader(header))
+	return svc
+}
+
+func (s *ServiceContext) GetSystemToken() (string, error) {
+	get, err := s.Redis.Get(constants.REDIS_SYSTEM_ROOT_TOKEN)
+	return get, err
+}

+ 65 - 0
mq/mq.go

@@ -0,0 +1,65 @@
+package mq
+
+import (
+	"paopaoimtask/pkg/constants"
+)
+
+type MsgChatTransfer struct {
+	MsgId              string `mapstructure:"msgId"`
+	ConversationId     string `json:"conversationId"`
+	constants.ChatType `json:"chatType"`
+	SendID             int64   `json:"sendId"`  //发送者
+	RecvID             int64   `json:"recvId"`  //接收者
+	RecvIDs            []int64 `json:"recvIds"` //多个接收者
+	SendTime           int64   `json:"sendTime"`
+
+	constants.MType `json:"mType"`
+	Content         string `json:"content"`
+}
+
+type MsgMarkRead struct {
+	ConversationId     string `json:"conversationId"`
+	constants.ChatType `json:"chatType"`
+	SendID             int64    `json:"sendId"` //发送者
+	RecvID             int64    `json:"recvId"` //接收者
+	MsgIds             []string `json:"msgIds"`
+}
+
+type MsgTweetData struct {
+	ID              int64                  `json:"id"`
+	UserID          int64                  `json:"user_id"`
+	CommentCount    int64                  `json:"comment_count"`
+	CollectionCount int64                  `json:"collection_count"`
+	ShareCount      int64                  `json:"share_count"`
+	UpvoteCount     int64                  `json:"upvote_count"`
+	Visibility      constants.PostVisibleT `json:"visibility"`
+	IsTop           int                    `json:"is_top"`
+	IsEssence       int                    `json:"is_essence"`
+	IsLock          int                    `json:"is_lock"`
+	LatestRepliedOn int64                  `json:"latest_replied_on"`
+	Tags            string                 `json:"tags"`
+	AttachmentPrice int64                  `json:"attachment_price"`
+	IP              string                 `json:"ip"`
+	IPLoc           string                 `json:"ip_loc"`
+	Contents        []MsgTweetContentData  `json:"contents"`
+	CreatedOn       int64                  `json:"created_on"`
+	ModifiedOn      int64                  `json:"modified_on"`
+	DeletedOn       int64                  `json:"deleted_on"`
+	IsDel           int64                  `json:"is_del"`
+}
+
+type MsgTweetContentData struct {
+	ID      int64                  `json:"id"`
+	PostID  int64                  `json:"post_id"`
+	UserID  int64                  `json:"user_id"`
+	Content string                 `json:"content"`
+	Type    constants.PostContentT `json:"type"`
+	Sort    int64                  `json:"sort"`
+}
+
+type UserBillContentData struct {
+	PostID       int64 `json:"post_id"`        // 咨询id
+	UserID       int64 `json:"user_id"`        // 用户id
+	ProfitUserID int64 `json:"profit_user_id"` // 受益者id
+	Price        int64 `json:"price"`          // 购买价格
+}

+ 48 - 0
mqClient/msgtransfer.go

@@ -0,0 +1,48 @@
+package mqClient
+
+import (
+	"context"
+	"encoding/json"
+	"github.com/zeromicro/go-queue/kq"
+	"paopaoimtask/mq"
+)
+
+type MsgChatTransferClient interface {
+	Push(msg *mq.MsgChatTransfer) error
+}
+
+type msgChatTransferClient struct {
+	pusher *kq.Pusher
+}
+
+func NewMsgChatTransferClient(addr []string, topic string, opts ...kq.Pusher) MsgChatTransferClient {
+	return &msgChatTransferClient{pusher: kq.NewPusher(addr, topic)}
+}
+
+func (m *msgChatTransferClient) Push(msg *mq.MsgChatTransfer) error {
+	data, err := json.Marshal(msg)
+	if err != nil {
+		return err
+	}
+	return m.pusher.Push(context.Background(), string(data))
+}
+
+type MsgReadTransferClient interface {
+	Push(msg *mq.MsgMarkRead) error
+}
+
+type msgReadTransferClient struct {
+	pusher *kq.Pusher
+}
+
+func NewMsgReadTransferClient(addr []string, topic string, opts ...kq.Pusher) MsgReadTransferClient {
+	return &msgReadTransferClient{pusher: kq.NewPusher(addr, topic)}
+}
+
+func (m *msgReadTransferClient) Push(msg *mq.MsgMarkRead) error {
+	data, err := json.Marshal(msg)
+	if err != nil {
+		return err
+	}
+	return m.pusher.Push(context.Background(), string(data))
+}

+ 28 - 0
mqClient/tweetTransfer.go

@@ -0,0 +1,28 @@
+package mqClient
+
+import (
+	"context"
+	"encoding/json"
+	"github.com/zeromicro/go-queue/kq"
+	"paopaoimtask/mq"
+)
+
+type TweetTransferClient interface {
+	Push(msg *mq.MsgTweetData) error
+}
+
+type tweetTransferClient struct {
+	pusher *kq.Pusher
+}
+
+func NewTweetTransferClient(addr []string, topic string, opts ...kq.Pusher) TweetTransferClient {
+	return &tweetTransferClient{pusher: kq.NewPusher(addr, topic)}
+}
+
+func (m *tweetTransferClient) Push(msg *mq.MsgTweetData) error {
+	data, err := json.Marshal(msg)
+	if err != nil {
+		return err
+	}
+	return m.pusher.Push(context.Background(), string(data))
+}

+ 30 - 0
mqClient/userbilltransfer.go

@@ -0,0 +1,30 @@
+package mqClient
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"github.com/zeromicro/go-queue/kq"
+	"paopaoimtask/mq"
+)
+
+type UserBillTransferClient interface {
+	Push(msg *mq.UserBillContentData) error
+}
+
+type userBillTransferClient struct {
+	pusher *kq.Pusher
+}
+
+func NewUserBillTransferClient(addr []string, topic string, opts ...kq.Pusher) UserBillTransferClient {
+	fmt.Println("topic: ", topic)
+	return &userBillTransferClient{pusher: kq.NewPusher(addr, topic)}
+}
+
+func (m *userBillTransferClient) Push(msg *mq.UserBillContentData) error {
+	data, err := json.Marshal(msg)
+	if err != nil {
+		return err
+	}
+	return m.pusher.Push(context.Background(), string(data))
+}

+ 66 - 0
pkg/bitmap/bitmap.go

@@ -0,0 +1,66 @@
+package bitmap
+
+type BitMap struct {
+	bits []byte
+	size int
+}
+
+func NewBitMap(size int) *BitMap {
+	if size == 0 {
+		size = 250
+	}
+
+	return &BitMap{
+		bits: make([]byte, size), // 字节的大小,每个字节中存8个bit
+		size: size * 8,           // 整个bitmap的大小
+	}
+}
+
+func (b *BitMap) Set(id int64) {
+	// 计算id在哪个bit
+	idx := hash(id) % b.size
+	// 计算在哪个byte
+	byteIdx := idx / 8
+	// 计算byte中的bit位置
+	bitIdx := idx % 8
+
+	b.bits[byteIdx] |= 1 << bitIdx
+}
+
+func (b *BitMap) IsSet(id int64) bool {
+	// 计算id在哪个bit
+	idx := hash(id) % b.size
+	// 计算在哪个byte
+	byteIdx := idx / 8
+	// 计算byte中的bit位置
+	bitIdx := idx % 8
+
+	return (b.bits[byteIdx] & (1 << bitIdx)) != 0
+}
+
+func (b *BitMap) Export() []byte {
+	return b.bits
+}
+
+func Load(bits []byte) *BitMap {
+	if len(bits) == 0 {
+		return NewBitMap(0)
+	}
+	return &BitMap{
+		bits: bits,
+		size: len(bits) * 8,
+	}
+}
+
+func hash(id int64) int {
+	// 使用BKDR哈希算法
+	seed := 131313 // 31 131 1313 13131 131313, etc
+	hash := 0
+
+	// 将整数id拆分为每个字节,并使用BKDR哈希算法计算哈希值
+	for i := 0; i < 8; i++ {
+		hash = hash*seed + int((id>>(i*8))&0xFF)
+	}
+
+	return hash & 0x7FFFFFFF
+}

+ 20 - 0
pkg/constants/es.go

@@ -0,0 +1,20 @@
+package constants
+
+type SearchType int32
+
+// 1、按照用户id查询2、按照用户推文权限搜索3、搜索全文
+const (
+	SearchByUserID SearchType = iota + 1
+	SearchByVisit
+	SearchByContent
+)
+
+type EsSortType int32
+
+// 1-按照用户热度排序2-按照用户创建时间排序3-按照用户最后回复时间排序4-用户置顶排序
+const (
+	SortByUserScore EsSortType = iota + 1
+	SortByUserCreateTime
+	SortByUserLastReply
+	SortByUserTop
+)

+ 75 - 0
pkg/constants/task.go

@@ -0,0 +1,75 @@
+package constants
+
+type MType int
+
+const (
+	TextMType MType = iota
+)
+
+type ChatType int
+
+const (
+	GroupChatType ChatType = iota + 1
+	SingleChatType
+)
+
+type ContentType int
+
+const (
+	ContentChatMsg ContentType = iota
+	ContentMakeRead
+)
+
+const (
+	SYSTEM_ROOT_UID = -1000000
+)
+
+type PostVisibleT uint8
+
+const (
+	PostVisitPublic    PostVisibleT = 90
+	PostVisitPrivate   PostVisibleT = 0
+	PostVisitFriend    PostVisibleT = 50
+	PostVisitFollowing PostVisibleT = 60
+)
+
+type TweetVisibleType int8
+
+const (
+	// 推文可见性
+	TweetVisitPublic TweetVisibleType = iota
+	TweetVisitPrivate
+	TweetVisitFriend
+	TweetVisitFollowing
+	TweetVisitInvalid
+)
+
+// ToVisibleValue 转换权限
+func ToVisibleValue(visitType TweetVisibleType) PostVisibleT {
+	switch visitType {
+	case TweetVisitPublic:
+		return PostVisitPublic
+	case TweetVisitPrivate:
+		return PostVisitPrivate
+	case TweetVisitFriend:
+		return PostVisitFriend
+	case TweetVisitFollowing:
+		return PostVisitFollowing
+	default:
+		return PostVisitPrivate
+	}
+}
+
+type PostContentT int
+
+const (
+	// 类型,1标题,2文字段落,3图片地址,4视频地址,5语音地址,6链接地址,7附件资源
+	ContentTypeTitle PostContentT = iota + 1
+	ContentTypeText
+	ContentTypeImage
+	ContentTypeVideo
+	ContentTypeAudio
+	ContentTypeLink
+	ContentTypeAttachment
+	ContentTypeChargeAttachment
+)

+ 31 - 0
pkg/es/esclient.go

@@ -0,0 +1,31 @@
+package es
+
+import (
+	"crypto/tls"
+	"fmt"
+	"net"
+	"net/http"
+	"time"
+
+	"github.com/elastic/go-elasticsearch/v8"
+)
+
+func GetEsClient(esHost string, esPort int64, esUsername, esPassword string) (*elasticsearch.Client, error) {
+	cfg := elasticsearch.Config{
+		Addresses: []string{
+			fmt.Sprintf("http://%s:%d", esHost, esPort),
+		},
+		Username: esUsername,
+		Password: esPassword,
+		Transport: &http.Transport{
+			MaxIdleConnsPerHost:   10,
+			ResponseHeaderTimeout: time.Second,
+			DialContext:           (&net.Dialer{Timeout: time.Second}).DialContext,
+			TLSClientConfig: &tls.Config{
+				MinVersion: tls.VersionTLS12,
+			},
+		},
+	}
+	return elasticsearch.NewClient(cfg)
+
+}

+ 358 - 0
pkg/es/tweet_es.go

@@ -0,0 +1,358 @@
+package es
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"fmt"
+	"github.com/elastic/go-elasticsearch/v8"
+	"net/http"
+	"paopaoimtask/pkg/constants"
+	"strconv"
+	"strings"
+)
+
+type TweetIndex struct {
+}
+
+const __tweetMapping = `{
+	"mappings": {
+	  "properties": {
+		"id": {
+		  "type": "integer"
+		},
+		"user_id": {
+		  "type": "integer"
+		},
+		"contents": {
+		  "type": "nested",
+		  "properties": {
+			"id": {
+			  "type": "integer"
+			},
+			"post_id": {
+			  "type": "integer"
+			},
+			"content": {
+			  "type": "text",
+			  "fields": {
+				"keyword": {
+				  "type": "keyword",
+				  "ignore_above": 256
+				}
+			  }
+			},
+			"type": {
+			  "type": "integer"
+			},
+			"sort": {
+			  "type": "integer"
+			}
+		  }
+		},
+		"comment_count": {
+		  "type": "integer"
+		},
+		"collection_count": {
+		  "type": "integer"
+		},
+		"share_count": {
+		  "type": "integer"
+		},
+		"upvote_count": {
+		  "type": "integer"
+		},
+		"visibility": {
+		  "type": "integer"
+		},
+		"is_top": {
+		  "type": "integer"
+		},
+		"is_essence": {
+		  "type": "integer"
+		},
+		"is_lock": {
+		  "type": "integer"
+		},
+		"latest_replied_on": {
+		  "type": "integer"
+		},
+		"created_on": {
+		  "type": "integer"
+		},
+		"modified_on": {
+		  "type": "integer"
+		},
+		"tags": {
+		  "type": "object",
+		  "dynamic": true,
+		  "properties": {
+			"*": {
+			  "type": "integer"
+			}
+		  }
+		},
+		"attachment_price": {
+		  "type": "integer"
+		},
+		"ip_loc": {
+		  "type": "text"
+		}
+	  }
+	}
+}`
+
+type TweetEsData struct {
+	Id       int64 `json:"id"`
+	UserId   int64 `json:"user_id"`
+	Contents []struct {
+		ID      int64  `json:"id"`
+		PostID  int64  `json:"post_id"`
+		Content string `json:"content"`
+		Type    int32  `json:"type"`
+		Sort    int32  `json:"sort"`
+	} `json:"contents"`
+	CommentCount    int64            `json:"comment_count"`
+	CollectionCount int64            `json:"collection_count"`
+	ShareCount      int64            `json:"share_count"`
+	UpvoteCount     int64            `json:"upvote_count"`
+	Visibility      int32            `json:"visibility"`
+	IsTop           int32            `json:"is_top"`
+	IsEssence       int32            `json:"is_essence"`
+	IsLock          int32            `json:"is_lock"`
+	LatestRepliedOn int64            `json:"latest_replied_on"`
+	CreatedOn       int64            `json:"created_on"`
+	ModifiedOn      int64            `json:"modified_on"`
+	Tags            map[string]int32 `json:"tags"`
+	AttachmentPrice int64            `json:"attachment_price"`
+	IpLoc           string           `json:"ip_loc"`
+}
+
+//func (t *TweetEsData) MarshalJSON() ([]byte, error) {
+//	return json.Marshal(t)
+//}
+
+func (t *TweetIndex) Create(tsData *TweetEsData, esClient *elasticsearch.Client, esTweetIndex string) error {
+	// 判断是否有索引
+	indexBody, err := esClient.Indices.Exists([]string{esTweetIndex})
+	if err != nil || indexBody.StatusCode == http.StatusNotFound {
+		fmt.Println("aaaaaaaaaa:内庭院")
+		cBody, cErr := esClient.Indices.Create(esTweetIndex, esClient.Indices.Create.WithBody(strings.NewReader(__tweetMapping)))
+		if cErr != nil {
+			return cErr
+		}
+		defer cBody.Body.Close()
+	}
+	defer indexBody.Body.Close()
+	docId := tsData.Id
+	doc, _ := json.Marshal(tsData)
+	reader := bytes.NewReader(doc)
+	indexRes, iErr := esClient.Index(esTweetIndex, reader, esClient.Index.WithDocumentID(strconv.FormatInt(docId, 10)))
+	if iErr != nil {
+		return iErr
+	}
+	defer indexRes.Body.Close()
+	return nil
+}
+
+// Get tweet es 查询,1、按照用户id查询2、按照用户推文权限搜索3、搜索全文, 排序:1-按照用户热度排序2-获取最新发布的文章3-按照用户最后回复时间排序4-用户置顶排序
+func (t *TweetIndex) Get(esClient *elasticsearch.Client, esTweetIndex string, searchType []constants.SearchType, sortType []constants.EsSortType, userId int64, userVisit int32, tweetContent string, limit int64, offset int64) (tweetInfos []*TweetEsData, total int64, err error) {
+	var filters = make([]map[string]interface{}, 0)
+	tweetInfos = make([]*TweetEsData, 0)
+	total = 0
+	for _, st := range searchType {
+		switch st {
+		case constants.SearchByUserID:
+			filters = append(filters, map[string]interface{}{
+				"term": map[string]interface{}{
+					"user_id": userId,
+				},
+			})
+			break
+		case constants.SearchByVisit:
+			filters = append(filters, map[string]interface{}{
+				"term": map[string]interface{}{
+					"visibility": userVisit,
+				},
+			})
+			break
+		case constants.SearchByContent:
+			filters = append(filters, map[string]interface{}{
+				"nested": map[string]interface{}{
+					"path": "contents",
+					"query": map[string]interface{}{
+						"query_string": map[string]interface{}{
+							"query":  "*" + tweetContent + "*",
+							"fields": []string{"contents.content", "contents.content.keyword"},
+						},
+					},
+				},
+			})
+			break
+		default:
+			break
+		}
+	}
+
+	var sorts = make([]map[string]interface{}, 0)
+	for _, esSortType := range sortType {
+		switch esSortType {
+		case constants.SortByUserScore:
+			fallthrough
+		case constants.SortByUserCreateTime:
+			sorts = append(sorts, map[string]interface{}{
+				"created_on": map[string]interface{}{
+					"order": "desc",
+				},
+			})
+			break
+		case constants.SortByUserLastReply:
+			sorts = append(sorts, map[string]interface{}{
+				"latest_replied_on": map[string]interface{}{
+					"order": "desc",
+				},
+			})
+			break
+		case constants.SortByUserTop:
+			sorts = append(sorts, map[string]interface{}{
+				"is_top": map[string]interface{}{
+					"order": "desc",
+				},
+			})
+			break
+		default:
+			break
+		}
+	}
+	var query map[string]interface{}
+	var from, size int64 = offset, limit
+	if limit <= 0 {
+		size = 10
+	}
+	if offset <= 0 {
+		from = 0
+	}
+	if len(filters) == 0 {
+		query = map[string]interface{}{
+			"query": map[string]interface{}{
+				"match_all": map[string]interface{}{},
+			},
+			"sort": sorts,
+			"from": 0,
+			"size": 10,
+		}
+	} else {
+		query = map[string]interface{}{
+			"query": map[string]interface{}{
+				"bool": map[string]interface{}{
+					"filter": filters,
+				},
+			},
+			"sort": sorts,
+			"from": from,
+			"size": size,
+		}
+	}
+	fmt.Println(query)
+	marshal, _ := json.Marshal(query)
+	result, err := esClient.Search(
+		esClient.Search.WithContext(context.Background()),
+		esClient.Search.WithIndex(esTweetIndex),
+		esClient.Search.WithBody(bytes.NewReader(marshal)),
+	)
+	if err != nil {
+		fmt.Println("errr: ", err.Error())
+		return
+	}
+	defer result.Body.Close()
+	if result.IsError() {
+		fmt.Println("搜索报错了: ", result.String())
+		return
+	}
+	var rows map[string]interface{}
+	if err = json.NewDecoder(result.Body).Decode(&rows); err != nil {
+		return
+	}
+	if count, ok := rows["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"]; ok {
+		total = int64(count.(float64))
+	}
+	hits := rows["hits"].(map[string]interface{})["hits"].([]interface{})
+	for _, item := range hits {
+		if row, ok := item.(map[string]interface{})["_source"]; ok {
+			sourceByte, _ := json.Marshal(row)
+			var doc TweetEsData
+			err = json.Unmarshal(sourceByte, &doc)
+			if err != nil {
+				return
+			}
+			tweetInfos = append(tweetInfos, &doc)
+		}
+	}
+	return
+}
+
+func (t *TweetIndex) Delete(id int64, esClient *elasticsearch.Client, esTweetIndex string) error {
+	exists, err := esClient.Indices.Exists([]string{esTweetIndex})
+	if err != nil || exists.StatusCode == http.StatusNotFound {
+		// 没有找到对应的索引,返回错误
+		return err
+	}
+	defer exists.Body.Close()
+	response, err := esClient.Delete(esTweetIndex, strconv.FormatInt(id, 10))
+	if err != nil || response.StatusCode == http.StatusNotFound {
+		return err
+	}
+	defer response.Body.Close()
+	return nil
+}
+
+func (t *TweetIndex) Update(id int64, esClient *elasticsearch.Client, esTweetIndex string, values map[string]interface{}) error {
+	exists, err := esClient.Indices.Exists([]string{esTweetIndex})
+	if err != nil || exists.StatusCode == http.StatusNotFound {
+		// 没有找到对应的索引,返回错误
+		return err
+	}
+	defer exists.Body.Close()
+	marshal, _ := json.Marshal(map[string]interface{}{"doc": values})
+	update, err := esClient.Update(esTweetIndex, strconv.FormatInt(id, 10), bytes.NewReader(marshal))
+	if err != nil || update.StatusCode == http.StatusNotFound {
+		return err
+	}
+	defer update.Body.Close()
+	return nil
+}
+
+func (t *TweetIndex) GetRow(id int64, esClient *elasticsearch.Client, esTweetIndex string) (tweetInfos *TweetEsData, err error) {
+	exists, err := esClient.Indices.Exists([]string{esTweetIndex})
+	if err != nil || exists.StatusCode == http.StatusNotFound {
+		// 没有找到对应的索引,返回错误
+		return nil, err
+	}
+	defer exists.Body.Close()
+	res, err := esClient.Get(esTweetIndex, strconv.FormatInt(id, 10))
+	if err != nil {
+		return nil, err
+	}
+	if res.IsError() {
+		return nil, err
+	}
+	fmt.Println(res)
+	var row map[string]interface{}
+	if err = json.NewDecoder(res.Body).Decode(&row); err != nil {
+		return nil, err
+	}
+	if found, ok := row["found"]; ok && !found.(bool) {
+		return nil, nil
+	}
+	if source, ok := row["_source"]; ok {
+		// 有数据
+		marshal, _ := json.Marshal(source)
+		var doc TweetEsData
+		err = json.Unmarshal(marshal, &doc)
+		if err != nil {
+			return nil, err
+		}
+		return &doc, nil
+	}
+	return nil, nil
+}

+ 67 - 0
pkg/log/zaplog.go

@@ -0,0 +1,67 @@
+package log
+
+import (
+	"fmt"
+	"github.com/zeromicro/go-zero/core/logx"
+	"go.uber.org/zap"
+)
+
+const callerSkipOffset = 3
+
+type ZapWriter struct {
+	logger *zap.Logger
+}
+
+func NewZapWriter(opts ...zap.Option) logx.Writer {
+	opts = append(opts, zap.AddCallerSkip(callerSkipOffset))
+	logger, _ := zap.NewProduction(opts...)
+	defer logger.Sync()
+
+	return &ZapWriter{
+		logger: logger,
+	}
+}
+
+func (w *ZapWriter) Alert(v interface{}) {
+	w.logger.Error(fmt.Sprint(v))
+}
+
+func (w *ZapWriter) Close() error {
+	return w.logger.Sync()
+}
+
+func (w *ZapWriter) Debug(v interface{}, fields ...logx.LogField) {
+	w.logger.Debug(fmt.Sprint(v), toZapFields(fields...)...)
+}
+
+func (w *ZapWriter) Error(v interface{}, fields ...logx.LogField) {
+	w.logger.Error(fmt.Sprint(v), toZapFields(fields...)...)
+}
+
+func (w *ZapWriter) Info(v interface{}, fields ...logx.LogField) {
+	w.logger.Info(fmt.Sprint(v), toZapFields(fields...)...)
+}
+
+func (w *ZapWriter) Severe(v interface{}) {
+	w.logger.Fatal(fmt.Sprint(v))
+}
+
+func (w *ZapWriter) Slow(v interface{}, fields ...logx.LogField) {
+	w.logger.Warn(fmt.Sprint(v), toZapFields(fields...)...)
+}
+
+func (w *ZapWriter) Stack(v interface{}) {
+	w.logger.Error(fmt.Sprint(v), zap.Stack("stack"))
+}
+
+func (w *ZapWriter) Stat(v interface{}, fields ...logx.LogField) {
+	w.logger.Info(fmt.Sprint(v), toZapFields(fields...)...)
+}
+
+func toZapFields(fields ...logx.LogField) []zap.Field {
+	zapFields := make([]zap.Field, 0, len(fields))
+	for _, f := range fields {
+		zapFields = append(zapFields, zap.Any(f.Key, f.Value))
+	}
+	return zapFields
+}

+ 41 - 0
task.go

@@ -0,0 +1,41 @@
+package main
+
+import (
+	"flag"
+	"fmt"
+	"github.com/zeromicro/go-zero/core/conf"
+	"github.com/zeromicro/go-zero/core/logx"
+	"github.com/zeromicro/go-zero/core/service"
+	"paopaoimtask/internal/config"
+	"paopaoimtask/internal/handler"
+	"paopaoimtask/internal/svc"
+	"paopaoimtask/pkg/log"
+)
+
+var configFile = flag.String("f", "etc/task-mq.yaml", "the config file")
+
+func main() {
+	flag.Parse()
+
+	var c config.Config
+	conf.MustLoad(*configFile, &c)
+
+	if err := c.SetUp(); err != nil {
+		panic(err)
+	}
+
+	ctx := svc.NewServiceContext(c)
+
+	writer := log.NewZapWriter()
+	logx.SetWriter(writer)
+
+	listen := handler.NewListen(ctx)
+
+	serviceGroup := service.NewServiceGroup()
+	for _, s := range listen.Services() {
+		serviceGroup.Add(s)
+	}
+
+	fmt.Printf("starting task server at %v....\n", c.ListenOn)
+	serviceGroup.Start()
+}