v_grhhuang vor 9 Monaten
Ursprung
Commit
84111559a7

+ 23 - 0
etc/slowwildws-api.yaml

@@ -3,3 +3,26 @@ Host: 0.0.0.0
 Port: 9999
 
 MaxConnectionIdle: 300
+
+MsgChatTransfer:
+  Topic: msgChatTransfer
+  Addrs:
+    - 121.11.99.220:19094
+MsgReadTransfer:
+  Topic: msgReadTransfer
+  Addrs:
+    - 121.11.99.220:19094
+MsgPushToClientTransfer:
+  # service的名称
+  Name: MsgSendConsumer
+  # kafka 的多个 Broker 节点
+  Brokers:
+    - 121.11.99.220:19094
+  # 消费者组
+  Group: kafka
+  # 订阅的 Topic 主题
+  Topic: msgPushToClientTransfer
+  # 从头开始消费
+  Offset: first
+  # go-queue 内部是起多个 goroutine 从 kafka 中获取信息写入进程内的 channel,这个参数是控制此处的 goroutine 数量(⚠️ 并不是真正消费时的并发 goroutine 数量)
+  Consumers: 2

+ 10 - 1
go.mod

@@ -4,7 +4,14 @@ go 1.21.13
 
 require github.com/zeromicro/go-zero v1.8.1
 
