Ver Fonte

Merge branch 'master' of https://git.banshen.xyz/huangguangrong/slow_wild_backend

huangguangrong há 7 meses atrás
pai
commit
c2af7dcfce

+ 31 - 0
etc/slowwild.yaml

@@ -17,3 +17,34 @@ RedisConf:
   Host: 121.11.99.220:16379
   Password: ""
   DB: 0
+
+# 消息推送到客户端的topic
+MsgPushToClientTransfer:
+  Topic: msgPushToClientTransfer
+  Addrs:
+    - 121.11.99.220:19094
+# 用户消息消费配置,这里是采用go-zero自带的kafka配置,可以参考https://go-zero.dev/docs/tutorials/message-queue/kafka
+MsgChatTransfer:
+  # service的名称
+  Name: MsgChatTransfer
+  # kafka 的多个 Broker 节点
+  Brokers:
+    - 121.11.99.220:19094
+  # 消费者组
+  Group: kafka
+  # 订阅的 Topic 主题
+  Topic: msgChatTransfer
+  # 从头开始消费
+  Offset: first
+  # go-queue 内部是起多个 goroutine 从 kafka 中获取信息写入进程内的 channel,这个参数是控制此处的 goroutine 数量(⚠️ 并不是真正消费时的并发 goroutine 数量)
+  Consumers: 1
+#mongo配置
+Mongo:
+  Url: "mongodb://root:123456@121.11.99.220:47017"
+  Db: easy-chat
+  Collection: easy-chat-collection
+# 消息已读处理的配置
+MsgReadHandler:
+  GroupMsgReadHandler: 1
+  GroupMsgReadRecordDelayTime: 60
+  GroupMsgReadRecordDelayCount: 2

+ 19 - 1
go.mod

@@ -1,6 +1,8 @@
 module slowwild
 
