| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358 |
- package es
- import (
- "bytes"
- "context"
- "encoding/json"
- "fmt"
- "github.com/elastic/go-elasticsearch/v8"
- "net/http"
- "paopaoimtask/pkg/constants"
- "strconv"
- "strings"
- )
// TweetIndex groups the Elasticsearch operations for tweet documents.
// It carries no state; the client and index name are passed to each method.
type TweetIndex struct{}
// __tweetMapping is the index definition sent to Elasticsearch when the tweet
// index is created. Its properties mirror the JSON keys of TweetEsData.
//
// Fields that are int64 in TweetEsData are mapped as "long" (64-bit); the
// "integer" type is 32-bit and would reject larger values at index time.
// Fields that are int32 in TweetEsData stay "integer".
//
// NOTE(review): the "*" entry under tags.properties is a literal field name,
// not a wildcard. Per-tag typing would presumably need dynamic_templates;
// confirm the intent before changing it.
const __tweetMapping = `{
  "mappings": {
    "properties": {
      "id": { "type": "long" },
      "user_id": { "type": "long" },
      "contents": {
        "type": "nested",
        "properties": {
          "id": { "type": "long" },
          "post_id": { "type": "long" },
          "content": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "type": { "type": "integer" },
          "sort": { "type": "integer" }
        }
      },
      "comment_count": { "type": "long" },
      "collection_count": { "type": "long" },
      "share_count": { "type": "long" },
      "upvote_count": { "type": "long" },
      "visibility": { "type": "integer" },
      "is_top": { "type": "integer" },
      "is_essence": { "type": "integer" },
      "is_lock": { "type": "integer" },
      "latest_replied_on": { "type": "long" },
      "created_on": { "type": "long" },
      "modified_on": { "type": "long" },
      "tags": {
        "type": "object",
        "dynamic": true,
        "properties": {
          "*": { "type": "integer" }
        }
      },
      "attachment_price": { "type": "long" },
      "ip_loc": { "type": "text" }
    }
  }
}`
// TweetEsData is the document shape stored in the tweet index. Each field is
// serialised under the JSON key given in its tag; those keys match the
// property names declared in __tweetMapping.
type TweetEsData struct {
	// Identity. Id is also used as the Elasticsearch document ID by Create.
	Id     int64 `json:"id"`
	UserId int64 `json:"user_id"`

	// Contents holds the content items of the tweet (mapped as "nested").
	Contents []struct {
		ID      int64  `json:"id"`
		PostID  int64  `json:"post_id"`
		Content string `json:"content"`
		Type    int32  `json:"type"`
		Sort    int32  `json:"sort"`
	} `json:"contents"`

	// Counters.
	CommentCount    int64 `json:"comment_count"`
	CollectionCount int64 `json:"collection_count"`
	ShareCount      int64 `json:"share_count"`
	UpvoteCount     int64 `json:"upvote_count"`

	// Flags and visibility, stored as integers.
	Visibility int32 `json:"visibility"`
	IsTop      int32 `json:"is_top"`
	IsEssence  int32 `json:"is_essence"`
	IsLock     int32 `json:"is_lock"`

	// Timestamps as integers; Get sorts on created_on and latest_replied_on.
	LatestRepliedOn int64 `json:"latest_replied_on"`
	CreatedOn       int64 `json:"created_on"`
	ModifiedOn      int64 `json:"modified_on"`

	// Remaining attributes.
	Tags            map[string]int32 `json:"tags"`
	AttachmentPrice int64            `json:"attachment_price"`
	IpLoc           string           `json:"ip_loc"`
}
- //func (t *TweetEsData) MarshalJSON() ([]byte, error) {
- // return json.Marshal(t)
- //}
- func (t *TweetIndex) Create(tsData *TweetEsData, esClient *elasticsearch.Client, esTweetIndex string) error {
- // 判断是否有索引
- indexBody, err := esClient.Indices.Exists([]string{esTweetIndex})
- if err != nil || indexBody.StatusCode == http.StatusNotFound {
- fmt.Println("aaaaaaaaaa:内庭院")
- cBody, cErr := esClient.Indices.Create(esTweetIndex, esClient.Indices.Create.WithBody(strings.NewReader(__tweetMapping)))
- if cErr != nil {
- return cErr
- }
- defer cBody.Body.Close()
- }
- defer indexBody.Body.Close()
- docId := tsData.Id
- doc, _ := json.Marshal(tsData)
- reader := bytes.NewReader(doc)
- indexRes, iErr := esClient.Index(esTweetIndex, reader, esClient.Index.WithDocumentID(strconv.FormatInt(docId, 10)))
- if iErr != nil {
- return iErr
- }
- defer indexRes.Body.Close()
- return nil
- }
- // Get tweet es 查询,1、按照用户id查询2、按照用户推文权限搜索3、搜索全文, 排序:1-按照用户热度排序2-获取最新发布的文章3-按照用户最后回复时间排序4-用户置顶排序
- func (t *TweetIndex) Get(esClient *elasticsearch.Client, esTweetIndex string, searchType []constants.SearchType, sortType []constants.EsSortType, userId int64, userVisit int32, tweetContent string, limit int64, offset int64) (tweetInfos []*TweetEsData, total int64, err error) {
- var filters = make([]map[string]interface{}, 0)
- tweetInfos = make([]*TweetEsData, 0)
- total = 0
- for _, st := range searchType {
- switch st {
- case constants.SearchByUserID:
- filters = append(filters, map[string]interface{}{
- "term": map[string]interface{}{
- "user_id": userId,
- },
- })
- break
- case constants.SearchByVisit:
- filters = append(filters, map[string]interface{}{
- "term": map[string]interface{}{
- "visibility": userVisit,
- },
- })
- break
- case constants.SearchByContent:
- filters = append(filters, map[string]interface{}{
- "nested": map[string]interface{}{
- "path": "contents",
- "query": map[string]interface{}{
- "query_string": map[string]interface{}{
- "query": "*" + tweetContent + "*",
- "fields": []string{"contents.content", "contents.content.keyword"},
- },
- },
- },
- })
- break
- default:
- break
- }
- }
- var sorts = make([]map[string]interface{}, 0)
- for _, esSortType := range sortType {
- switch esSortType {
- case constants.SortByUserScore:
- fallthrough
- case constants.SortByUserCreateTime:
- sorts = append(sorts, map[string]interface{}{
- "created_on": map[string]interface{}{
- "order": "desc",
- },
- })
- break
- case constants.SortByUserLastReply:
- sorts = append(sorts, map[string]interface{}{
- "latest_replied_on": map[string]interface{}{
- "order": "desc",
- },
- })
- break
- case constants.SortByUserTop:
- sorts = append(sorts, map[string]interface{}{
- "is_top": map[string]interface{}{
- "order": "desc",
- },
- })
- break
- default:
- break
- }
- }
- var query map[string]interface{}
- var from, size int64 = offset, limit
- if limit <= 0 {
- size = 10
- }
- if offset <= 0 {
- from = 0
- }
- if len(filters) == 0 {
- query = map[string]interface{}{
- "query": map[string]interface{}{
- "match_all": map[string]interface{}{},
- },
- "sort": sorts,
- "from": 0,
- "size": 10,
- }
- } else {
- query = map[string]interface{}{
- "query": map[string]interface{}{
- "bool": map[string]interface{}{
- "filter": filters,
- },
- },
- "sort": sorts,
- "from": from,
- "size": size,
- }
- }
- fmt.Println(query)
- marshal, _ := json.Marshal(query)
- result, err := esClient.Search(
- esClient.Search.WithContext(context.Background()),
- esClient.Search.WithIndex(esTweetIndex),
- esClient.Search.WithBody(bytes.NewReader(marshal)),
- )
- if err != nil {
- fmt.Println("errr: ", err.Error())
- return
- }
- defer result.Body.Close()
- if result.IsError() {
- fmt.Println("搜索报错了: ", result.String())
- return
- }
- var rows map[string]interface{}
- if err = json.NewDecoder(result.Body).Decode(&rows); err != nil {
- return
- }
- if count, ok := rows["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"]; ok {
- total = int64(count.(float64))
- }
- hits := rows["hits"].(map[string]interface{})["hits"].([]interface{})
- for _, item := range hits {
- if row, ok := item.(map[string]interface{})["_source"]; ok {
- sourceByte, _ := json.Marshal(row)
- var doc TweetEsData
- err = json.Unmarshal(sourceByte, &doc)
- if err != nil {
- return
- }
- tweetInfos = append(tweetInfos, &doc)
- }
- }
- return
- }
- func (t *TweetIndex) Delete(id int64, esClient *elasticsearch.Client, esTweetIndex string) error {
- exists, err := esClient.Indices.Exists([]string{esTweetIndex})
- if err != nil || exists.StatusCode == http.StatusNotFound {
- // 没有找到对应的索引,返回错误
- return err
- }
- defer exists.Body.Close()
- response, err := esClient.Delete(esTweetIndex, strconv.FormatInt(id, 10))
- if err != nil || response.StatusCode == http.StatusNotFound {
- return err
- }
- defer response.Body.Close()
- return nil
- }
- func (t *TweetIndex) Update(id int64, esClient *elasticsearch.Client, esTweetIndex string, values map[string]interface{}) error {
- exists, err := esClient.Indices.Exists([]string{esTweetIndex})
- if err != nil || exists.StatusCode == http.StatusNotFound {
- // 没有找到对应的索引,返回错误
- return err
- }
- defer exists.Body.Close()
- marshal, _ := json.Marshal(map[string]interface{}{"doc": values})
- update, err := esClient.Update(esTweetIndex, strconv.FormatInt(id, 10), bytes.NewReader(marshal))
- if err != nil || update.StatusCode == http.StatusNotFound {
- return err
- }
- defer update.Body.Close()
- return nil
- }
- func (t *TweetIndex) GetRow(id int64, esClient *elasticsearch.Client, esTweetIndex string) (tweetInfos *TweetEsData, err error) {
- exists, err := esClient.Indices.Exists([]string{esTweetIndex})
- if err != nil || exists.StatusCode == http.StatusNotFound {
- // 没有找到对应的索引,返回错误
- return nil, err
- }
- defer exists.Body.Close()
- res, err := esClient.Get(esTweetIndex, strconv.FormatInt(id, 10))
- if err != nil {
- return nil, err
- }
- if res.IsError() {
- return nil, err
- }
- fmt.Println(res)
- var row map[string]interface{}
- if err = json.NewDecoder(res.Body).Decode(&row); err != nil {
- return nil, err
- }
- if found, ok := row["found"]; ok && !found.(bool) {
- return nil, nil
- }
- if source, ok := row["_source"]; ok {
- // 有数据
- marshal, _ := json.Marshal(source)
- var doc TweetEsData
- err = json.Unmarshal(marshal, &doc)
- if err != nil {
- return nil, err
- }
- return &doc, nil
- }
- return nil, nil
- }
|