-require github.com/gorilla/websocket v1.5.3 // indirect
+require github.com/gorilla/websocket v1.5.3
+
+require (
+	git.banshen.xyz/huangguangrong/slow_wild_queue v0.0.2 // indirect
+	github.com/pierrec/lz4/v4 v4.1.21 // indirect
+	github.com/segmentio/kafka-go v0.4.47 // indirect
+	github.com/zeromicro/go-queue v1.2.2 // indirect
+)
 
 require (
 	github.com/beorn7/perks v1.0.1 // indirect
@@ -13,12 +20,14 @@ require (
 	github.com/fatih/color v1.18.0 // indirect
 	github.com/go-logr/logr v1.4.2 // indirect
 	github.com/go-logr/stdr v1.2.2 // indirect
+	github.com/golang-jwt/jwt v3.2.2+incompatible
 	github.com/golang-jwt/jwt/v4 v4.5.1 // indirect
 	github.com/google/uuid v1.6.0 // indirect
 	github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect
 	github.com/klauspost/compress v1.17.11 // indirect
 	github.com/mattn/go-colorable v0.1.13 // indirect
 	github.com/mattn/go-isatty v0.0.20 // indirect
+	github.com/mitchellh/mapstructure v1.5.0
 	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
 	github.com/openzipkin/zipkin-go v0.4.3 // indirect
 	github.com/pelletier/go-toml/v2 v2.2.2 // indirect

+ 57 - 0
go.sum

@@ -1,3 +1,5 @@
+git.banshen.xyz/huangguangrong/slow_wild_queue v0.0.2 h1:dwT5N0cKAhkXlh/oXz6QZL7yk4OvM5Ir0zIVzlvCUKs=
+git.banshen.xyz/huangguangrong/slow_wild_queue v0.0.2/go.mod h1:qSnhKd5XhF4dBZMiVYiRiCyQdkJ5u6uawSUtYetq0Ys=
 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
 github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
@@ -14,6 +16,8 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
 github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
 github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
 github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
+github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY=
+github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
 github.com/golang-jwt/jwt/v4 v4.5.1 h1:JdqV9zKUdtaa9gdPlywC3aeoEsR681PlKC+4F5gQgeo=
 github.com/golang-jwt/jwt/v4 v4.5.1/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
 github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
@@ -26,6 +30,7 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1
 github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k=
 github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 h1:2VTzZjLZBgl62/EtslCrtky5vbi9dd7HrQPQIx6wqiw=
 github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI=
+github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
 github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
 github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
 github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
@@ -39,12 +44,17 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk
 github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
 github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
 github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
+github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
+github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
 github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
 github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
 github.com/openzipkin/zipkin-go v0.4.3 h1:9EGwpqkgnwdEIJ+Od7QVSEIH+ocmm5nPat0G7sjsSdg=
 github.com/openzipkin/zipkin-go v0.4.3/go.mod h1:M9wCJZFWCo2RiY+o1eBCEMe0Dp2S5LDHcMZmk3RmK7c=
 github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
 github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
+github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
+github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
+github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
@@ -59,6 +69,8 @@ github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0leargg
 github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
 github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
 github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
+github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0=
+github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg=
 github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
 github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@@ -72,6 +84,12 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl
 github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
 github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
 github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
+github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
+github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
+github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
+github.com/zeromicro/go-queue v1.2.2 h1:3TMhRlI/8lZy13Sj6FBBWWRXlsQhGCchRxY2itfV1Is=
+github.com/zeromicro/go-queue v1.2.2/go.mod h1:5HiNTEw1tACi9itho0JYQ1+EpIGpSFM4tOQ4bit+yKM=
 github.com/zeromicro/go-zero v1.8.1 h1:iUYQEMQzS9Pb8ebzJtV3FGtv/YTjZxAh/NvLW/316wo=
 github.com/zeromicro/go-zero v1.8.1/go.mod h1:gc54Ad4qt7OJ0PbKajnYsSKsZBYN4JLRIXKlqDX2A2I=
 github.com/zeromicro/x v0.0.0-20240408115609-8224c482b07e h1:F5waakzloTfbJg2lcO1xvrzO6ssn7jQ38lXIDBz+nbQ=
@@ -102,14 +120,53 @@ go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
 go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
 go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
 go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
+golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
+golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
+golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
+golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
+golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
+golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
+golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
 golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8=
 golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=
 golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
+golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
+golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
+golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
+golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
+golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
+golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
+golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
 golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM=
 golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
+golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 google.golang.org/genproto/googleapis/api v0.0.0-20240711142825-46eb208f015d h1:kHjw/5UfflP/L5EbledDrcG4C2597RtymmGRZvHiCuY=
 google.golang.org/genproto/googleapis/api v0.0.0-20240711142825-46eb208f015d/go.mod h1:mw8MG/Qz5wfgYr6VqVCiZcHe/GJEfI+oGGDCohaVgB0=
 google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 h1:BwIjyKYGsK9dMCBOorzRri8MQwmi7mT9rGHsCEinZkA=

+ 22 - 2
internal/config/config.go

@@ -1,8 +1,28 @@
 package config
 
-import "github.com/zeromicro/go-zero/rest"
+import (
+	"github.com/zeromicro/go-queue/kq"
+	"github.com/zeromicro/go-zero/rest"
+)
 
 type Config struct {
 	rest.RestConf
-	MaxConnectionIdle int
+	MaxConnectionIdle            int // 最大空闲连接时间,秒数
+	DefaultMaxConnectionIdleTime int // 默认空闲连接时间,秒数
+	DefaultAckTimeout            int // 默认ack超时时间,秒数
+	DefaultConcurrency           int // 默认并发数
+	Auth                         struct {
+		AccessSecret string
+		AccessExpire int64
+	}
+	MsgChatTransfer struct {
+		Topic string
+		Addrs []string
+	}
+	MsgReadTransfer struct {
+		Topic string
+		Addrs []string
+	}
+
+	MsgPushToClientTransfer kq.KqConf
 }

+ 14 - 0
internal/constants/common.go

@@ -0,0 +1,14 @@
+package constants
+
+type AckType int
+
+const (
+	NoAck    AckType = iota // 不需要确认
+	OnlyAck                 // 只需要确认, 不需要回包
+	RigorAck                // 严格确认, 需要回包
+)
+
+const UserID = "user_id"
+
+// 系统用户id
+const SYSTEM_ROOT_UID = 1024

+ 7 - 0
internal/constants/message.go

@@ -10,3 +10,10 @@ const (
 	FrameAck   MessageFrameType = 0x2
 	FrameNoAck MessageFrameType = 0x3
 )
+
+type ChatType int
+
+const (
+	GroupChatType ChatType = iota + 1
+	SingleChatType
+)

+ 10 - 8
internal/handler/routes.go

@@ -1,4 +1,3 @@
-// Code generated by goctl. DO NOT EDIT.
 package handler
 
 import (
@@ -10,12 +9,15 @@ import (
 
 func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
 	server.AddRoutes(
-		[]rest.Route{
-			{
-				Method:  http.MethodGet,
-				Path:    "/slowwild/ws",
-				Handler: SlowwildwsHandler(serverCtx),
-			},
-		},
+		rest.WithMiddlewares(
+			[]rest.Middleware{serverCtx.UserMiddleware},
+			[]rest.Route{
+				{
+					Method:  http.MethodGet,
+					Path:    "/slowwild/ws",
+					Handler: SlowwildwsHandler(serverCtx),
+				},
+			}...,
+		),
 	)
 }

+ 13 - 2
internal/handler/slowwildwshandler.go

@@ -3,6 +3,7 @@ package handler
 import (
 	"net/http"
 	"slowwildws/internal/logic"
+	"slowwildws/internal/server"
 	"slowwildws/internal/svc"
 
 	"github.com/gorilla/websocket"
@@ -25,9 +26,19 @@ func SlowwildwsHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
 			return
 		}
 
-		l := logic.NewConnectionLogic(r.Context(), svcCtx, conn)
+		// 初始化一个连接服务
+		connServer := svcCtx.WsServer.InitConn(r.Context(), conn, svcCtx.Config)
 
-		go l.Keepalive()
+		// 添加连接信息到连接池
+		svcCtx.WsServer.AddConn(connServer, r.Context())
 
+		// 开启协程校验心跳,保持连接
+		go connServer.Keepalive()
+
+		// 开启协程处理消息
+		go func(svcCtx *svc.ServiceContext, r *http.Request, connServer *server.ConnectionServer) {
+			l := logic.NewConnectionLogic(r.Context(), svcCtx)
+			l.HandlerConn(connServer)
+		}(svcCtx, r, connServer)
 	}
 }

+ 66 - 0
internal/logic/connectionlogic.go

@@ -0,0 +1,66 @@
+package logic
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"slowwildws/internal/server"
+	"slowwildws/internal/svc"
+	"slowwildws/internal/types"
+
+	"github.com/zeromicro/go-zero/core/logx"
+)
+
+type ConnectionLogic struct {
+	logx.Logger
+	ctx    context.Context
+	svcCtx *svc.ServiceContext
+}
+
+func NewConnectionLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ConnectionLogic {
+	return &ConnectionLogic{
+		Logger: logx.WithContext(ctx),
+		ctx:    ctx,
+		svcCtx: svcCtx,
+	}
+}
+
+// 处理连接信息
+func (c *ConnectionLogic) HandlerConn(conn *server.ConnectionServer) {
+	uid := c.svcCtx.WsServer.GetUserId(conn)
+	if uid == 0 {
+		// 如果用户id不存在,直接关闭连接
+		c.svcCtx.WsServer.Close(conn)
+		return
+	}
+	fmt.Println(fmt.Sprintf("用户id:%d", uid))
+	conn.Uid = uid
+	// 处理任务
+	go c.svcCtx.WsServer.HandleWrite(conn)
+
+	if c.svcCtx.WsServer.IsAck(nil) {
+		go c.svcCtx.WsServer.ReadAck(conn)
+	}
+	for {
+		// 获取消息
+		_, msg, err := conn.ReadMessage()
+		if err != nil {
+			c.Logger.Errorf("websocket conn read message error %v", err)
+			c.svcCtx.WsServer.Close(conn)
+			return
+		}
+		// 解析消息
+		var message types.Message
+		if err = json.Unmarshal(msg, &message); err != nil {
+			c.Logger.Errorf("json unmarshal err %v, msg %v", err, string(msg))
+			c.svcCtx.WsServer.Close(conn)
+			return
+		}
+		if c.svcCtx.WsServer.IsAck(&message) {
+			c.svcCtx.WsServer.Logger.Infof("conn message read ack msg %v", message)
+			conn.AppendMsgMq(&message)
+		} else {
+			conn.WriteMessageToChan(&message)
+		}
+	}
+}

