tweet_es.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. package es
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "github.com/elastic/go-elasticsearch/v8"
  8. "net/http"
  9. "paopaoimtask/pkg/constants"
  10. "strconv"
  11. "strings"
  12. )
  // TweetIndex groups the Elasticsearch operations on the tweet index (Create,
// Get, Delete, Update, GetRow). It has no fields; the client and the index
// name are passed to every method.
type TweetIndex struct {
}
  // __tweetMapping is the create-index request body used by Create when the
// tweet index does not exist yet. It mirrors TweetEsData field by field.
//
// Fields declared as int64 in TweetEsData are mapped as "long" (64-bit);
// Elasticsearch's "integer" type is a signed 32-bit value and would reject
// larger IDs, counters or timestamps. Fields declared as int32 stay "integer".
//
// The mapping only applies to indices created after this change; existing
// indices keep whatever mapping they were created with.
//
// NOTE(review): the "*" entry under tags.properties looks like it is meant as
// a wildcard, but a property name is taken literally; a dynamic template with
// "path_match": "tags.*" is the usual way to type arbitrary keys — confirm.
const __tweetMapping = `{
  "mappings": {
    "properties": {
      "id": {
        "type": "long"
      },
      "user_id": {
        "type": "long"
      },
      "contents": {
        "type": "nested",
        "properties": {
          "id": {
            "type": "long"
          },
          "post_id": {
            "type": "long"
          },
          "content": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "type": {
            "type": "integer"
          },
          "sort": {
            "type": "integer"
          }
        }
      },
      "comment_count": {
        "type": "long"
      },
      "collection_count": {
        "type": "long"
      },
      "share_count": {
        "type": "long"
      },
      "upvote_count": {
        "type": "long"
      },
      "visibility": {
        "type": "integer"
      },
      "is_top": {
        "type": "integer"
      },
      "is_essence": {
        "type": "integer"
      },
      "is_lock": {
        "type": "integer"
      },
      "latest_replied_on": {
        "type": "long"
      },
      "created_on": {
        "type": "long"
      },
      "modified_on": {
        "type": "long"
      },
      "tags": {
        "type": "object",
        "dynamic": true,
        "properties": {
          "*": {
            "type": "integer"
          }
        }
      },
      "attachment_price": {
        "type": "long"
      },
      "ip_loc": {
        "type": "text"
      }
    }
  }
}`
  // TweetEsData is the tweet document written to and read back from the
