ElasticSearch的两个主要功能:
日志采集(服务器、移动设备、IOT传感器) -> 数据传输汇聚(Kafka、LogStash) -> 数据存储索引(ElasticSearch集群) -> 异常定位与监控(全文检索、结构化查询、数据可视化)
ES是一个基于内存的数据检索与分析引擎,我们如果有十分多的数据需要保存的话,我们可以构建多台服务器,利用ES天然适配于分布式的属性对ES中存储的数据进行分片存储
涉及到ES的问题,尽量不要使用分步查询,一步到位,否则第二次请求要发送的数据可能会相当大,导致网络崩溃
创建索引:(注意点:对于只需要看,不需要搜索的信息,可以把index属性和doc_values属性设置为false,对于内部的另一个属性,要标记为nested,否则会产生问题)
一个数据模型的映射示例:
PUT product
{
"mappings":{
"properties": {
"skuId":{ "type": "long" },
"spuId":{ "type": "keyword" },
"skuTitle": {
"type": "text",
"analyzer": "ik_smart"
},
"skuPrice": { "type": "keyword" },
"skuImg" : { "type": "keyword" },
"saleCount":{ "type":"long" },
"hasStock": { "type": "boolean" },
"hotScore": { "type": "long" },
"brandId": { "type": "long" },
"catalogId": { "type": "long" },
"brandName": {"type": "keyword"},
"brandImg":{
"type": "keyword",
"index": false,
"doc_values": false
},
"catalogName": {"type": "keyword" },
"attrs": {
"type": "nested",
"properties": {
"attrId": {"type": "long" },
"attrName": {
"type": "keyword",
"index": false,
"doc_values": false
},
"attrValue": {"type": "keyword" }
}
}
}
}
}
一定要使用nested的原因:
ES在存储时不会按照对象存储,而是按照属性,把所有同一个字段的内容都聚合成一个数组(用来便于全文检索),而我们不希望我们的内嵌的对象被扁平化处理,希望能够精确查找,所以我们标注nested来让ES不对该属性进行扁平化处理
user.name=["aaa","bbb"]
user.addr=["ccc","ddd"]
这种存储方式,可能会发生如下错误:
错误检索到{aaa,ddd},这个组合是不存在的
我们提前准备好所有要保存的信息,建立Vo类,使用这个VO类进行保存
在search包中创建一个controller:
/**
* 上架商品
*/
@PostMapping("/product")
public R productStatusUp(@RequestBody List<SkuEsModel> skuEsModelList) {
// 由于我们在网络可能出现问题的部分Throw 了 IOException,故我们需要使用 try-catch 进行捕获
try{
productSaveService.productStatusUp(skuEsModelList);
} catch (Exception e) {
return R.error(PRODUCT_UP_EXCEPTION.getCode(), PRODUCT_UP_EXCEPTION.getMsg());
};
return R.ok();
}
}
具体操作还得再Impl中做:
/**
* 上架商品
*
* @param skuEsModelList
* @return
*/
@Override
public boolean productStatusUp(List<SkuEsModel> skuEsModelList) throws IOException {
/**
* 向ES中保存数据.bulk(BulkRequest, RequestOptions)
* BulkRequest中存储我们要保存的数据配置(索引、id等)
* RequestOptions我们提前在Config中进行全局定义,用来配置我们发送请求的各种参数,我们也可以自己定义,但这里我们先使用全局的
* 之后我们再新建IndexRequest对象,在IndexRequest中标注索引、id、具体的数据
* 我们的id可以使用我们的skuId
*/
BulkRequest bulkRequest = new BulkRequest();
for (SkuEsModel model : skuEsModelList) {
IndexRequest indexRequest = new IndexRequest(EsConstant.PRODUCT_INDEX); // 传入索引
indexRequest.id(model.getSkuId().toString()); // 传入id
String s = JSON.toJSONString(model); // 将对象转换成JSON对象
indexRequest.source(s, XContentType.JSON); // 传入具体的信息,第一个形参是String字符串,第二个形参是它对应的数据类型(这里是JSON)
bulkRequest.add(indexRequest); // 将信息传入BulkRequest,用于RestHighLevelClient进行调用
}
// 进行调用,向ES中进行最后的存储
BulkResponse bulk = restHighLevelClient.bulk(bulkRequest, GulimailElasticSearchConfig.COMMON_OPTIONS);
// 若发生添加错误
boolean b = bulk.hasFailures();
// 这个数组直接转Stream流的方式可以学一下
List<String> collect = Arrays.stream(bulk.getItems()).map(item -> {
return item.getId();
}).collect(Collectors.toList());
log.error("商品上架错误{}", collect);
return b;
}
操作Kibana,给ES建立好索引:
PUT product
{
"mappings":{
"properties": {
"skuId":{ "type": "long" },
"spuId":{ "type": "keyword" },
"skuTitle": {
"type": "text",
"analyzer": "ik_smart"
},
"skuPrice": { "type": "keyword" },
"skuImg" : { "type": "keyword" },
"saleCount":{ "type":"long" },
"hasStock": { "type": "boolean" },
"hotScore": { "type": "long" },
"brandId": { "type": "long" },
"catalogId": { "type": "long" },
"brandName": {"type": "keyword"},
"brandImg":{
"type": "keyword",
"index": false,
"doc_values": false
},
"catalogName": {"type": "keyword" },
"attrs": {
"type": "nested",
"properties": {
"attrId": {"type": "long" },
"attrName": {
"type": "keyword",
"index": false,
"doc_values": false
},
"attrValue": {"type": "keyword" }
}
}
}
}
}
在product模块中创建SearchFeignService接口:
@FeignClient("gulimail-search")
public interface SearchFeignService {
/**
* @PostMapping("/product")
* public R productStatusUp(@RequestBody List<SkuEsModel> skuEsModelList) {
*/
@PostMapping
R productStatusUp(@RequestBody List<SkuEsModel> skuEsModelList);
}
注入并调用这个接口(商品上架的整体内容):
/**
* 商品上架
* 上架是一个SPU类型的商品数据,而不是某一个具体的SKU
* @param spuId
*/
@Override
public void up(Long spuId) {
// 先查出当前spuid对应的所有sku的信息,包括品牌的名字也要查出来(一个spu对应多个sku,颜色不同、内存不同,等等)
List<SkuInfoEntity> skuInfoEntities = skuInfoService.getSkusBySpuId(spuId);
List<ProductAttrValueEntity> baseAttrs = productAttrValueService.baseAttrListforspu(spuId);
List<Long> attrIds = baseAttrs.stream().map(item -> {
return item.getAttrId();
}).collect(Collectors.toList());
// 查到所有可被检索信息的id
List<Long> searchAttrIds = attrService.selectSearchAttrIds(attrIds);
HashSet<Long> searchAttrIdsSet = new HashSet<>(searchAttrIds);
List<SkuEsModel.Attrs> attrs = new ArrayList<>();
List<SkuEsModel.Attrs> attrsList = baseAttrs.stream().filter(item -> baseAttrs.contains(item)).map(item -> {
SkuEsModel.Attrs attrs1 = new SkuEsModel.Attrs();
BeanUtils.copyProperties(item, attrs1);
return attrs1;
}).collect(Collectors.toList());
//获取所有的id
List<Long> skuIds = skuInfoEntities.stream().map(skuInfoEntity -> skuInfoEntity.getSkuId()).collect(Collectors.toList());
Map<Long, Boolean> stockMap = null;
try {
R<List<SkuHasStockVo>> skusHasStock = wareFeignService.getSkusHasStock(skuIds);
//若item -> item.getId(),则可以转化为:Item::getId(注意前面是类名,不是对象名)
stockMap = skusHasStock.getData().stream().collect(Collectors.toMap(SkuHasStockVo::getSkuId, SkuHasStockVo::getHasStock));
} catch (Exception e) {
log.error("远程调用出现异常:{}", e);
}
Map<Long, Boolean> finalStockMap = stockMap;
List<SkuEsModel> upProducts = skuInfoEntities.stream().map(item -> {
// 组装需要的数据
SkuEsModel esModel = new SkuEsModel();
BeanUtils.copyProperties(item, esModel);
// 设置其他没有被复制的字段
esModel.setSkuPrice(item.getPrice());
esModel.setSkuImg(item.getSkuDefaultImg());
// 查询库存
// TODO 远程调用仓库查库存
// 利用toMap和R的泛型的方法进行的快速转换(好牛的技巧!!!)
// Lambda中一般不能操作外部变量,这里有必要操作,在编译器的支持下做修改即可(Lambda操作的外部变量必须是以final修饰的)
esModel.setHasStock(finalStockMap.get(item.getSkuId()));
// 热度评分先设置为0
esModel.setHotScore(0L);
BrandEntity brand = brandService.getById(esModel.getBrandId());
esModel.setBrandName(brand.getName());
esModel.setBrandImg(brand.getLogo());
CategoryEntity category = categoryService.getById(esModel.getCatalogId());
esModel.setCatalogName(category.getName());
// 设置里面那个数组
esModel.setAttrs(attrsList);
return esModel;
}).collect(Collectors.toList());
// 将数据发送给ES进行存储
/**
* @PostMapping("/product")
* public R productStatusUp(@RequestBody List<SkuEsModel> skuEsModelList) {
*/
R r = searchFeignService.productStatusUp(upProducts);
if (r.getCode() == 0) {
baseMapper.updateSpuStatus(spuId, ProductConstant.StatusEnum.SPU_UP.getCode());
} else {
log.error("商品上架失败");
}
}
Feign的调用流程:
尝试:
PUT gulimail_product
{
"mappings":{
"properties": {
"skuId":{ "type": "long" },
"spuId":{ "type": "keyword" },
"skuTitle": {
"type": "text",
"analyzer": "ik_smart"
},
"skuPrice": { "type": "keyword" },
"skuImg" : { "type": "keyword" },
"saleCount":{ "type":"long" },
"hasStock": { "type": "boolean" },
"hotScore": { "type": "long" },
"brandId": { "type": "long" },
"catalogId": { "type": "long" },
"brandName": {"type": "keyword"},
"brandImg":{
"type": "keyword",
"index": false,
},
"catalogName": {"type": "keyword" },
"attrs": {
"type": "nested",
"properties": {
"attrId": {"type": "long" },
"attrName": {
"type": "keyword",
"index": false,
},
"attrValue": {"type": "keyword" }
}
}
}
}
}
尝试成功,不加doc_values就可以进行聚合操作