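In some business scenarios we need to run aggregations over the elements of an array, which calls for the nested data type. Sometimes the aggregation should cover only certain elements of the array, so the elements have to be filtered first. This post walks through how to implement that.
First, let's set up some sample data: an order contains several products, and we want the top 3 "生鲜" (fresh produce) products by sales, measured here by the summed number field.
The mapping is as follows: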
PUT orders
{
  "settings": {
    "number_of_replicas": 0,
    "number_of_shards": 1
  },
  "mappings": {
    "properties": {
      "order_no": {
        "type": "keyword"
      },
      "create_user": {
        "type": "keyword"
      },
      "product_list": {
        "type": "nested",
        "properties": {
          "name": {
            "type": "keyword"
          },
          "number": {
            "type": "integer"
          },
          "type": {
            "type": "keyword"
          }
        }
      }
    }
  }
}
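To see why product_list is mapped as nested rather than as a plain object array, here is a minimal in-memory sketch in plain Java. It does not call Elasticsearch; the class NestedVsFlattened, its Product type, and the two match methods are hypothetical names used only for illustration. It assumes the documented behavior that a non-nested array of objects is flattened into independent per-field value lists, so conditions on name and type can be satisfied by different elements.

```java
import java.util.Arrays;
import java.util.List;

public class NestedVsFlattened {
    public static final String KETTLE = "水壶";
    public static final String FRESH = "生鲜";

    // Hypothetical stand-in for one element of product_list.
    public static final class Product {
        final String name;
        final int number;
        final String type;

        Product(String name, int number, String type) {
            this.name = name;
            this.number = number;
            this.type = type;
        }
    }

    // The product_list of the first sample order.
    public static List<Product> sampleOrder() {
        return Arrays.asList(
                new Product("苹果", 2, "生鲜"),
                new Product(KETTLE, 2, "百货"),
                new Product("香蕉", 4, FRESH));
    }

    // Flattened (object-style) matching: each condition is checked against the
    // whole array independently, so different elements may satisfy each one.
    public static boolean flattenedMatch(List<Product> items, String name, String type) {
        boolean anyName = items.stream().anyMatch(p -> p.name.equals(name));
        boolean anyType = items.stream().anyMatch(p -> p.type.equals(type));
        return anyName && anyType;
    }

    // Nested-style matching: both conditions must hold on the same element.
    public static boolean nestedMatch(List<Product> items, String name, String type) {
        return items.stream().anyMatch(p -> p.name.equals(name) && p.type.equals(type));
    }

    public static void main(String[] args) {
        List<Product> order = sampleOrder();
        // The kettle is not fresh produce, yet the flattened check still matches.
        System.out.println(flattenedMatch(order, KETTLE, FRESH)); // true
        System.out.println(nestedMatch(order, KETTLE, FRESH));    // false
    }
}
```

Keeping each array element as its own unit is what lets an aggregation pair a product's name and number with that same product's type.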
Here is some sample data to experiment with:
POST orders/_bulk
{"index":{}}
{"order_no":"1","create_user":"张三","product_list":[{"name":"苹果", "number":2, "type":"生鲜"},{"name":"水壶", "number":2, "type":"百货"},{"name":"香蕉", "number":4, "type":"生鲜"}]}
{"index":{}}
{"order_no":"2","create_user":"李四","product_list":[{"name":"榴莲", "number":1, "type":"生鲜"},{"name":"小米手机", "number":2, "type":"百货"},{"name":"鲫鱼", "number":5, "type":"生鲜"}]}
{"index":{}}
{"order_no":"3","create_user":"周吴","product_list":[{"name":"苹果", "number":4, "type":"生鲜"},{"name":"香梨", "number":2, "type":"生鲜"},{"name":"毛巾", "number":4, "type":"百货"}]}
The examples below are based on Elasticsearch 7.13.0.
1. First, let's look at the query without any filtering:
GET orders/_search
{
  "size": 0,
  "aggs": {
    "product_agg": {
      "nested": {
        "path": "product_list"
      },
      "aggs": {
        "name_agg": {
          "terms": {
            "field": "product_list.name"
          },
          "aggs": {
            "number_sum": {
              "sum": {
                "field": "product_list.number"
              }
            },
            "sort": {
              "bucket_sort": {
                "sort": [
                  {
                    "number_sum.value": {
                      "order": "desc"
                    }
                  }
                ],
                "size": 3
              }
            }
          }
        }
      }
    }
  }
}
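Without filtering by product type, the nested aggregation does the grouping and summing over the array elements, bucket_sort orders the buckets, and its size parameter limits the output to the top 3.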
nested aggregation official documentation: https://www.elastic.co/guide/en/elasticsearch/reference/7.13/search-aggregations-bucket-nested-aggregation.html
bucket_sort official documentation: https://www.elastic.co/guide/en/elasticsearch/reference/7.13/search-aggregations-pipeline-bucket-sort-aggregation.html
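The output, however, is clearly not what we want, because products of every type are counted. We need to filter on the type attribute of the array elements and keep only "生鲜" (fresh produce) products.
Elasticsearch provides the filter aggregation for narrowing down the documents that an aggregation sees, and we can use it to apply the condition here.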
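One thing to note: the terms aggregation has to be written as a new agg inside an aggs block that sits next to the filter key, which makes it a sub-aggregation of the filter aggregation. In other words, the documents that pass the filter aggregation become the input of the next aggregation.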
GET orders/_search
{
  "size": 0,
  "aggs": {
    "product_agg": {
      "nested": {
        "path": "product_list"
      },
      "aggs": {
        "type_filter": {
          "filter": {
            "term": {
              "product_list.type": "生鲜"
            }
          },
          "aggs": {
            "name_agg": {
              "terms": {
                "field": "product_list.name"
              },
              "aggs": {
                "number_sum": {
                  "sum": {
                    "field": "product_list.number"
                  }
                },
                "sort": {
                  "bucket_sort": {
                    "sort": [
                      {
                        "number_sum.value": {
                          "order": "desc"
                        }
                      }
                    ],
                    "size": 3
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}
Running this query returns the expected result.
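To cross-check the numbers by hand, the same pipeline (filter by type, group by name, sum number, sort descending, keep the first 3) can be sketched over the sample data in plain Java. This is only an in-memory illustration and does not call Elasticsearch; the class TopFreshProducts, its Product type, and the topN helper are hypothetical names introduced for this sketch.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TopFreshProducts {
    public static final String FRESH = "生鲜";

    // Hypothetical stand-in for one element of product_list.
    public static final class Product {
        final String name;
        final int number;
        final String type;

        Product(String name, int number, String type) {
            this.name = name;
            this.number = number;
            this.type = type;
        }
    }

    // Every product_list element of the three sample orders, one entry per
    // element, which is how the nested aggregation exposes them.
    public static List<Product> sampleProducts() {
        return Arrays.asList(
                new Product("苹果", 2, "生鲜"), new Product("水壶", 2, "百货"),
                new Product("香蕉", 4, "生鲜"), new Product("榴莲", 1, "生鲜"),
                new Product("小米手机", 2, "百货"), new Product("鲫鱼", 5, "生鲜"),
                new Product("苹果", 4, "生鲜"), new Product("香梨", 2, "生鲜"),
                new Product("毛巾", 4, "百货"));
    }

    // filter (type) -> terms (name) -> sum (number) -> sort desc -> first n
    public static List<Map.Entry<String, Integer>> topN(List<Product> products, String type, int n) {
        Map<String, Integer> sums = products.stream()
                .filter(p -> p.type.equals(type))
                .collect(Collectors.groupingBy(
                        p -> p.name, LinkedHashMap::new, Collectors.summingInt(p -> p.number)));
        return sums.entrySet().stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Integer> e : topN(sampleProducts(), FRESH, 3)) {
            System.out.println(e.getKey() + " = " + e.getValue());
        }
        // prints:
        // 苹果 = 6
        // 鲫鱼 = 5
        // 香蕉 = 4
    }
}
```

With the sample data, the three name_agg buckets should therefore carry number_sum values of 6, 5 and 4.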
A Java client implementation is also included for reference. It uses RestHighLevelClient as the client.
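import java.io.IOException;
import java.util.List;
import java.util.Map;
import javax.annotation.Resource;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.filter.ParsedFilter;
import org.elasticsearch.search.aggregations.bucket.nested.ParsedNested;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;

@Resource
private RestHighLevelClient restClient;

public void test() throws IOException {
    String outAggName = "product_agg";
    String filterAggName = "type_filter";
    String termAggName = "name_agg";
    // Group by product name; size(3) keeps only the top 3 buckets under the order set below.
    TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms(termAggName).field("product_list.name").size(3);
    SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
    // nested -> filter (type = 生鲜) -> terms (name) -> sum (number), ordered by the sum descending.
    searchBuilder.aggregation(AggregationBuilders.nested(outAggName, "product_list")
            .subAggregation(AggregationBuilders.filter(filterAggName, new TermQueryBuilder("product_list.type", "生鲜"))
                    .subAggregation(termsAggregationBuilder
                            .subAggregation(AggregationBuilders.sum("number_sum").field("product_list.number"))
                            .order(BucketOrder.aggregation("number_sum.value", false)))));
    searchBuilder.size(0);
    SearchRequest searchRequest = new SearchRequest("orders");
    searchRequest.source(searchBuilder);
    SearchResponse response = restClient.search(searchRequest, RequestOptions.DEFAULT);
    // Walk down the response in the same order the aggregations were nested.
    Map<String, Aggregation> resultMap = response.getAggregations().getAsMap();
    ParsedNested aggResults = (ParsedNested) resultMap.get(outAggName);
    ParsedFilter filterAgg = aggResults.getAggregations().get(filterAggName);
    ParsedStringTerms termAgg = filterAgg.getAggregations().get(termAggName);
    List<? extends Terms.Bucket> buckets = termAgg.getBuckets();
    for (Terms.Bucket bucket : buckets) {
        // TODO: read the grouped results
    }
}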