+ 57 - 0
internal/logic/conversation/conversationlogic.go

@@ -0,0 +1,57 @@
+package conversation
+
+import (
+	"context"
+	"fmt"
+	"slowwildws/internal/server"
+	"slowwildws/internal/svc"
+	"slowwildws/internal/types"
+	"slowwildws/internal/utils"
+	"time"
+
+	msgChat "git.banshen.xyz/huangguangrong/slow_wild_queue/types"
+	"github.com/mitchellh/mapstructure"
+	"github.com/zeromicro/go-zero/core/logx"
+)
+
+type ConversationLogic struct {
+	logx.Logger
+	svcCtx *svc.ServiceContext
+}
+
+func NewConversationLogic(ctx context.Context, svcCtx svc.ServiceContext) ConversationLogic {
+	return ConversationLogic{
+		Logger: logx.WithContext(ctx),
+		svcCtx: &svcCtx,
+	}
+}
+
+func (l *ConversationLogic) ConversationChat(srv *server.WebsocketServer, conn *server.ConnectionServer, msg *types.Message) {
+	var data msgChat.Chat
+	if err := mapstructure.Decode(msg.Data, &data); err != nil {
+		srv.Send(srv.NewErrMessage(err), conn)
+		return
+	}
+	switch data.ChatType {
+	case msgChat.SingleChatType:
+		data.ConversationId = utils.CombineUserID(conn.Uid, data.RecvID)
+	case msgChat.GroupChatType:
+		data.ConversationId = utils.UserIdToHex(data.RecvID)
+	}
+	// 推送到kafka
+	err := l.svcCtx.MsgChatTransferClient.Push(&msgChat.MsgChatTransfer{
+		MsgId:          msg.Id,
+		ConversationId: data.ConversationId,
+		ChatType:       msgChat.ChatType(data.ChatType),
+		SendID:         conn.Uid,
+		RecvID:         data.RecvID,
+		SendTime:       time.Now().Unix(),
+		MType:          msgChat.MType(data.MType),
+		Content:        data.Content,
+	})
+	if err != nil {
+		fmt.Println("消息推送失败")
+		srv.Send(srv.NewErrMessage(err), conn)
+		return
+	}
+}

