代码地址:https://gitee.com/lymgoforIT/golang-trick/tree/master/35-go-elasticsearch
go-elasticsearch
是Elasticsearch
官方提供的 Go
客户端。每个 Elasticsearch
版本会有一个对应的 go-elasticsearch
版本。官方会维护最近的两个主要版本。
go-elasticsearch
提供了 Low-level
和 Fully-typed
两套API
。本文以 Fully-typed API
为例介绍 go-elasticsearch
的常用方法。
本文接下来将以电商平台 “用户评价” 数据为例,演示
Go
语言Elasticsearch
客户端的相关操作。原文参考地址:https://mp.weixin.qq.com/s/Em-xPi2ZqBALiFX9ebb2fw
关于如何使用 docker 在本地搭建 Elasticsearch 环境请查看我之前的博客: 56.windows docker
安装ES、Go操作ES(github.com/olivere/elastic/v7库)
执行以下命令安装v8
版本的 go
客户端。
go get github.com/elastic/go-elasticsearch/v8@latest
import "github.com/elastic/go-elasticsearch/v8"
可以根据实际需求导入不同的客户端版本,也支持在一个项目中导入不同的客户端版本。
import (
elasticsearch7 "github.com/elastic/go-elasticsearch/v7"
elasticsearch8 "github.com/elastic/go-elasticsearch/v8"
)
// ...
es7, _ := elasticsearch7.NewDefaultClient()
es8, _ := elasticsearch8.NewDefaultClient()
指定要连接 ES
的相关配置,并创建客户端连接。
这里按实际工作中的习惯创建目录。如在项目下创建
dao
目录,其下面可能继续有MySQL、Redis、MQ、ES
等目录,本文只涉及到ES
,所以就是先建es
目录,接着下面建立base.go
用于创建连接和初始化等工作。之后的索引和文档等操作可以分别新建对应的.go
文件,当然,操作es
的函数很多时,也可以根据业务拆分建立.go
文件。
base.go
package es
import (
"fmt"
elasticsearch "github.com/elastic/go-elasticsearch/v8"
)
var EsClinet *elasticsearch.TypedClient
func InitEsConn() {
// ES 配置
cfg := elasticsearch.Config{
Addresses: []string{
"http://localhost:9200",
},
}
// 创建客户端连接
client, err := elasticsearch.NewTypedClient(cfg)
if err != nil {
fmt.Printf("elasticsearch.NewTypedClient failed, err:%v\n", err)
panic(err)
}
EsClinet = client
}
main.go
package main
import "golang-trick/35-go-elasticsearch/dao/es"
func main() {
// 初始化ES客户端全局连接
es.InitEsConn()
}
索引主要包含创建和删除操作
索引相关操作我们都放到index.go
文件中
package es
import (
"context"
"fmt"
)
// CreateIndex 创建索引
func CreateIndex(indexName string) error {
resp, err := EsClinet.Indices.
Create(indexName).
Do(context.Background())
if err != nil {
fmt.Printf("create index failed, err:%v\n", err)
return err
}
fmt.Printf("index:%#v\n", resp.Index)
return nil
}
// DeleteIndex 删除索引
func DeleteIndex(indexName string) error{
_, err := EsClinet.Indices. // 表明是对索引的操作,而Index则表示是要操作具体索引下的文档
Delete(indexName).
Do(context.Background())
if err != nil {
fmt.Printf("delete index failed,err:%v\n", err)
return err
}
fmt.Printf("delete index successed,indexName:%s", indexName)
return nil
}
创建一个名为 my-review-1
(用户评价) 的 index
。
main.go
package main
import "golang-trick/35-go-elasticsearch/dao/es"
func main() {
// 初始化ES客户端全局连接
es.InitEsConn()
es.CreateIndex("my-review-1")
}
本文主要是对用户评价做检索案例,所以需要建立model
,我们一般习惯在项目路径下建立model包
专门存放model
,入下:
定义与 document
数据对应的 Review
(评价数据) 和 Tag
(评价标签) 结构体。
review.go
// Review 评价数据
type Review struct {
ID int64 `json:"id"`
UserID int64 `json:"userID"`
Score uint8 `json:"score"`
Content string `json:"content"`
Tags []Tag `json:"tags"`
Status int `json:"status"`
PublishTime time.Time `json:"publishDate"`
}
// Tag 评价标签
type Tag struct {
Code int `json:"code"`
Title string `json:"title"`
}
创建一条 document
并添加到 my-review-1
的 index
中。
在es
包下创建review_doc.go
文件,表示是对于review
的文档相关操作。
review_doc.go
package es
import (
"context"
"fmt"
"golang-trick/35-go-elasticsearch/model"
"strconv"
)
// indexDocument 索引文档
func CreateDocument(review model.Review, indexName string) {
// 添加文档
resp, err := EsClinet.Index(indexName).
Id(strconv.FormatInt(review.ID, 10)). // 指定文档唯一ID
Document(review).
Do(context.Background())
if err != nil {
fmt.Printf("indexing document failed, err:%v\n", err)
return
}
fmt.Printf("result:%#v\n", resp.Result)
}
main.go
package main
import (
"golang-trick/35-go-elasticsearch/dao/es"
"golang-trick/35-go-elasticsearch/model"
"time"
)
func main() {
// 初始化ES客户端全局连接
es.InitEsConn()
// es.CreateIndex("my-review-1")
// 定义 document 结构体对象
d1 := model.Review{
ID: 1,
UserID: 147982601,
Score: 5,
Content: "这是一个好评!",
Tags: []model.Tag{
{1000, "好评"},
{1100, "物超所值"},
{9000, "有图"},
},
Status: 2,
PublishTime: time.Now(),
}
es.CreateDocument(d1, "my-review-1")
}
// GetDocument 根据文档ID获取文档
func GetDocumentByDocId(id string, indexName string) {
resp, err := EsClinet.Get(indexName, id).
Do(context.Background())
if err != nil {
fmt.Printf("get document by id failed, err:%v\n", err)
return
}
fmt.Printf("fileds:%s\n", resp.Source_)
}
构建搜索查询可以使用结构化的查询条件。
// SearchAllDocument 搜索指定索引下所有文档
func SearchAllDocument(indexName string) {
// 搜索文档
resp, err := EsClinet.Search().
Index(indexName).
Request(&search.Request{
Query: &types.Query{
MatchAll: &types.MatchAllQuery{},
},
}).
Do(context.Background())
if err != nil {
fmt.Printf("search document failed, err:%v\n", err)
return
}
fmt.Printf("total: %d\n", resp.Hits.Total.Value)
// 遍历所有结果
for _, hit := range resp.Hits.Hits {
fmt.Printf("%s\n", hit.Source_)
}
}
下面是在 my-review-1
中搜索 content
包含 “好评” 的文档。
// SearchDocument 指定条件搜索文档
func SearchDocument(indexName string) {
// 搜索content中包含好评的文档
resp, err := EsClinet.Search().
Index(indexName).
Request(&search.Request{
Query: &types.Query{
MatchPhrase: map[string]types.MatchPhraseQuery{
"content": {Query: "好评"},
},
},
}).
Do(context.Background())
if err != nil {
fmt.Printf("search document failed, err:%v\n", err)
return
}
fmt.Printf("total: %d\n", resp.Hits.Total.Value)
// 遍历所有结果
for _, hit := range resp.Hits.Hits {
fmt.Printf("%s\n", hit.Source_)
}
}
在 my-review-1
上运行一个平均值聚合,得到所有文档 score
的平均值。
// AggregationDemo 聚合
func AggregationDemo(indexName string) {
avgScoreAgg, err := EsClinet.Search().
Index(indexName).
Request(
&search.Request{
Size: some.Int(0),
Aggregations: map[string]types.Aggregations{
"avg_score": { // 将所有文档的 score 的平均值聚合为 avg_score
Avg: &types.AverageAggregation{
Field: some.String("score"),
},
},
},
},
).Do(context.Background())
if err != nil {
fmt.Printf("aggregation failed, err:%v\n", err)
return
}
fmt.Printf("avgScore:%#v\n", avgScoreAgg.Aggregations["avg_score"])
}
使用新值更新文档。
// UpdateDocument 更新文档
func UpdateDocument(review model.Review, indexName string) {
修改后的结构体变量
//d1 := Review{
// ID: 1,
// UserID: 147982601,
// Score: 5,
// Content: "这是一个修改后的好评!", // 有修改
// Tags: []Tag{ // 有修改
// {1000, "好评"},
// {9000, "有图"},
// },
// Status: 2,
// PublishTime: time.Now(),
//}
resp, err := EsClinet.Update(indexName, fmt.Sprintf("%d", review.ID)). // 通过唯一文档ID指定要更新的文档
Doc(review). // 使用结构体变量更新
Do(context.Background())
if err != nil {
fmt.Printf("update document failed, err:%v\n", err)
return
}
fmt.Printf("result:%v\n", resp.Result)
}
更新可以使用结构体变量也可以使用原始JSON
字符串数据。
// UpdateDocumentByJson 更新文档
func UpdateDocumentByJson(docId,str ,indexName string) {
修改后的JSON字符串
//str := `{
// "id":1,
// "userID":147982601,
// "score":5,
// "content":"这是一个二次修改后的好评!",
// "tags":[
// {
// "code":1000,
// "title":"好评"
// },
// {
// "code":9000,
// "title":"有图"
// }
// ],
// "status":2,
// "publishDate":"2023-12-16T15:27:18.219385+08:00"
//}`
// 直接使用JSON字符串更新
resp, err := EsClinet.Update(indexName, docId).
Request(&update.Request{
Doc: json.RawMessage(str),
}).
Do(context.Background())
if err != nil {
fmt.Printf("update document failed, err:%v\n", err)
return
}
fmt.Printf("result:%v\n", resp.Result)
}
根据文档 id
删除文档。
// DeleteDocument 删除 document
func DeleteDocument(docId,indexName string) {
resp, err := EsClinet.Delete(indexName, docId).
Do(context.Background())
if err != nil {
fmt.Printf("delete document failed, err:%v\n", err)
return
}
fmt.Printf("result:%v\n", resp.Result)
}
package es
import (
"context"
"encoding/json"
"fmt"
"github.com/elastic/go-elasticsearch/v8/typedapi/core/search"
"github.com/elastic/go-elasticsearch/v8/typedapi/core/update"
"github.com/elastic/go-elasticsearch/v8/typedapi/some"
"github.com/elastic/go-elasticsearch/v8/typedapi/types"
"golang-trick/35-go-elasticsearch/model"
"strconv"
)
// indexDocument 索引文档
func CreateDocument(review model.Review, indexName string) {
// 添加文档
resp, err := EsClinet.Index(indexName). // 表示是要操作具体索引下的文档
Id(strconv.FormatInt(review.ID, 10)). // 指定文档唯一ID
Document(review).
Do(context.Background())
if err != nil {
fmt.Printf("indexing document failed, err:%v\n", err)
return
}
fmt.Printf("result:%#v\n", resp.Result)
}
// GetDocument 根据文档ID获取文档
func GetDocumentByDocId(id string, indexName string) {
resp, err := EsClinet.Get(indexName, id).
Do(context.Background())
if err != nil {
fmt.Printf("get document by id failed, err:%v\n", err)
return
}
fmt.Printf("fileds:%s\n", resp.Source_)
}
// SearchAllDocument 搜索指定索引下所有文档
func SearchAllDocument(indexName string) {
// 搜索文档
resp, err := EsClinet.Search().
Index(indexName).
Request(&search.Request{
Query: &types.Query{
MatchAll: &types.MatchAllQuery{},
},
}).
Do(context.Background())
if err != nil {
fmt.Printf("search document failed, err:%v\n", err)
return
}
fmt.Printf("total: %d\n", resp.Hits.Total.Value)
// 遍历所有结果
for _, hit := range resp.Hits.Hits {
fmt.Printf("%s\n", hit.Source_)
}
}
// SearchDocument 指定条件搜索文档
func SearchDocument(indexName string) {
// 搜索content中包含好评的文档
resp, err := EsClinet.Search().
Index(indexName).
Request(&search.Request{
Query: &types.Query{
MatchPhrase: map[string]types.MatchPhraseQuery{
"content": {Query: "好评"},
},
},
}).
Do(context.Background())
if err != nil {
fmt.Printf("search document failed, err:%v\n", err)
return
}
fmt.Printf("total: %d\n", resp.Hits.Total.Value)
// 遍历所有结果
for _, hit := range resp.Hits.Hits {
fmt.Printf("%s\n", hit.Source_)
}
}
// AggregationDemo 聚合
func AggregationDemo(indexName string) {
avgScoreAgg, err := EsClinet.Search().
Index(indexName).
Request(
&search.Request{
Size: some.Int(0),
Aggregations: map[string]types.Aggregations{
"avg_score": { // 将所有文档的 score 的平均值聚合为 avg_score
Avg: &types.AverageAggregation{
Field: some.String("score"),
},
},
},
},
).Do(context.Background())
if err != nil {
fmt.Printf("aggregation failed, err:%v\n", err)
return
}
fmt.Printf("avgScore:%#v\n", avgScoreAgg.Aggregations["avg_score"])
}
// UpdateDocument 更新文档
func UpdateDocument(review model.Review, indexName string) {
修改后的结构体变量
//d1 := Review{
// ID: 1,
// UserID: 147982601,
// Score: 5,
// Content: "这是一个修改后的好评!", // 有修改
// Tags: []Tag{ // 有修改
// {1000, "好评"},
// {9000, "有图"},
// },
// Status: 2,
// PublishTime: time.Now(),
//}
resp, err := EsClinet.Update(indexName, fmt.Sprintf("%d", review.ID)). // 通过唯一文档ID指定要更新的文档
Doc(review). // 使用结构体变量更新
Do(context.Background())
if err != nil {
fmt.Printf("update document failed, err:%v\n", err)
return
}
fmt.Printf("result:%v\n", resp.Result)
}
// UpdateDocumentByJson 更新文档
func UpdateDocumentByJson(docId, str, indexName string) {
修改后的JSON字符串
//str := `{
// "id":1,
// "userID":147982601,
// "score":5,
// "content":"这是一个二次修改后的好评!",
// "tags":[
// {
// "code":1000,
// "title":"好评"
// },
// {
// "code":9000,
// "title":"有图"
// }
// ],
// "status":2,
// "publishDate":"2023-12-16T15:27:18.219385+08:00"
//}`
// 直接使用JSON字符串更新
resp, err := EsClinet.Update(indexName, docId).
Request(&update.Request{
Doc: json.RawMessage(str),
}).
Do(context.Background())
if err != nil {
fmt.Printf("update document failed, err:%v\n", err)
return
}
fmt.Printf("result:%v\n", resp.Result)
}
// DeleteDocument 删除 document
func DeleteDocument(docId, indexName string) {
resp, err := EsClinet.Delete(indexName, docId).
Do(context.Background())
if err != nil {
fmt.Printf("delete document failed, err:%v\n", err)
return
}
fmt.Printf("result:%v\n", resp.Result)
}