package es import ( "bytes" "context" "encoding/json" "fmt" "github.com/elastic/go-elasticsearch/v8" "net/http" "paopaoimtask/pkg/constants" "strconv" "strings" ) type TweetIndex struct { } const __tweetMapping = `{ "mappings": { "properties": { "id": { "type": "integer" }, "user_id": { "type": "integer" }, "contents": { "type": "nested", "properties": { "id": { "type": "integer" }, "post_id": { "type": "integer" }, "content": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "type": { "type": "integer" }, "sort": { "type": "integer" } } }, "comment_count": { "type": "integer" }, "collection_count": { "type": "integer" }, "share_count": { "type": "integer" }, "upvote_count": { "type": "integer" }, "visibility": { "type": "integer" }, "is_top": { "type": "integer" }, "is_essence": { "type": "integer" }, "is_lock": { "type": "integer" }, "latest_replied_on": { "type": "integer" }, "created_on": { "type": "integer" }, "modified_on": { "type": "integer" }, "tags": { "type": "object", "dynamic": true, "properties": { "*": { "type": "integer" } } }, "attachment_price": { "type": "integer" }, "ip_loc": { "type": "text" } } } }` type TweetEsData struct { Id int64 `json:"id"` UserId int64 `json:"user_id"` Contents []struct { ID int64 `json:"id"` PostID int64 `json:"post_id"` Content string `json:"content"` Type int32 `json:"type"` Sort int32 `json:"sort"` } `json:"contents"` CommentCount int64 `json:"comment_count"` CollectionCount int64 `json:"collection_count"` ShareCount int64 `json:"share_count"` UpvoteCount int64 `json:"upvote_count"` Visibility int32 `json:"visibility"` IsTop int32 `json:"is_top"` IsEssence int32 `json:"is_essence"` IsLock int32 `json:"is_lock"` LatestRepliedOn int64 `json:"latest_replied_on"` CreatedOn int64 `json:"created_on"` ModifiedOn int64 `json:"modified_on"` Tags map[string]int32 `json:"tags"` AttachmentPrice int64 `json:"attachment_price"` IpLoc string `json:"ip_loc"` } //func (t *TweetEsData) MarshalJSON() ([]byte, error) { // return json.Marshal(t) //} func (t *TweetIndex) Create(tsData *TweetEsData, esClient *elasticsearch.Client, esTweetIndex string) error { // 判断是否有索引 indexBody, err := esClient.Indices.Exists([]string{esTweetIndex}) if err != nil || indexBody.StatusCode == http.StatusNotFound { fmt.Println("aaaaaaaaaa:内庭院") cBody, cErr := esClient.Indices.Create(esTweetIndex, esClient.Indices.Create.WithBody(strings.NewReader(__tweetMapping))) if cErr != nil { return cErr } defer cBody.Body.Close() } defer indexBody.Body.Close() docId := tsData.Id doc, _ := json.Marshal(tsData) reader := bytes.NewReader(doc) indexRes, iErr := esClient.Index(esTweetIndex, reader, esClient.Index.WithDocumentID(strconv.FormatInt(docId, 10))) if iErr != nil { return iErr } defer indexRes.Body.Close() return nil } // Get tweet es 查询,1、按照用户id查询2、按照用户推文权限搜索3、搜索全文, 排序:1-按照用户热度排序2-获取最新发布的文章3-按照用户最后回复时间排序4-用户置顶排序 func (t *TweetIndex) Get(esClient *elasticsearch.Client, esTweetIndex string, searchType []constants.SearchType, sortType []constants.EsSortType, userId int64, userVisit int32, tweetContent string, limit int64, offset int64) (tweetInfos []*TweetEsData, total int64, err error) { var filters = make([]map[string]interface{}, 0) tweetInfos = make([]*TweetEsData, 0) total = 0 for _, st := range searchType { switch st { case constants.SearchByUserID: filters = append(filters, map[string]interface{}{ "term": map[string]interface{}{ "user_id": userId, }, }) break case constants.SearchByVisit: filters = append(filters, map[string]interface{}{ "term": map[string]interface{}{ "visibility": userVisit, }, }) break case constants.SearchByContent: filters = append(filters, map[string]interface{}{ "nested": map[string]interface{}{ "path": "contents", "query": map[string]interface{}{ "query_string": map[string]interface{}{ "query": "*" + tweetContent + "*", "fields": []string{"contents.content", "contents.content.keyword"}, }, }, }, }) break default: break } } var sorts = make([]map[string]interface{}, 0) for _, esSortType := range sortType { switch esSortType { case constants.SortByUserScore: fallthrough case constants.SortByUserCreateTime: sorts = append(sorts, map[string]interface{}{ "created_on": map[string]interface{}{ "order": "desc", }, }) break case constants.SortByUserLastReply: sorts = append(sorts, map[string]interface{}{ "latest_replied_on": map[string]interface{}{ "order": "desc", }, }) break case constants.SortByUserTop: sorts = append(sorts, map[string]interface{}{ "is_top": map[string]interface{}{ "order": "desc", }, }) break default: break } } var query map[string]interface{} var from, size int64 = offset, limit if limit <= 0 { size = 10 } if offset <= 0 { from = 0 } if len(filters) == 0 { query = map[string]interface{}{ "query": map[string]interface{}{ "match_all": map[string]interface{}{}, }, "sort": sorts, "from": 0, "size": 10, } } else { query = map[string]interface{}{ "query": map[string]interface{}{ "bool": map[string]interface{}{ "filter": filters, }, }, "sort": sorts, "from": from, "size": size, } } fmt.Println(query) marshal, _ := json.Marshal(query) result, err := esClient.Search( esClient.Search.WithContext(context.Background()), esClient.Search.WithIndex(esTweetIndex), esClient.Search.WithBody(bytes.NewReader(marshal)), ) if err != nil { fmt.Println("errr: ", err.Error()) return } defer result.Body.Close() if result.IsError() { fmt.Println("搜索报错了: ", result.String()) return } var rows map[string]interface{} if err = json.NewDecoder(result.Body).Decode(&rows); err != nil { return } if count, ok := rows["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"]; ok { total = int64(count.(float64)) } hits := rows["hits"].(map[string]interface{})["hits"].([]interface{}) for _, item := range hits { if row, ok := item.(map[string]interface{})["_source"]; ok { sourceByte, _ := json.Marshal(row) var doc TweetEsData err = json.Unmarshal(sourceByte, &doc) if err != nil { return } tweetInfos = append(tweetInfos, &doc) } } return } func (t *TweetIndex) Delete(id int64, esClient *elasticsearch.Client, esTweetIndex string) error { exists, err := esClient.Indices.Exists([]string{esTweetIndex}) if err != nil || exists.StatusCode == http.StatusNotFound { // 没有找到对应的索引,返回错误 return err } defer exists.Body.Close() response, err := esClient.Delete(esTweetIndex, strconv.FormatInt(id, 10)) if err != nil || response.StatusCode == http.StatusNotFound { return err } defer response.Body.Close() return nil } func (t *TweetIndex) Update(id int64, esClient *elasticsearch.Client, esTweetIndex string, values map[string]interface{}) error { exists, err := esClient.Indices.Exists([]string{esTweetIndex}) if err != nil || exists.StatusCode == http.StatusNotFound { // 没有找到对应的索引,返回错误 return err } defer exists.Body.Close() marshal, _ := json.Marshal(map[string]interface{}{"doc": values}) update, err := esClient.Update(esTweetIndex, strconv.FormatInt(id, 10), bytes.NewReader(marshal)) if err != nil || update.StatusCode == http.StatusNotFound { return err } defer update.Body.Close() return nil } func (t *TweetIndex) GetRow(id int64, esClient *elasticsearch.Client, esTweetIndex string) (tweetInfos *TweetEsData, err error) { exists, err := esClient.Indices.Exists([]string{esTweetIndex}) if err != nil || exists.StatusCode == http.StatusNotFound { // 没有找到对应的索引,返回错误 return nil, err } defer exists.Body.Close() res, err := esClient.Get(esTweetIndex, strconv.FormatInt(id, 10)) if err != nil { return nil, err } if res.IsError() { return nil, err } fmt.Println(res) var row map[string]interface{} if err = json.NewDecoder(res.Body).Decode(&row); err != nil { return nil, err } if found, ok := row["found"]; ok && !found.(bool) { return nil, nil } if source, ok := row["_source"]; ok { // 有数据 marshal, _ := json.Marshal(source) var doc TweetEsData err = json.Unmarshal(marshal, &doc) if err != nil { return nil, err } return &doc, nil } return nil, nil }