+ 0 - 29
internal/logic/slowwildwslogic.go

@@ -1,29 +0,0 @@
-package logic
-
-import (
-	"context"
-
-	"slowwildws/internal/svc"
-
-	"github.com/zeromicro/go-zero/core/logx"
-)
-
-type SlowwildwsLogic struct {
-	logx.Logger
-	ctx    context.Context
-	svcCtx *svc.ServiceContext
-}
-
-func NewSlowwildwsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *SlowwildwsLogic {
-	return &SlowwildwsLogic{
-		Logger: logx.WithContext(ctx),
-		ctx:    ctx,
-		svcCtx: svcCtx,
-	}
-}
-
-// func (l *SlowwildwsLogic) Slowwildws(req *types.Request) (resp *types.Response, err error) {
-// 	// todo: add your logic here and delete this line
-
-// 	return
-// }

+ 57 - 0
internal/middleware/jwtmiddleware.go

@@ -0,0 +1,57 @@
+package middleware
+
+import (
+	"context"
+	"errors"
+	"net/http"
+	"slowwildws/internal/constants"
+
+	"github.com/golang-jwt/jwt"
+	xhttp "github.com/zeromicro/x/http"
+)
+
+// 自定义 Claims 类型
+type MyCustomClaims struct {
+	UserAuthID int64 `json:"user_id"`
+	jwt.StandardClaims
+}
+
+type JwtMiddleware struct {
+	Secret string
+}
+
+func NewJwtMiddleware(secret string) *JwtMiddleware {
+	return &JwtMiddleware{
+		Secret: secret,
+	}
+}
+
+func (j *JwtMiddleware) Handle(next http.HandlerFunc) http.HandlerFunc {
+	return func(w http.ResponseWriter, r *http.Request) {
+		var authToken string
+		if authToken = r.Header.Get("Sec-Websocket-Protocol"); authToken != "" {
+			r.Header.Set("Authorization", authToken)
+		}
+		parseToken, err := jwt.ParseWithClaims(authToken, &MyCustomClaims{}, func(t *jwt.Token) (interface{}, error) {
+			return []byte(j.Secret), nil
+		})
+		if err != nil {
+			w.WriteHeader(http.StatusUnauthorized)
+			xhttp.JsonBaseResponseCtx(r.Context(), w, errors.New("token invalid"))
+			return
+		}
+		if !parseToken.Valid {
+			w.WriteHeader(http.StatusUnauthorized)
+			xhttp.JsonBaseResponseCtx(r.Context(), w, errors.New("token invalid"))
+			return
+		}
+		claims, ok := parseToken.Claims.(*MyCustomClaims)
+		if !ok {
+			w.WriteHeader(http.StatusUnauthorized)
+			xhttp.JsonBaseResponseCtx(r.Context(), w, errors.New("parse token invalid"))
+			return
+		}
+		*r = *r.WithContext(context.WithValue(r.Context(), constants.UserID, claims.UserAuthID))
+		next(w, r)
+	}
+}

+ 28 - 0
internal/server/authserver.go

@@ -0,0 +1,28 @@
+package server
+
+import (
+	"fmt"
+	"net/http"
+	"strconv"
+	"time"
+)
+
+type AuthServer struct {
+}
+
+func NewAuthLogic() *AuthServer {
+	return &AuthServer{}
+}
+
+func (l *AuthServer) GetChatUserId(r *http.Request) int64 {
+	query := r.URL.Query()
+	if query != nil && query["userId"] != nil {
+		var uid = fmt.Sprintf("%v", query["userId"])
+		atoi, err := strconv.Atoi(uid)
+		if err != nil {
+			return time.Now().UnixMilli()
+		}
+		return int64(atoi)
+	}
+	return time.Now().UnixMilli()
+}