// Elasticsearch tweet index. The JSON tags are the document field names and
// match the properties declared in __tweetMapping.
type TweetEsData struct {
	// Id is the tweet ID; Create uses its decimal form as the document ID.
	Id int64 `json:"id"`
	// UserId is matched by Get's SearchByUserID filter.
	UserId int64 `json:"user_id"`
	// Contents is mapped as a nested field; Get's SearchByContent filter
	// queries contents.content and contents.content.keyword.
	Contents []struct {
		ID      int64  `json:"id"`
		PostID  int64  `json:"post_id"`
		Content string `json:"content"`
		Type    int32  `json:"type"`
		Sort    int32  `json:"sort"`
	} `json:"contents"`
	CommentCount    int64 `json:"comment_count"`
	CollectionCount int64 `json:"collection_count"`
	ShareCount      int64 `json:"share_count"`
	UpvoteCount     int64 `json:"upvote_count"`
	// Visibility is matched by Get's SearchByVisit filter.
	Visibility int32 `json:"visibility"`
	// IsTop is the sort key for SortByUserTop (descending).
	IsTop     int32 `json:"is_top"`
	IsEssence int32 `json:"is_essence"`
	IsLock    int32 `json:"is_lock"`
	// LatestRepliedOn is the sort key for SortByUserLastReply (descending).
	// Presumably a Unix timestamp — TODO confirm the unit.
	LatestRepliedOn int64 `json:"latest_replied_on"`
	// CreatedOn is the sort key for SortByUserScore and SortByUserCreateTime
	// (descending). Presumably a Unix timestamp — TODO confirm the unit.
	CreatedOn  int64 `json:"created_on"`
	ModifiedOn int64 `json:"modified_on"`
	// Tags is stored as an object keyed by the map keys.
	Tags            map[string]int32 `json:"tags"`
	AttachmentPrice int64            `json:"attachment_price"`
	IpLoc           string           `json:"ip_loc"`
}
  126. //func (t *TweetEsData) MarshalJSON() ([]byte, error) {
  127. // return json.Marshal(t)
  128. //}
  129. func (t *TweetIndex) Create(tsData *TweetEsData, esClient *elasticsearch.Client, esTweetIndex string) error {
  130. // 判断是否有索引
  131. indexBody, err := esClient.Indices.Exists([]string{esTweetIndex})
  132. if err != nil || indexBody.StatusCode == http.StatusNotFound {
  133. fmt.Println("aaaaaaaaaa:内庭院")
  134. cBody, cErr := esClient.Indices.Create(esTweetIndex, esClient.Indices.Create.WithBody(strings.NewReader(__tweetMapping)))
  135. if cErr != nil {
  136. return cErr
  137. }
  138. defer cBody.Body.Close()
  139. }
  140. defer indexBody.Body.Close()
  141. docId := tsData.Id
  142. doc, _ := json.Marshal(tsData)
  143. reader := bytes.NewReader(doc)
  144. indexRes, iErr := esClient.Index(esTweetIndex, reader, esClient.Index.WithDocumentID(strconv.FormatInt(docId, 10)))
  145. if iErr != nil {
  146. return iErr
  147. }
  148. defer indexRes.Body.Close()
  149. return nil
  150. }
  151. // Get tweet es 查询,1、按照用户id查询2、按照用户推文权限搜索3、搜索全文, 排序:1-按照用户热度排序2-获取最新发布的文章3-按照用户最后回复时间排序4-用户置顶排序
  152. func (t *TweetIndex) Get(esClient *elasticsearch.Client, esTweetIndex string, searchType []constants.SearchType, sortType []constants.EsSortType, userId int64, userVisit int32, tweetContent string, limit int64, offset int64) (tweetInfos []*TweetEsData, total int64, err error) {
  153. var filters = make([]map[string]interface{}, 0)
  154. tweetInfos = make([]*TweetEsData, 0)
  155. total = 0
  156. for _, st := range searchType {
  157. switch st {
  158. case constants.SearchByUserID:
  159. filters = append(filters, map[string]interface{}{
  160. "term": map[string]interface{}{
  161. "user_id": userId,
  162. },
  163. })
  164. break
  165. case constants.SearchByVisit:
  166. filters = append(filters, map[string]interface{}{
  167. "term": map[string]interface{}{
  168. "visibility": userVisit,
  169. },
  170. })
  171. break
  172. case constants.SearchByContent:
  173. filters = append(filters, map[string]interface{}{
  174. "nested": map[string]interface{}{
  175. "path": "contents",
  176. "query": map[string]interface{}{
  177. "query_string": map[string]interface{}{
  178. "query": "*" + tweetContent + "*",
  179. "fields": []string{"contents.content", "contents.content.keyword"},
  180. },
  181. },
  182. },
  183. })
  184. break
  185. default:
  186. break
  187. }
  188. }
  189. var sorts = make([]map[string]interface{}, 0)
  190. for _, esSortType := range sortType {
  191. switch esSortType {
  192. case constants.SortByUserScore:
  193. fallthrough
  194. case constants.SortByUserCreateTime:
  195. sorts = append(sorts, map[string]interface{}{
  196. "created_on": map[string]interface{}{
  197. "order": "desc",
  198. },
  199. })
  200. break
  201. case constants.SortByUserLastReply:
  202. sorts = append(sorts, map[string]interface{}{
  203. "latest_replied_on": map[string]interface{}{
  204. "order": "desc",
  205. },
  206. })
  207. break
  208. case constants.SortByUserTop:
  209. sorts = append(sorts, map[string]interface{}{
  210. "is_top": map[string]interface{}{
  211. "order": "desc",
  212. },
  213. })
  214. break
  215. default:
  216. break
  217. }
  218. }
  219. var query map[string]interface{}
  220. var from, size int64 = offset, limit
  221. if limit <= 0 {
  222. size = 10
  223. }
  224. if offset <= 0 {
  225. from = 0
  226. }
  227. if len(filters) == 0 {
  228. query = map[string]interface{}{
  229. "query": map[string]interface{}{
  230. "match_all": map[string]interface{}{},
  231. },
  232. "sort": sorts,
  233. "from": 0,
  234. "size": 10,
  235. }
  236. } else {
  237. query = map[string]interface{}{
  238. "query": map[string]interface{}{
  239. "bool": map[string]interface{}{
  240. "filter": filters,
  241. },
  242. },
  243. "sort": sorts,
  244. "from": from,
  245. "size": size,
  246. }
  247. }
  248. fmt.Println(query)
  249. marshal, _ := json.Marshal(query)
  250. result, err := esClient.Search(
  251. esClient.Search.WithContext(context.Background()),
  252. esClient.Search.WithIndex(esTweetIndex),
  253. esClient.Search.WithBody(bytes.NewReader(marshal)),
  254. )
  255. if err != nil {
  256. fmt.Println("errr: ", err.Error())
  257. return
  258. }
  259. defer result.Body.Close()
  260. if result.IsError() {
  261. fmt.Println("搜索报错了: ", result.String())
  262. return
  263. }
  264. var rows map[string]interface{}
  265. if err = json.NewDecoder(result.Body).Decode(&rows); err != nil {
  266. return
  267. }
  268. if count, ok := rows["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"]; ok {
  269. total = int64(count.(float64))
  270. }
  271. hits := rows["hits"].(map[string]interface{})["hits"].([]interface{})
  272. for _, item := range hits {
  273. if row, ok := item.(map[string]interface{})["_source"]; ok {
  274. sourceByte, _ := json.Marshal(row)
  275. var doc TweetEsData
  276. err = json.Unmarshal(sourceByte, &doc)
  277. if err != nil {
  278. return
  279. }
  280. tweetInfos = append(tweetInfos, &doc)
  281. }
  282. }
  283. return
  284. }
  285. func (t *TweetIndex) Delete(id int64, esClient *elasticsearch.Client, esTweetIndex string) error {
  286. exists, err := esClient.Indices.Exists([]string{esTweetIndex})
  287. if err != nil || exists.StatusCode == http.StatusNotFound {
  288. // 没有找到对应的索引,返回错误
  289. return err
  290. }
  291. defer exists.Body.Close()
  292. response, err := esClient.Delete(esTweetIndex, strconv.FormatInt(id, 10))
  293. if err != nil || response.StatusCode == http.StatusNotFound {
  294. return err
  295. }
  296. defer response.Body.Close()
  297. return nil
  298. }
  299. func (t *TweetIndex) Update(id int64, esClient *elasticsearch.Client, esTweetIndex string, values map[string]interface{}) error {
  300. exists, err := esClient.Indices.Exists([]string{esTweetIndex})
  301. if err != nil || exists.StatusCode == http.StatusNotFound {
  302. // 没有找到对应的索引,返回错误
  303. return err
  304. }
  305. defer exists.Body.Close()
  306. marshal, _ := json.Marshal(map[string]interface{}{"doc": values})
  307. update, err := esClient.Update(esTweetIndex, strconv.FormatInt(id, 10), bytes.NewReader(marshal))
  308. if err != nil || update.StatusCode == http.StatusNotFound {
  309. return err
  310. }
  311. defer update.Body.Close()
  312. return nil
  313. }
  314. func (t *TweetIndex) GetRow(id int64, esClient *elasticsearch.Client, esTweetIndex string) (tweetInfos *TweetEsData, err error) {
  315. exists, err := esClient.Indices.Exists([]string{esTweetIndex})
  316. if err != nil || exists.StatusCode == http.StatusNotFound {
  317. // 没有找到对应的索引,返回错误
  318. return nil, err
  319. }
  320. defer exists.Body.Close()
  321. res, err := esClient.Get(esTweetIndex, strconv.FormatInt(id, 10))
  322. if err != nil {
  323. return nil, err
  324. }
  325. if res.IsError() {
  326. return nil, err
  327. }
  328. fmt.Println(res)
  329. var row map[string]interface{}
  330. if err = json.NewDecoder(res.Body).Decode(&row); err != nil {
  331. return nil, err
  332. }
  333. if found, ok := row["found"]; ok && !found.(bool) {
  334. return nil, nil
  335. }
  336. if source, ok := row["_source"]; ok {
  337. // 有数据
  338. marshal, _ := json.Marshal(source)
  339. var doc TweetEsData
  340. err = json.Unmarshal(marshal, &doc)
  341. if err != nil {
  342. return nil, err
  343. }
  344. return &doc, nil
  345. }
  346. return nil, nil
  347. }