-go 1.22.0
+go 1.23.6
+
+toolchain go1.23.7
 
 require (
 	git.banshen.xyz/huangguangrong/slow_wild_protobuff v0.1.8
@@ -8,6 +10,22 @@ require (
 	google.golang.org/grpc v1.70.0
 )
 
+require (
+	git.banshen.xyz/huangguangrong/slow_wild_queue v0.0.3 // indirect
+	github.com/golang/snappy v0.0.4 // indirect
+	github.com/montanaflynn/stats v0.7.1 // indirect
+	github.com/pierrec/lz4/v4 v4.1.21 // indirect
+	github.com/segmentio/kafka-go v0.4.47 // indirect
+	github.com/xdg-go/pbkdf2 v1.0.0 // indirect
+	github.com/xdg-go/scram v1.1.2 // indirect
+	github.com/xdg-go/stringprep v1.0.4 // indirect
+	github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
+	github.com/zeromicro/go-queue v1.2.2 // indirect
+	go.mongodb.org/mongo-driver v1.17.3 // indirect
+	golang.org/x/crypto v0.32.0 // indirect
+	golang.org/x/sync v0.11.0 // indirect
+)
+
 require (
 	filippo.io/edwards25519 v1.1.0 // indirect
 	github.com/beorn7/perks v1.0.1 // indirect

+ 57 - 0
go.sum

@@ -30,6 +30,8 @@ git.banshen.xyz/huangguangrong/slow_wild_protobuff v0.1.6 h1:AZtJwYwmLnq/pDYtPd4
 git.banshen.xyz/huangguangrong/slow_wild_protobuff v0.1.6/go.mod h1:v8AzHCelFBbIkoY+gR4WIEuc6mG5okJ12IXbjvGJWHk=
 git.banshen.xyz/huangguangrong/slow_wild_protobuff v0.1.8 h1:BW3DLwhDNATa45Jyzp18LCbXPNGEcz9k3v9mct81ayE=
 git.banshen.xyz/huangguangrong/slow_wild_protobuff v0.1.8/go.mod h1:RvtFTWaCnJcB8iy/clOqBoimd7UxSx5+KB96G+bdq5c=
+git.banshen.xyz/huangguangrong/slow_wild_queue v0.0.3 h1:6rhWRpRD1lXn87QSAtdZYz9pRJxECzwG8F9uuy65bDQ=
+git.banshen.xyz/huangguangrong/slow_wild_queue v0.0.3/go.mod h1:5rNoIiZvaamaB9xSOXefuEflTeUbpMfQgrZk8dc0x70=
 github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 h1:uvdUDbHQHO85qeSydJtItA4T55Pw6BtAejd0APRJOCE=
 github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
 github.com/alicebob/miniredis/v2 v2.34.0 h1:mBFWMaJSNL9RwdGRyEDoAAv8OQc5UlEhLDQggTglU/0=
@@ -89,6 +91,8 @@ github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
 github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
 github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
 github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
+github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
+github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I=
 github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U=
 github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
@@ -117,6 +121,7 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr
 github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
 github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
+github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
 github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
 github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
 github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
@@ -141,6 +146,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w
 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
 github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
 github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
+github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE=
+github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
 github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
 github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
 github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4=
@@ -151,6 +158,9 @@ github.com/openzipkin/zipkin-go v0.4.3 h1:9EGwpqkgnwdEIJ+Od7QVSEIH+ocmm5nPat0G7s
 github.com/openzipkin/zipkin-go v0.4.3/go.mod h1:M9wCJZFWCo2RiY+o1eBCEMe0Dp2S5LDHcMZmk3RmK7c=
 github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
 github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
+github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
+github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
+github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
 github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
 github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -169,6 +179,8 @@ github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa
 github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
 github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
 github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
+github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0=
+github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg=
 github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
 github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
 github.com/spf13/cast v1.7.1 h1:cuNEagBQEHWN1FnbGEjCXL2szYEXqfJPbP2HNUaca9Y=
@@ -188,11 +200,22 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl
 github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
 github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
 github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
+github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
+github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
+github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
+github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
+github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
+github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM=
+github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI=
 github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
+github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
 github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
 github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
+github.com/zeromicro/go-queue v1.2.2 h1:3TMhRlI/8lZy13Sj6FBBWWRXlsQhGCchRxY2itfV1Is=
+github.com/zeromicro/go-queue v1.2.2/go.mod h1:5HiNTEw1tACi9itho0JYQ1+EpIGpSFM4tOQ4bit+yKM=
 github.com/zeromicro/go-zero v1.8.0 h1:4g/8VW+fOyM51HZYPeI3mXIZdEX+Fl6SsdYX2H5PYw4=
 github.com/zeromicro/go-zero v1.8.0/go.mod h1:xDBF+/iDzj30zPvu6HNUIbpz1J6+/g3Sx9D/DytJfss=
 go.etcd.io/etcd/api/v3 v3.5.15 h1:3KpLJir1ZEBrYuV2v+Twaa/e2MdDCEZ/70H+lzEiwsk=
@@ -201,6 +224,8 @@ go.etcd.io/etcd/client/pkg/v3 v3.5.15 h1:fo0HpWz/KlHGMCC+YejpiCmyWDEuIpnTDzpJLB5
 go.etcd.io/etcd/client/pkg/v3 v3.5.15/go.mod h1:mXDI4NAOwEiszrHCb0aqfAYNCrZP4e9hRca3d1YK8EU=
 go.etcd.io/etcd/client/v3 v3.5.15 h1:23M0eY4Fd/inNv1ZfU3AxrbbOdW79r9V9Rl62Nm6ip4=
 go.etcd.io/etcd/client/v3 v3.5.15/go.mod h1:CLSJxrYjvLtHsrPKsy7LmZEE+DK2ktfd2bN4RhBMwlU=
+go.mongodb.org/mongo-driver v1.17.3 h1:TQyXhnsWfWtgAhMtOgtYHMTkZIfBTpMTsMnd9ZBeHxQ=
+go.mongodb.org/mongo-driver v1.17.3/go.mod h1:Hy04i7O2kC4RS06ZrhPRqj/u4DTYkFDAAccj+rVKqgQ=
 go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U=
 go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg=
 go.opentelemetry.io/otel/exporters/jaeger v1.17.0 h1:D7UpUy2Xc2wsi1Ras6V40q806WM07rqoCWzXu7Sqy+4=
@@ -238,14 +263,25 @@ go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
+golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
+golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
+golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
+golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
 golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
+golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
 golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
+golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
 golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
+golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
+golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
+golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
+golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
 golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
 golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
 golang.org/x/oauth2 v0.24.0 h1:KTBBxWqUa0ykRPLtV69rRto9TLXcqYkeswu48x/gvNE=
@@ -254,21 +290,40 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w=
+golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
 golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
+golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
+golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
+golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
 golang.org/x/term v0.28.0 h1:/Ts8HFuMR2E6IP/jlo7QVLZHggjKQbhu/7H0LJFr3Gg=
 golang.org/x/term v0.28.0/go.mod h1:Sw/lC2IAUZ92udQNf3WodGtn4k/XoLyZoh8v/8uiwek=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
+golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
+golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
+golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
+golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
 golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
 golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
 golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM=
@@ -280,6 +335,8 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn
 golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
 golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
 golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
+golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
+golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
 golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg=
 golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

+ 27 - 1
internal/config/config.go

@@ -1,6 +1,9 @@
 package config
 
-import "github.com/zeromicro/go-zero/zrpc"
+import (
+	"github.com/zeromicro/go-queue/kq"
+	"github.com/zeromicro/go-zero/zrpc"
+)
 
 type Config struct {
 	zrpc.RpcServerConf
@@ -12,4 +15,27 @@ type Config struct {
 		Password string
 		DB       int
 	}
+
+	// 消息推送到客户端的kafka配置
+	MsgPushToClientTransfer struct {
+		Topic string
+		Addrs []string
+	}
+
+	// mongodb配置
+	Mongo struct {
+		Url        string
+		Db         string
+		Collection string
+	}
+
+	// 消息已读配置信息
+	MsgReadHandler struct {
+		GroupMsgReadHandler          int32 // 群消息已读处理
+		GroupMsgReadRecordDelayTime  int64 // 群消息已读延迟时间
+		GroupMsgReadRecordDelayCount int64 // 群消息已读延迟次数
+	}
+
+	// 消息队列消费
+	MsgChatTransfer kq.KqConf
 }

+ 2 - 0
internal/constants/common.go

@@ -1,3 +1,5 @@
 package constants
 
 const UserIDKey = "user_id"
+
+const SystemUserId = 1024

+ 24 - 0
internal/handler/listen.go

@@ -0,0 +1,24 @@
+package handler
+
+import (
+	"slowwild/internal/handler/msgTransfer"
+	"slowwild/internal/svc"
+
+	"github.com/zeromicro/go-queue/kq"
+	"github.com/zeromicro/go-zero/core/service"
+)
+
+type Listen struct {
+	svc *svc.ServiceContext
+}
+
+func NewListen(svc *svc.ServiceContext) *Listen {
+	return &Listen{
+		svc,
+	}
+}
+
+func (l *Listen) Services() []service.Service {
+	return []service.Service{
+		kq.MustNewQueue(l.svc.Config.MsgChatTransfer, msgTransfer.NewMsgChatTransfer(l.svc))}
+}

+ 132 - 0
internal/handler/msgTransfer/groupMsgRead.go

@@ -0,0 +1,132 @@
+package msgTransfer
+
+import (
+	"sync"
+	"time"
+
+	msgChat "git.banshen.xyz/huangguangrong/slow_wild_queue/types"
+	"github.com/zeromicro/go-zero/core/logx"
+)
+
+type groupMsgRead struct {
+	mu             sync.Mutex         // 锁
+	conversationId string             // 会话id
+	push           *msgChat.Push      //推送方法
+	pushCh         chan *msgChat.Push // 通道推送
+	count          int                // 数量
+	pushTime       time.Time          // 上次推送时间
+	done           chan struct{}
+}
+
+func newGroupMsgRead(push *msgChat.Push, pushCh chan *msgChat.Push) *groupMsgRead {
+	g := &groupMsgRead{
+		push:           push,
+		conversationId: push.ConversationId,
+		pushCh:         pushCh,
+		count:          1,
+		pushTime:       time.Time{},
+		done:           make(chan struct{}),
+	}
+	go g.transfer()
+	return g
+}
+
+func (g *groupMsgRead) mergePush(push *msgChat.Push) {
+	g.mu.Lock()
+	defer g.mu.Unlock()
+	if push == nil {
+		g.push = push
+	}
+	g.count++
+	for msgId, read := range push.ReadRecords {
+		g.push.ReadRecords[msgId] = read
+	}
+}
+
+// 是否是活跃的状态
+func (g *groupMsgRead) IsIdle() bool {
+	g.mu.Lock()
+	defer g.mu.Unlock()
+	return g.isIdle()
+}
+
+func (g *groupMsgRead) isIdle() bool {
+	pushTime := g.pushTime
+	val := GroupMsgReadRecordDelayTime*2 - time.Since(pushTime)
+	if val <= 0 && g.push == nil && g.count == 0 {
+		return true
+	}
+	return false
+}
+
+// 清理内容
+func (g *groupMsgRead) clear() {
+	select {
+	case <-g.done:
+	default:
+		close(g.done)
+	}
+	g.push = nil
+}
+
+func (g *groupMsgRead) transfer() {
+	// 超时发送
+	timer := time.NewTimer(GroupMsgReadRecordDelayTime / 2)
+	defer timer.Stop()
+	for {
+		select {
+		case <-g.done:
+			return
+		case <-timer.C:
+			g.mu.Lock()
+			pushTime := g.pushTime
+			val := GroupMsgReadRecordDelayTime - time.Since(pushTime)
+			push := g.push
+			if val > 0 && g.count < GroupMsgReadRecordDelayCount || push == nil {
+				// 时间和数量都没有达标
+				if val > 0 {
+					timer.Reset(val)
+				}
+				g.mu.Unlock()
+				continue
+			}
+			g.pushTime = time.Now()
+			g.push = nil
+			g.count = 0
+			timer.Reset(val)
+			g.mu.Unlock()
+			// 推送
+			logx.Infof("超过合并的条件推送 %v", push)
+			g.pushCh <- push
+		default:
+			g.mu.Lock()
+			if g.count >= GroupMsgReadRecordDelayCount {
+				push := g.push
+				// 达到了数量的话也要进行推送
+				g.pushTime = time.Now()
+				g.push = nil
+				g.count = 0
+				g.mu.Unlock()
+				// 推送
+				logx.Infof("超过合并的条件推送 %v", push)
+				g.pushCh <- push
+				continue
+			}
+			if g.isIdle() {
+				g.mu.Unlock()
+				g.pushCh <- &msgChat.Push{
+					ChatType:       msgChat.GroupChatType,
+					ConversationId: g.conversationId,
+				}
+				continue
+			}
+			g.mu.Unlock()
+			timeDelay := GroupMsgReadRecordDelayTime / 4
+			if timeDelay > time.Second {
+				timeDelay = time.Second
+			}
+			time.Sleep(timeDelay)
+		}
+	}
+	// 超量发送
+}

+ 78 - 0
internal/handler/msgTransfer/msgChatTransfer.go

@@ -0,0 +1,78 @@
+package msgTransfer
+
+import (
+	"context"
+	"encoding/json"
+	"slowwild/internal/immodel"
+	"slowwild/internal/svc"
+	"slowwild/internal/utils"
+	"strconv"
+
+	msgChat "git.banshen.xyz/huangguangrong/slow_wild_queue/types"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+)
+
+/**
+消息消费
+*/
+
+type MsgChatTransfer struct {
+	*BaseMsgTransfer
+}
+
+func NewMsgChatTransfer(svc *svc.ServiceContext) *MsgChatTransfer {
+	return &MsgChatTransfer{
+		NewBaseMsgTransfer(svc),
+	}
+}
+
+// 消息消费的实现方法
+func (m *MsgChatTransfer) Consume(ctx context.Context, key, value string) error {
+	var (
+		data  msgChat.MsgChatTransfer
+		msgId = primitive.NewObjectID()
+	)
+	if err := json.Unmarshal([]byte(value), &data); err != nil {
+		return err
+	}
+	if err := m.addChatLog(ctx, msgId, &data); err != nil {
+		return err
+	}
+
+	return m.Transfer(ctx, &msgChat.Push{
+		MsgID:          msgId.Hex(),
+		ConversationId: data.ConversationId,
+		ChatType:       data.ChatType,
+		SendID:         data.SendID,
+		RecvID:         data.RecvID,
+		RecvIDs:        data.RecvIDs,
+		SendTime:       data.SendTime,
+		MType:          data.MType,
+		Content:        data.Content,
+	})
+}
+
+// 记录消息到mongodb
+func (m *MsgChatTransfer) addChatLog(ctx context.Context, msgId primitive.ObjectID, data *msgChat.MsgChatTransfer) error {
+	//记录消息
+	chatlog := immodel.ChatLog{
+		ID:             msgId,
+		ConversationId: data.ConversationId,
+		SendId:         strconv.FormatInt(data.SendID, 10),
+		RecvId:         strconv.FormatInt(data.RecvID, 10),
+		MsgType:        data.MType,
+		MsgContent:     data.Content,
+		SendTime:       data.SendTime,
+		ChatType:       data.ChatType,
+	}
+	//设置自己为已读状态
+	readRecords := utils.NewBitMap(0)
+	readRecords.Set(data.SendID)
+	chatlog.ReadRecords = readRecords.Export()
+
+	err := m.svcCtx.ChatLogModel.Insert(ctx, &chatlog)
+	if err != nil {
+		return err
+	}
+	return m.svcCtx.ConversationModel.UpdateMsg(ctx, &chatlog)
+}

+ 143 - 0
internal/handler/msgTransfer/msgReadTransfer.go

@@ -0,0 +1,143 @@
+package msgTransfer
+
+import (
+	"context"
+	"encoding/base64"
+	"encoding/json"
+	"slowwild/internal/svc"
+	"slowwild/internal/utils"
+	"sync"
+	"time"
+
+	msgChat "git.banshen.xyz/huangguangrong/slow_wild_queue/types"
+	"github.com/zeromicro/go-zero/core/stores/cache"
+)
+
+var (
+	GroupMsgReadRecordDelayTime  = time.Second // 超时时间
+	GroupMsgReadRecordDelayCount = 10          //最大消息处理数量
+)
+
+const (
+	GroupMsgReadHandlerAtTransfer = iota //是否开启群消息批量处理
+	GroupMsgReadHandlerDelayTransfer
+)
+
+type MsgReadTransfer struct {
+	*BaseMsgTransfer
+	cache.Cache
+	mu        sync.Mutex
+	groupMsgs map[string]*groupMsgRead
+	push      chan *msgChat.Push
+}
+
+func NewMsgReadTransfer(svc *svc.ServiceContext) *MsgReadTransfer {
+	m := &MsgReadTransfer{
+		BaseMsgTransfer: NewBaseMsgTransfer(svc),
+		groupMsgs:       make(map[string]*groupMsgRead, 1),
+		push:            make(chan *msgChat.Push, 1),
+	}
+	if svc.Config.MsgReadHandler.GroupMsgReadHandler != GroupMsgReadHandlerAtTransfer {
+		// 开启了群消息批量处理
+		if svc.Config.MsgReadHandler.GroupMsgReadRecordDelayCount > 0 {
+			GroupMsgReadRecordDelayCount = int(svc.Config.MsgReadHandler.GroupMsgReadRecordDelayCount)
+		}
+		if svc.Config.MsgReadHandler.GroupMsgReadRecordDelayTime > 0 {
+			GroupMsgReadRecordDelayTime = time.Duration(svc.Config.MsgReadHandler.GroupMsgReadRecordDelayTime) * time.Second
+		}
+	}
+	go m.transfer()
+	return m
+}
+
+func (m *MsgReadTransfer) Consume(ctx context.Context, key, value string) error {
+	var (
+		data msgChat.MsgMarkRead
+	)
+	if err := json.Unmarshal([]byte(value), &data); err != nil {
+		return err
+	}
+
+	// 更新已读未读处理
+	read, err := m.UpdateChatLogRead(ctx, &data)
+	if err != nil {
+		return err
+	}
+	push := &msgChat.Push{
+		ConversationId: data.ConversationId,
+		ChatType:       data.ChatType,
+		SendID:         data.SendID,
+		RecvID:         data.RecvID,
+		ContentType:    msgChat.ContentMakeRead,
+		ReadRecords:    read,
+	}
+	switch data.ChatType {
+	case msgChat.SingleChatType:
+		// 私聊直接推送
+		m.push <- push
+	case msgChat.GroupChatType:
+		// 判断是否开启合并消息
+		if m.svcCtx.Config.MsgReadHandler.GroupMsgReadHandler == GroupMsgReadHandlerAtTransfer {
+			m.push <- push
+		}
+		m.mu.Lock()
+		defer m.mu.Unlock()
+		push.SendID = 0
+		if _, ok := m.groupMsgs[push.ConversationId]; ok {
+			m.Infof("merge push %v", push.ConversationId)
+			m.groupMsgs[push.ConversationId].mergePush(push)
+		} else {
+			m.groupMsgs[push.ConversationId] = newGroupMsgRead(push, m.push)
+		}
+	}
+
+	return nil
+}
+
+func (m *MsgReadTransfer) UpdateChatLogRead(ctx context.Context, data *msgChat.MsgMarkRead) (map[string]string, error) {
+	res := make(map[string]string)
+	chatLogs, err := m.svcCtx.ChatLogModel.ListByMsgIds(ctx, data.MsgIds)
+	if err != nil {
+		return nil, err
+	}
+	for _, chat := range chatLogs {
+		switch chat.ChatType {
+		case msgChat.SingleChatType:
+			chat.ReadRecords = []byte{1}
+		case msgChat.GroupChatType:
+			readRecords := utils.Load(chat.ReadRecords)
+			readRecords.Set(data.SendID)
+			chat.ReadRecords = readRecords.Export()
+		}
+
+		res[chat.ID.Hex()] = base64.StdEncoding.EncodeToString(chat.ReadRecords)
+		err := m.svcCtx.ChatLogModel.UpdateMakeRead(ctx, chat.ID, chat.ReadRecords)
+		if err != nil {
+			return nil, err
+		}
+
+	}
+	return res, nil
+}
+
+func (m *MsgReadTransfer) transfer() {
+	for push := range m.push {
+		if push.RecvID > 0 || len(push.RecvIDs) > 0 {
+			if err := m.Transfer(context.Background(), push); err != nil {
+				m.Errorf("m transfer err %v push %v", err, push)
+			}
+		}
+		if push.ChatType == msgChat.SingleChatType {
+			continue
+		}
+		if m.svcCtx.Config.MsgReadHandler.GroupMsgReadHandler == GroupMsgReadHandlerAtTransfer {
+			continue
+		}
+		// 接收者id为空
+		m.mu.Lock()
+		if _, ok := m.groupMsgs[push.ConversationId]; ok && m.groupMsgs[push.ConversationId].IsIdle() {
+			m.groupMsgs[push.ConversationId].clear()
+			delete(m.groupMsgs, push.ConversationId)
+		}
+	}
+}

+ 69 - 0
internal/handler/msgTransfer/msgTransfer.go

@@ -0,0 +1,69 @@
+package msgTransfer
+
+import (
+	"context"
+	"fmt"
+	"slowwild/internal/constants"
+	"slowwild/internal/svc"
+
+	msgChat "git.banshen.xyz/huangguangrong/slow_wild_queue/types"
+	"github.com/zeromicro/go-zero/core/logx"
+)
+
+type BaseMsgTransfer struct {
+	svcCtx *svc.ServiceContext
+	logx.Logger
+}
+
+func NewBaseMsgTransfer(svc *svc.ServiceContext) *BaseMsgTransfer {
+	return &BaseMsgTransfer{
+		svcCtx: svc,
+		Logger: logx.WithContext(context.Background()),
+	}
+}
+
+func (b *BaseMsgTransfer) Transfer(ctx context.Context, data *msgChat.Push) error {
+	var err error
+	switch data.ChatType {
+	case msgChat.SingleChatType:
+		err = b.single(ctx, data)
+	case msgChat.GroupChatType:
+		err = b.group(ctx, data)
+	}
+	return err
+}
+
+func (b *BaseMsgTransfer) single(ctx context.Context, data *msgChat.Push) error {
+	// 重新投送回消息队列,消费者接到之后直接推送到客户端
+	fmt.Println("single 推送消息队列")
+	b.svcCtx.MsgPushToClientTransfer.Push(&msgChat.MessagePushData{
+		FrameType: msgChat.FrameData,
+		Id:        data.MsgID,
+		Method:    "push",
+		FormID:    constants.SystemUserId,
+		UserID:    data.RecvID,
+		Data:      data,
+	})
+	return nil
+}
+
+func (b *BaseMsgTransfer) group(ctx context.Context, data *msgChat.Push) error {
+	// 查询群用户
+	// socialLogic := paopaosociallogic.NewGetGroupMembersLogic(ctx, b.svcCtx)
+	// members, err := socialLogic.GetGroupMembers(&paopaosocial.GetGroupMembersReq{
+	// 	GroupId: data.RecvID,
+	// 	UserId:  data.SendID,
+	// })
+	// if err != nil {
+	// 	return err
+	// }
+	// data.RecvIDs = make([]int64, 0, members.Count)
+	// for _, item := range members.List {
+	// 	if item.UserId == data.SendID {
+	// 		continue
+	// 	}
+	// 	data.RecvIDs = append(data.RecvIDs, item.UserId)
+	// }
+	// // 重新投送到消息队列,消费者接收到消息之后立即推送出去
+	return nil
+}

+ 25 - 0
internal/immodel/chatlogmodel.go

@@ -0,0 +1,25 @@
+package immodel
+
+import "github.com/zeromicro/go-zero/core/stores/mon"
+
+var _ ChatLogModel = (*customChatLogModel)(nil)
+
+type (
+	// ChatLogModel is an interface to be customized, add more methods here,
+	// and implement the added methods in customChatLogModel.
+	ChatLogModel interface {
+		chatLogModel
+	}
+
+	customChatLogModel struct {
+		*defaultChatLogModel
+	}
+)
+
+// NewChatLogModel returns a model for the mongo.
+func NewChatLogModel(url, db, collection string) ChatLogModel {
+	conn := mon.MustNewModel(url, db, collection)
+	return &customChatLogModel{
+		defaultChatLogModel: newDefaultChatLogModel(conn),
+	}
+}

+ 107 - 0
internal/immodel/chatlogmodelgen.go

@@ -0,0 +1,107 @@
+// Code generated by goctl. DO NOT EDIT.
+package immodel
+
+import (
+	"context"
+	"time"
+
+	"github.com/zeromicro/go-zero/core/stores/mon"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"go.mongodb.org/mongo-driver/mongo"
+)
+
+type chatLogModel interface {
+	Insert(ctx context.Context, data *ChatLog) error
+	FindOne(ctx context.Context, id string) (*ChatLog, error)
+	Update(ctx context.Context, data *ChatLog) (*mongo.UpdateResult, error)
+	Delete(ctx context.Context, id string) (int64, error)
+	ListByMsgIds(ctx context.Context, msgIds []string) ([]*ChatLog, error)
+	UpdateMakeRead(ctx context.Context, id primitive.ObjectID, readRecords []byte) error
+}
+
+type defaultChatLogModel struct {
+	conn *mon.Model
+}
+
+func newDefaultChatLogModel(conn *mon.Model) *defaultChatLogModel {
+	return &defaultChatLogModel{conn: conn}
+}
+
+func (m *defaultChatLogModel) Insert(ctx context.Context, data *ChatLog) error {
+	if data.ID.IsZero() {
+		data.ID = primitive.NewObjectID()
+		data.CreateAt = time.Now()
+		data.UpdateAt = time.Now()
+	}
+
+	_, err := m.conn.InsertOne(ctx, data)
+	return err
+}
+
+func (m *defaultChatLogModel) FindOne(ctx context.Context, id string) (*ChatLog, error) {
+	oid, err := primitive.ObjectIDFromHex(id)
+	if err != nil {
+		return nil, ErrInvalidObjectId
+	}
+
+	var data ChatLog
+
+	err = m.conn.FindOne(ctx, &data, bson.M{"_id": oid})
+	switch err {
+	case nil:
+		return &data, nil
+	case mon.ErrNotFound:
+		return nil, ErrNotFound
+	default:
+		return nil, err
+	}
+}
+
+func (m *defaultChatLogModel) Update(ctx context.Context, data *ChatLog) (*mongo.UpdateResult, error) {
+	data.UpdateAt = time.Now()
+
+	res, err := m.conn.UpdateOne(ctx, bson.M{"_id": data.ID}, bson.M{"$set": data})
+	return res, err
+}
+
+func (m *defaultChatLogModel) Delete(ctx context.Context, id string) (int64, error) {
+	oid, err := primitive.ObjectIDFromHex(id)
+	if err != nil {
+		return 0, ErrInvalidObjectId
+	}
+
+	res, err := m.conn.DeleteOne(ctx, bson.M{"_id": oid})
+	return res, err
+}
+
+// 根据消息id区查询
+func (m *defaultChatLogModel) ListByMsgIds(ctx context.Context, msgIds []string) ([]*ChatLog, error) {
+	var data []*ChatLog
+
+	ids := make([]primitive.ObjectID, 0, len(msgIds))
+	for _, id := range msgIds {
+		oid, _ := primitive.ObjectIDFromHex(id)
+		ids = append(ids, oid)
+	}
+	filter := bson.M{
+		"_id": bson.M{
+			"$in": ids,
+		},
+	}
+	err := m.conn.Find(ctx, &data, filter)
+	switch err {
+	case nil:
+		return data, nil
+	case mon.ErrNotFound:
+		return nil, ErrNotFound
+	default:
+		return nil, err
+	}
+}
+
+// 修改已读记录
+func (m *defaultChatLogModel) UpdateMakeRead(ctx context.Context, id primitive.ObjectID, readRecords []byte) error {
+	_, err := m.conn.UpdateOne(ctx, bson.M{"_id": id}, bson.M{"$set": bson.M{"readRecords": readRecords}})
+	return err
+}

+ 28 - 0
internal/immodel/chatlogtypes.go

@@ -0,0 +1,28 @@
+package immodel
+
+import (
+	"time"
+
+	msgChat "git.banshen.xyz/huangguangrong/slow_wild_queue/types"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+)
+
+/**
+ * 聊天记录
+ */
+type ChatLog struct {
+	ID primitive.ObjectID `bson:"_id,omitempty" json:"id,omitempty"`
+
+	ConversationId string           `bson:"conversationId"` // 会话id
+	SendId         string           `bson:"sendId"`         // 发送者id
+	RecvId         string           `bson:"recvId"`         // 接收者id
+	ChatType       msgChat.ChatType `bson:"chatType"`       // 会话类型
+	MsgType        msgChat.MType    `bson:"msgType"`        // 消息类型
+	MsgContent     string           `bson:"msgContent"`     // 消息内容
+	SendTime       int64            `bson:"sendTime"`       // 消息发送时间
+	Status         int              `bson:"status"`         // 消息状态 0-正常 1-删除
+	ReadRecords    []byte           `bson:"readRecords"`    // 记录该消息的已读未读
+
+	UpdateAt time.Time `bson:"updateAt,omitempty" json:"updateAt,omitempty"`
+	CreateAt time.Time `bson:"createAt,omitempty" json:"createAt,omitempty"`
+}

+ 25 - 0
internal/immodel/conversationlistmodel.go

@@ -0,0 +1,25 @@
+package immodel
+
+import "github.com/zeromicro/go-zero/core/stores/mon"
+
+var _ ConversationListModel = (*customConversationListModel)(nil)
+
+type (
+	// ConversationListModel is an interface to be customized, add more methods here,
+	// and implement the added methods in customConversationListModel.
+	ConversationListModel interface {
+		conversationListModel
+	}
+
+	customConversationListModel struct {
+		*defaultConversationListModel
+	}
+)
+
+// NewConversationListModel returns a model for the mongo.
+func NewConversationListModel(url, db, collection string) ConversationListModel {
+	conn := mon.MustNewModel(url, db, collection)
+	return &customConversationListModel{
+		defaultConversationListModel: newDefaultConversationListModel(conn),
+	}
+}

+ 74 - 0
internal/immodel/conversationlistmodelgen.go

@@ -0,0 +1,74 @@
+// Code generated by goctl. DO NOT EDIT.
+package immodel
+
+import (
+	"context"
+	"time"
+
+	"github.com/zeromicro/go-zero/core/stores/mon"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"go.mongodb.org/mongo-driver/mongo"
+)
+
+type conversationListModel interface {
+	Insert(ctx context.Context, data *ConversationList) error
+	FindOne(ctx context.Context, id string) (*ConversationList, error)
+	Update(ctx context.Context, data *ConversationList) (*mongo.UpdateResult, error)
+	Delete(ctx context.Context, id string) (int64, error)
+}
+
+type defaultConversationListModel struct {
+	conn *mon.Model
+}
+
+func newDefaultConversationListModel(conn *mon.Model) *defaultConversationListModel {
+	return &defaultConversationListModel{conn: conn}
+}
+
+func (m *defaultConversationListModel) Insert(ctx context.Context, data *ConversationList) error {
+	if data.ID.IsZero() {
+		data.ID = primitive.NewObjectID()
+		data.CreateAt = time.Now()
+		data.UpdateAt = time.Now()
+	}
+
+	_, err := m.conn.InsertOne(ctx, data)
+	return err
+}
+
+func (m *defaultConversationListModel) FindOne(ctx context.Context, id string) (*ConversationList, error) {
+	oid, err := primitive.ObjectIDFromHex(id)
+	if err != nil {
+		return nil, ErrInvalidObjectId
+	}
+
+	var data ConversationList
+
+	err = m.conn.FindOne(ctx, &data, bson.M{"_id": oid})
+	switch err {
+	case nil:
+		return &data, nil
+	case mon.ErrNotFound:
+		return nil, ErrNotFound
+	default:
+		return nil, err
+	}
+}
+
+func (m *defaultConversationListModel) Update(ctx context.Context, data *ConversationList) (*mongo.UpdateResult, error) {
+	data.UpdateAt = time.Now()
+
+	res, err := m.conn.UpdateOne(ctx, bson.M{"_id": data.ID}, bson.M{"$set": data})
+	return res, err
+}
+
+func (m *defaultConversationListModel) Delete(ctx context.Context, id string) (int64, error) {
+	oid, err := primitive.ObjectIDFromHex(id)
+	if err != nil {
+		return 0, ErrInvalidObjectId
+	}
+
+	res, err := m.conn.DeleteOne(ctx, bson.M{"_id": oid})
+	return res, err
+}

+ 17 - 0
internal/immodel/conversationlisttypes.go

@@ -0,0 +1,17 @@
+package immodel
+
+import (
+	"time"
+
+	"go.mongodb.org/mongo-driver/bson/primitive"
+)
+
+type ConversationList struct {
+	ID primitive.ObjectID `bson:"_id,omitempty" json:"id,omitempty"`
+
+	UserId           string                   `bson:"userId"`           // 用户id
+	ConversationList map[string]*Conversation `bson:"conversationList"` // 会话列表
+
+	UpdateAt time.Time `bson:"updateAt,omitempty" json:"updateAt,omitempty"`
+	CreateAt time.Time `bson:"createAt,omitempty" json:"createAt,omitempty"`
+}

+ 25 - 0
internal/immodel/conversationmodel.go

@@ -0,0 +1,25 @@
+package immodel
+
+import "github.com/zeromicro/go-zero/core/stores/mon"
+
+var _ ConversationModel = (*customConversationModel)(nil)
+
+type (
+	// ConversationModel is an interface to be customized, add more methods here,
+	// and implement the added methods in customConversationModel.
+	ConversationModel interface {
+		conversationModel
+	}
+
+	customConversationModel struct {
+		*defaultConversationModel
+	}
+)
+
+// NewConversationModel returns a model for the mongo.
+func NewConversationModel(url, db, collection string) ConversationModel {
+	conn := mon.MustNewModel(url, db, collection)
+	return &customConversationModel{
+		defaultConversationModel: newDefaultConversationModel(conn),
+	}
+}

+ 87 - 0
internal/immodel/conversationmodelgen.go

@@ -0,0 +1,87 @@
+// Code generated by goctl. DO NOT EDIT.
+package immodel
+
+import (
+	"context"
+	"time"
+
+	"github.com/zeromicro/go-zero/core/stores/mon"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"go.mongodb.org/mongo-driver/mongo"
+)
+
+type conversationModel interface {
+	Insert(ctx context.Context, data *Conversation) error
+	FindOne(ctx context.Context, id string) (*Conversation, error)
+	Update(ctx context.Context, data *Conversation) (*mongo.UpdateResult, error)
+	Delete(ctx context.Context, id string) (int64, error)
+	UpdateMsg(ctx context.Context, chatLog *ChatLog) error
+}
+
+type defaultConversationModel struct {
+	conn *mon.Model
+}
+
+func newDefaultConversationModel(conn *mon.Model) *defaultConversationModel {
+	return &defaultConversationModel{conn: conn}
+}
+
+func (m *defaultConversationModel) Insert(ctx context.Context, data *Conversation) error {
+	if data.ID.IsZero() {
+		data.ID = primitive.NewObjectID()
+		data.CreateAt = time.Now()
+		data.UpdateAt = time.Now()
+	}
+
+	_, err := m.conn.InsertOne(ctx, data)
+	return err
+}
+
+func (m *defaultConversationModel) FindOne(ctx context.Context, id string) (*Conversation, error) {
+	oid, err := primitive.ObjectIDFromHex(id)
+	if err != nil {
+		return nil, ErrInvalidObjectId
+	}
+
+	var data Conversation
+
+	err = m.conn.FindOne(ctx, &data, bson.M{"_id": oid})
+	switch err {
+	case nil:
+		return &data, nil
+	case mon.ErrNotFound:
+		return nil, ErrNotFound
+	default:
+		return nil, err
+	}
+}
+
+func (m *defaultConversationModel) Update(ctx context.Context, data *Conversation) (*mongo.UpdateResult, error) {
+	data.UpdateAt = time.Now()
+
+	res, err := m.conn.UpdateOne(ctx, bson.M{"_id": data.ID}, bson.M{"$set": data})
+	return res, err
+}
+
+func (m *defaultConversationModel) Delete(ctx context.Context, id string) (int64, error) {
+	oid, err := primitive.ObjectIDFromHex(id)
+	if err != nil {
+		return 0, ErrInvalidObjectId
+	}
+
+	res, err := m.conn.DeleteOne(ctx, bson.M{"_id": oid})
+	return res, err
+}
+
+func (m *defaultConversationModel) UpdateMsg(ctx context.Context, chatLog *ChatLog) error {
+	_, err := m.conn.UpdateOne(ctx,
+		bson.M{"conversationId": chatLog.ConversationId},
+		bson.M{
+			// 更新会话总消息数
+			"$inc": bson.M{"total": 1},
+			"$set": bson.M{"msg": chatLog},
+		},
+	)
+	return err
+}

+ 26 - 0
internal/immodel/conversationtypes.go

@@ -0,0 +1,26 @@
+package immodel
+
+import (
+	"time"
+
+	msgChat "git.banshen.xyz/huangguangrong/slow_wild_queue/types"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+)
+
+/**
+* 会话model
+ */
+
+type Conversation struct {
+	ID primitive.ObjectID `bson:"_id,omitempty" json:"id,omitempty"`
+
+	ConversationId string           `bson:"conversationId,omitempty"` // 会话id
+	ChatType       msgChat.ChatType `bson:"chatType,omitempty"`       // 会话类型
+	IsShow         bool             `bson:"isShow,omitempty"`         // 是否显示false-否 true-是
+	Total          int              `bson:"total,omitempty"`          // 消息总数
+	ReadSeq        int64            `bson:"seq"`                      // 消息已读序号
+	Msg            *ChatLog         `bson:"msg,omitempty"`            // 最近一条消息
+
+	UpdateAt time.Time `bson:"updateAt,omitempty" json:"updateAt,omitempty"`
+	CreateAt time.Time `bson:"createAt,omitempty" json:"createAt,omitempty"`
+}

+ 12 - 0
internal/immodel/error.go

@@ -0,0 +1,12 @@
+package immodel
+
+import (
+	"errors"
+
+	"github.com/zeromicro/go-zero/core/stores/mon"
+)
+
+var (
+	ErrNotFound        = mon.ErrNotFound
+	ErrInvalidObjectId = errors.New("invalid objectId")
+)

+ 2 - 2
internal/model/tag_model.go

@@ -113,7 +113,7 @@ func (m *TagModel) GetTagStats(ctx context.Context, tagId int64) (postCount, fol
 	}
 
 	// 获取关注数
-	err = m.conn.WithContext(ctx).Model(&UserFollow{}).
+	err = m.conn.WithContext(ctx).Model(&UserFollowTag{}).
 		Where("tag_id = ? AND is_del = 0", tagId).
 		Count(&followCount).Error
 	if err != nil {
@@ -126,7 +126,7 @@ func (m *TagModel) GetTagStats(ctx context.Context, tagId int64) (postCount, fol
 // IsTagFollowedByUser 检查用户是否关注了话题
 func (m *TagModel) IsTagFollowedByUser(ctx context.Context, userId, tagId int64) (bool, error) {
 	var count int64
-	err := m.conn.WithContext(ctx).Model(&UserFollow{}).
+	err := m.conn.WithContext(ctx).Model(&UserFollowTag{}).
 		Where("user_id = ? AND tag_id = ? AND is_del = 0", userId, tagId).
 		Count(&count).Error
 	return count > 0, err

+ 31 - 20
internal/svc/servicecontext.go

@@ -4,9 +4,11 @@ import (
 	"context"
 	"fmt"
 	"slowwild/internal/config"
+	"slowwild/internal/immodel"
 	"slowwild/internal/model"
 	"time"
 
+	slowWildQueue "git.banshen.xyz/huangguangrong/slow_wild_queue"
 	"github.com/go-redis/redis/v8"
 	"gorm.io/driver/mysql"
 	"gorm.io/gorm"
@@ -14,16 +16,20 @@ import (
 )
 
 type ServiceContext struct {
-	Config           config.Config
-	Redis            *redis.Client
-	UserModel        *model.UserModel
-	UserFollowModel  *model.UserFollowModel
-	PostModel        *model.PostModel
-	PostContentModel *model.PostContentModel
-	TagModel         *model.TagModel
-	MessageModel     *model.MessageModel
-	CommentModel     *model.CommentModel
-	PostActionModel  *model.PostActionModel
+	Config                  config.Config
+	Redis                   *redis.Client
+	UserModel               *model.UserModel
+	UserFollowModel         *model.UserFollowModel
+	PostModel               *model.PostModel
+	PostContentModel        *model.PostContentModel
+	TagModel                *model.TagModel
+	MessageModel            *model.MessageModel
+	CommentModel            *model.CommentModel
+	PostActionModel         *model.PostActionModel
+	ChatLogModel            immodel.ChatLogModel
+	ConversationModel       immodel.ConversationModel
+	ConversationListModel   immodel.ConversationListModel
+	MsgPushToClientTransfer slowWildQueue.MsgPushClientTransferClient
 }
 
 func NewServiceContext(c config.Config) *ServiceContext {
@@ -53,19 +59,24 @@ func NewServiceContext(c config.Config) *ServiceContext {
 		},
 	})
 	if err != nil {
+		fmt.Println("数据库连接失败", err.Error())
 		panic(err)
 	}
 
 	return &ServiceContext{
-		Config:           c,
-		Redis:            rdb,
-		UserModel:        model.NewUserModel(db),
-		UserFollowModel:  model.NewUserFollowModel(db),
-		PostModel:        model.NewPostModel(db, rdb),
-		PostContentModel: model.NewPostContentModel(db),
-		TagModel:         model.NewTagModel(db, rdb),
-		MessageModel:     model.NewMessageModel(db),
-		CommentModel:     model.NewCommentModel(db, rdb),
-		PostActionModel:  model.NewPostActionModel(db, rdb),
+		Config:                  c,
+		Redis:                   rdb,
+		UserModel:               model.NewUserModel(db),
+		UserFollowModel:         model.NewUserFollowModel(db),
+		PostModel:               model.NewPostModel(db, rdb),
+		PostContentModel:        model.NewPostContentModel(db),
+		TagModel:                model.NewTagModel(db, rdb),
+		MessageModel:            model.NewMessageModel(db),
+		CommentModel:            model.NewCommentModel(db, rdb),
+		PostActionModel:         model.NewPostActionModel(db, rdb),
+		ChatLogModel:            immodel.NewChatLogModel(c.Mongo.Url, c.Mongo.Db, c.Mongo.Collection),
+		ConversationModel:       immodel.NewConversationModel(c.Mongo.Url, c.Mongo.Db, c.Mongo.Collection),
+		ConversationListModel:   immodel.NewConversationListModel(c.Mongo.Url, c.Mongo.Db, c.Mongo.Collection),
+		MsgPushToClientTransfer: slowWildQueue.NewMsgPushClientTransferClient(c.MsgPushToClientTransfer.Addrs, c.MsgPushToClientTransfer.Topic),
 	}
 }

+ 66 - 0
internal/utils/bitmap.go

@@ -0,0 +1,66 @@
+package utils
+
+type BitMap struct {
+	bits []byte
+	size int
+}
+
+func NewBitMap(size int) *BitMap {
+	if size == 0 {
+		size = 250
+	}
+
+	return &BitMap{
+		bits: make([]byte, size), // 字节的大小,每个字节中存8个bit
+		size: size * 8,           // 整个bitmap的大小
+	}
+}
+
+func (b *BitMap) Set(id int64) {
+	// 计算id在哪个bit
+	idx := hash(id) % b.size
+	// 计算在哪个byte
+	byteIdx := idx / 8
+	// 计算byte中的bit位置
+	bitIdx := idx % 8
+
+	b.bits[byteIdx] |= 1 << bitIdx
+}
+
+func (b *BitMap) IsSet(id int64) bool {
+	// 计算id在哪个bit
+	idx := hash(id) % b.size
+	// 计算在哪个byte
+	byteIdx := idx / 8
+	// 计算byte中的bit位置
+	bitIdx := idx % 8
+
+	return (b.bits[byteIdx] & (1 << bitIdx)) != 0
+}
+
+func (b *BitMap) Export() []byte {
+	return b.bits
+}
+
+func Load(bits []byte) *BitMap {
+	if len(bits) == 0 {
+		return NewBitMap(0)
+	}
+	return &BitMap{
+		bits: bits,
+		size: len(bits) * 8,
+	}
+}
+
+func hash(id int64) int {
+	// 使用BKDR哈希算法
+	seed := 131313 // 31 131 1313 13131 131313, etc
+	hash := 0
+
+	// 将整数id拆分为每个字节,并使用BKDR哈希算法计算哈希值
+	for i := 0; i < 8; i++ {
+		hash = hash*seed + int((id>>(i*8))&0xFF)
+	}
+
+	return hash & 0x7FFFFFFF
+}

+ 13 - 0
slowwild.go

@@ -5,6 +5,7 @@ import (
 	"fmt"
 
 	"slowwild/internal/config"
+	"slowwild/internal/handler"
 	"slowwild/internal/middleware/auth"
 	"slowwild/internal/server"
 	"slowwild/internal/svc"
@@ -38,6 +39,18 @@ func main() {
 	// 添加用户认证拦截器
 	s.AddUnaryInterceptors(auth.NewUserAuthInterceptor())
 
+	// 开一个协程专门监听消息推送
+	go func(svc *svc.ServiceContext, conf config.Config) {
+		listen := handler.NewListen(ctx)
+
+		serviceGroup := service.NewServiceGroup()
+		defer serviceGroup.Stop()
+		for _, s := range listen.Services() {
+			serviceGroup.Add(s)
+		}
+		serviceGroup.Start()
+	}(ctx, c)
+
 	fmt.Printf("Starting rpc server at %s...\n", c.ListenOn)
 	s.Start()
 }