+ 18 - 14
internal/logic/connertionlogic.go → internal/server/connectionserver.go

@@ -1,9 +1,9 @@
-package logic
+package server
 
 import (
 	"context"
+	"slowwildws/internal/config"
 	"slowwildws/internal/constants"
-	"slowwildws/internal/svc"
 	"slowwildws/internal/types"
 	"sync"
 	"time"
@@ -12,10 +12,9 @@ import (
 	"github.com/zeromicro/go-zero/core/logx"
 )
 
-type ConnectionLogic struct {
+type ConnectionServer struct {
 	logx.Logger
 	ctx               context.Context
-	svcCtx            *svc.ServiceContext
 	conn              *websocket.Conn
 	idleMu            sync.Mutex                // 超时锁
 	idle              time.Time                 // 闲置时间
@@ -24,17 +23,17 @@ type ConnectionLogic struct {
 	readMessageList   []*types.Message          // 读取消息队列
 	readMessageSeq    map[string]*types.Message // 读取消息序列化
 	done              chan struct{}             // 结束方法
-	message           chan *types.Message       //消息通道
+	message           chan *types.Message       // 消息通道
+	Uid               int64                     // 用户id
 }
 
-func NewConnectionLogic(ctx context.Context, svcCtx *svc.ServiceContext, conn *websocket.Conn) *ConnectionLogic {
-	return &ConnectionLogic{
+func NewConnectionServer(ctx context.Context, conf config.Config, conn *websocket.Conn) *ConnectionServer {
+	return &ConnectionServer{
 		Logger:            logx.WithContext(ctx),
 		ctx:               ctx,
-		svcCtx:            svcCtx,
 		conn:              conn,
 		idle:              time.Now(),
-		maxConnectionIdle: time.Duration(svcCtx.Config.MaxConnectionIdle) * time.Second,
+		maxConnectionIdle: time.Duration(conf.MaxConnectionIdle) * time.Second,
 		done:              make(chan struct{}),
 		readMessageList:   make([]*types.Message, 0, 2),
 		readMessageSeq:    make(map[string]*types.Message, 2),
@@ -43,7 +42,7 @@ func NewConnectionLogic(ctx context.Context, svcCtx *svc.ServiceContext, conn *w
 }
 
 // 关闭连接
-func (c *ConnectionLogic) Close() error {
+func (c *ConnectionServer) Close() error {
 	select {
 	case <-c.done:
 	default:
@@ -53,7 +52,7 @@ func (c *ConnectionLogic) Close() error {
 }
 
 // 读取消息
-func (c *ConnectionLogic) ReadMessage() (messageType int, p []byte, err error) {
+func (c *ConnectionServer) ReadMessage() (messageType int, p []byte, err error) {
 	// 这里不能先获取锁,因为会被阻塞住,下面这个获取消息的方法是阻塞的,会导致锁一直得不到释放
 	messageType, p, err = c.conn.ReadMessage()
 	c.idleMu.Lock()
@@ -65,7 +64,7 @@ func (c *ConnectionLogic) ReadMessage() (messageType int, p []byte, err error) {
 }
 
 // 写消息
-func (c *ConnectionLogic) WriteMessage(messageType int, data []byte) error {
+func (c *ConnectionServer) WriteMessage(messageType int, data []byte) error {
 	c.idleMu.Lock()
 	defer func() {
 		c.idleMu.Unlock()
@@ -76,7 +75,7 @@ func (c *ConnectionLogic) WriteMessage(messageType int, data []byte) error {
 }
 
 // 心跳检测
-func (c *ConnectionLogic) Keepalive() {
+func (c *ConnectionServer) Keepalive() {
 	idlerTimer := time.NewTimer(c.maxConnectionIdle)
 	defer idlerTimer.Stop()
 
@@ -106,7 +105,7 @@ func (c *ConnectionLogic) Keepalive() {
 }
 
 // 添加消息到队列中
-func (c *ConnectionLogic) appendMsgMq(msg *types.Message) {
+func (c *ConnectionServer) AppendMsgMq(msg *types.Message) {
 	c.messageMu.Lock()
 	defer c.messageMu.Unlock()
 
@@ -130,3 +129,8 @@ func (c *ConnectionLogic) appendMsgMq(msg *types.Message) {
 	c.readMessageList = append(c.readMessageList, msg)
 	c.readMessageSeq[msg.Id] = msg
 }
+
+// 写入消息到通道中
+func (c *ConnectionServer) WriteMessageToChan(message *types.Message) {
+	c.message <- message
+}

+ 39 - 0
internal/server/messageserver.go

@@ -0,0 +1,39 @@
+package server
+
+import (
+	"encoding/json"
+	"slowwildws/internal/constants"
+	"slowwildws/internal/types"
+
+	"github.com/gorilla/websocket"
+)
+
+type MessageServer struct {
+}
+
+func NewMessageServer() *MessageServer {
+	return &MessageServer{}
+}
+
+func (s *MessageServer) Send(msg interface{}, conns ...*ConnectionServer) error {
+	if len(conns) == 0 {
+		return nil
+	}
+	data, err := json.Marshal(msg)
+	if err != nil {
+		return err
+	}
+	for _, conn := range conns {
+		if cErr := conn.WriteMessage(websocket.TextMessage, data); cErr != nil {
+			return cErr
+		}
+	}
+	return nil
+}
+
+func NewErrMessage(err error) *types.Message {
+	return &types.Message{
+		FrameType: constants.FrameErr,
+		Data:      err.Error(),
+	}
+}

+ 44 - 0
internal/server/optionsserver.go

@@ -0,0 +1,44 @@
+package server
+
+import (
+	"slowwildws/internal/config"
+	"slowwildws/internal/constants"
+	"time"
+)
+
+type ServerOptions func(opt *serverOption)
+
+type serverOption struct {
+	maxConnectionIdle time.Duration     // 连接空闲超时时间
+	ack               constants.AckType // ack类型
+	ackTimeout        time.Duration     // ack等待超时时间
+
+	concurrency int // 并发数
+}
+
+func newServerOptions(conf config.Config, opts ...ServerOptions) serverOption {
+	o := serverOption{
+		maxConnectionIdle: time.Duration(conf.DefaultMaxConnectionIdleTime) * time.Second,
+		ackTimeout:        time.Duration(conf.DefaultAckTimeout) * time.Second,
+		concurrency:       conf.DefaultConcurrency,
+	}
+	for _, opt := range opts {
+		opt(&o)
+	}
+	return o
+}
+
+func WithServerAck(ack constants.AckType) ServerOptions {
+	return func(opt *serverOption) {
+		opt.ack = ack
+	}
+}
+
+func WithServerMaxConnectionIdle(maxConnectionIdle time.Duration) ServerOptions {
+	return func(opt *serverOption) {
+		if maxConnectionIdle > 0 {
+			opt.maxConnectionIdle = maxConnectionIdle
+		}
+
+	}
+}

+ 228 - 0
internal/server/websocketserver.go

@@ -0,0 +1,228 @@
+package server
+
+import (
+	"context"
+	"fmt"
+	"net/http"
+	"slowwildws/internal/config"
+	"slowwildws/internal/constants"
+	"slowwildws/internal/types"
+	"sync"
+	"time"
+
+	"github.com/gorilla/websocket"
+	"github.com/zeromicro/go-zero/core/logx"
+	"github.com/zeromicro/go-zero/core/threading"
+)
+
+type WebsocketServer struct {
+	sync.RWMutex // 读写锁
+	connToUser   map[*ConnectionServer]int64
+	userToConn   map[int64]*ConnectionServer
+	upgrader     websocket.Upgrader
+	logx.Logger
+	opt *serverOption
+	*threading.TaskRunner
+	ctx           context.Context
+	messageServer *MessageServer
+}
+
+func NewWebsockerServer(conf config.Config, opts ...ServerOptions) *WebsocketServer {
+	opt := newServerOptions(conf, opts...)
+	return &WebsocketServer{
+		upgrader: websocket.Upgrader{
+			CheckOrigin: func(r *http.Request) bool {
+				return true
+			},
+		},
+		Logger:        logx.WithContext(context.Background()),
+		connToUser:    make(map[*ConnectionServer]int64),
+		userToConn:    make(map[int64]*ConnectionServer),
+		opt:           &opt,
+		TaskRunner:    threading.NewTaskRunner(opt.concurrency),
+		messageServer: NewMessageServer(),
+	}
+}
+
+// 初始化一个连接信息
+func (s *WebsocketServer) InitConn(ctx context.Context, conn *websocket.Conn, conf config.Config) *ConnectionServer {
+	return NewConnectionServer(ctx, conf, conn)
+}
+
+// 把连接添加到map中
+func (s *WebsocketServer) AddConn(conn *ConnectionServer, ctx context.Context) {
+	uid := ctx.Value(constants.UserID).(int64)
+
+	fmt.Println("获取到的用户id: ", uid)
+
+	s.RWMutex.Lock()
+	defer s.RWMutex.Unlock()
+	// 验证用户是否登录过
+	if c := s.userToConn[uid]; c != nil {
+		c.Close()
+	}
+	s.connToUser[conn] = uid
+	s.userToConn[uid] = conn
+	// todo 是否应该再这里加上用户在线标识
+}
+
+// 获取存储的用户id
+func (s *WebsocketServer) GetUserId(conn *ConnectionServer) int64 {
+	s.RWMutex.RLock()
+	defer s.RWMutex.RUnlock()
+
+	if conn == nil {
+		return 0
+	}
+
+	return s.connToUser[conn]
+}
+
+// 关闭这个连接服务
+func (s *WebsocketServer) Close(conn *ConnectionServer) {
+	s.RWMutex.Lock()
+	defer s.RWMutex.Unlock()
+	uid := s.connToUser[conn]
+	if uid == 0 {
+		// 已经关闭了
+		return
+	}
+	delete(s.connToUser, conn)
+	delete(s.userToConn, uid)
+	conn.Close()
+}
+
+// 判断是否需要ack确认
+func (s *WebsocketServer) IsAck(message *types.Message) bool {
+	if message == nil {
+		return s.opt.ack != constants.NoAck
+	}
+	if message.FormID == constants.SYSTEM_ROOT_UID {
+		//超级管理员不需要这个ack确认机制,直接发
+		return false
+	}
+	return s.opt.ack != constants.NoAck && message.FrameType != constants.FrameNoAck
+}
+
+// 读取消息的ack
+func (s *WebsocketServer) ReadAck(conn *ConnectionServer) {
+	for {
+		select {
+		case <-conn.done:
+			s.Infof("close message ack uid %v", conn.Uid)
+			return
+		default:
+
+		}
+		conn.messageMu.Lock()
+		if len(conn.readMessageList) == 0 {
+			conn.messageMu.Unlock()
+			// 没有消息的话让其休眠100毫秒再进行下一次判定
+			time.Sleep(100 * time.Millisecond)
+			continue
+		}
+
+		// 读取第一条消息
+		message := conn.readMessageList[0]
+		//判断ack的方式
+		switch s.opt.ack {
+		case constants.OnlyAck:
+			// 直接给客户端回复
+			s.messageServer.Send(&types.Message{
+				FrameType: constants.FrameAck,
+				Id:        message.Id,
+				AckSeq:    message.AckSeq + 1,
+			}, conn)
+			// 进行业务处理,从队列中移除
+			conn.readMessageList = conn.readMessageList[1:]
+			conn.messageMu.Unlock()
+			conn.message <- message
+		case constants.RigorAck:
+			if message.AckSeq == 0 {
+				// 还没有确认
+				conn.readMessageList[0].AckSeq++             //记录确认序号加1
+				conn.readMessageList[0].AckTime = time.Now() // 记录确认时间
+				s.messageServer.Send(&types.Message{
+					FrameType: constants.FrameAck,
+					Id:        message.Id,
+					AckSeq:    message.AckSeq + 1,
+				}, conn)
+				s.Infof("message ack RigorAck send mid %v, seq %v, time %v", message.Id, message.AckSeq, message.AckTime)
+				conn.messageMu.Unlock()
+				continue
+			}
+
+			// 再验证
+
+			// 1.客户端返回结果,再一次确认
+			msgSeq := conn.readMessageSeq[message.Id]
+			if msgSeq.AckSeq > message.AckSeq {
+				// 进行业务处理,从队列中移除
+				conn.readMessageList = conn.readMessageList[1:]
+				conn.messageMu.Unlock()
+				conn.message <- message
+				s.Infof("message ack rigorAck success mid %v", message.Id)
+				continue
+			}
+			// 2. 客户端没有确认,考虑是否再次发送确认消息
+			val := s.opt.ackTimeout - time.Since(message.AckTime)
+			fmt.Println("超时时间: ", val)
+			// 2.1 超过结束确认,抛出错误
+			if !message.AckTime.IsZero() && val <= 0 {
+				delete(conn.readMessageSeq, message.Id)
+				conn.readMessageList = conn.readMessageList[1:]
+				conn.messageMu.Unlock()
+				continue
+			}
+			// 2.2 未超过,超新发送
+			conn.messageMu.Unlock()
+			s.messageServer.Send(&types.Message{
+				FrameType: constants.FrameAck,
+				Id:        message.Id,
+				AckSeq:    message.AckSeq,
+			}, conn)
+			time.Sleep(3 * time.Second)
+		}
+	}
+
+}
+
+// 任务处理,读取消息体中的method方法,判断是否需要执行
+func (s *WebsocketServer) HandleWrite(conn *ConnectionServer) {
+	for {
+		select {
+		case <-conn.done:
+			//连接关闭
+			return
+		case message := <-conn.message:
+			switch message.FrameType {
+			case constants.FramePing:
+				s.messageServer.Send(&types.Message{FrameType: constants.FramePing}, conn)
+			case constants.FrameData, constants.FrameNoAck:
+				if handler, ok := message.Method; ok {
+					handler(s, conn, message)
+				} else {
+					s.messageServer.Send(&types.Message{FrameType: constants.FrameData, Data: []byte(fmt.Sprintf("不存在执行的方法 %v 请检查", message.Method))}, conn)
+				}
+			}
+			if s.opt.ack != constants.NoAck {
+				// 删除ack消息的序号记录
+				conn.messageMu.Lock()
+				delete(conn.readMessageSeq, message.Id)
+				conn.messageMu.Unlock()
+			}
+		}
+
+	}
+}
+
+func (s *WebsocketServer) Send(msg interface{}, conns ...*ConnectionServer) error {
+	return s.messageServer.Send(msg, conns...)
+}
+
+func (s *WebsocketServer) NewErrMessage(err error) *types.Message {
+	return &types.Message{
+		FrameType: constants.FrameData,
+		Data:      err.Error(),
+	}
+}

+ 17 - 2
internal/svc/servicecontext.go

@@ -2,14 +2,29 @@ package svc
 
 import (
 	"slowwildws/internal/config"
+	"slowwildws/internal/constants"
+	"slowwildws/internal/middleware"
+	"slowwildws/internal/server"
+	"time"
+
+	slowWildQueue "git.banshen.xyz/huangguangrong/slow_wild_queue"
+	"github.com/zeromicro/go-zero/rest"
 )
 
 type ServiceContext struct {
-	Config config.Config
+	Config                config.Config
+	WsServer              *server.WebsocketServer
+	UserMiddleware        rest.Middleware
+	MsgChatTransferClient slowWildQueue.MsgChatTransferClient
+	MsgReadTransferClient slowWildQueue.MsgReadTransferClient
 }
 
 func NewServiceContext(c config.Config) *ServiceContext {
 	return &ServiceContext{
-		Config: c,
+		Config:                c,
+		WsServer:              server.NewWebsockerServer(c, server.WithServerMaxConnectionIdle(time.Duration(c.MaxConnectionIdle)*time.Second), server.WithServerAck(constants.NoAck)),
+		UserMiddleware:        middleware.NewJwtMiddleware(c.Auth.AccessSecret).Handle,
+		MsgChatTransferClient: slowWildQueue.NewMsgChatTransferClient(c.MsgChatTransfer.Addrs, c.MsgChatTransfer.Topic),
+		MsgReadTransferClient: slowWildQueue.NewMsgReadTransferClient(c.MsgReadTransfer.Addrs, c.MsgReadTransfer.Topic),
 	}
 }

+ 41 - 0
internal/utils/wuid.go

@@ -0,0 +1,41 @@
+package utils
+
+import (
+	"errors"
+	"fmt"
+	"sort"
+	"strconv"
+)
+
+func CombineId(aid, bid string) string {
+	ids := []string{aid, bid}
+
+	sort.Slice(ids, func(i, j int) bool {
+		a, _ := strconv.ParseUint(ids[i], 0, 64)
+		b, _ := strconv.ParseUint(ids[j], 0, 64)
+		return a < b
+	})
+
+	return fmt.Sprintf("%s_%s", ids[0], ids[1])
+}
+
+func CombineUserID(aid, bid int64) string {
+	ids := []int64{aid, bid}
+	sort.Slice(ids, func(i, j int) bool {
+		return ids[i] < ids[j]
+	})
+	return fmt.Sprintf("%s%s", fmt.Sprintf("%012x", ids[0]), fmt.Sprintf("%012x", ids[1]))
+}
+
+func UserIdToHex(id int64) string {
+	return fmt.Sprintf("%012x", id)
+}
+
+func HexToUID(hex string) (int64, error) {
+	var originalID int
+	_, err := fmt.Sscanf(hex, "%x", &originalID)
+	if err != nil {
+		return int64(originalID), errors.New("用户id回转失败")
+	}
+	return int64(originalID), nil
+}