【中间件】ElasticSearch在Java中的调用尝试

发布时间:2023年12月19日

ElasticSearch

ElasticSearch的两个主要功能:

  • 全文检索
  • 形成ELK集群,进行日志检索(ElasticSearch、LogStash、Kibana)

日志采集(服务器、移动设备、IOT传感器) -> 数据传输汇聚(Kafka、LogStash) -> 数据存储索引(ElasticSearch集群) -> 异常定位与监控(全文检索、结构化查询、数据可视化)

ES是一个基于内存的数据检索与分析引擎,我们如果有十分多的数据需要保存的话,我们可以构建多台服务器,利用ES天然适配于分布式的属性对ES中存储的数据进行分片存储

业务:商品在上架后向ES中保存

涉及到ES的问题,尽量不要使用分步查询,一步到位,否则第二次请求要发送的数据可能会相当大,导致网络崩溃

创建索引:(注意点:对于只需要看,不需要搜索的信息,可以把index属性和doc_values属性设置为false,对于内部的另一个属性,要标记为nested,否则会产生问题)

一个数据模型的映射示例:

PUT product
{
    "mappings":{
        "properties": {
            "skuId":{ "type": "long" },
            "spuId":{ "type": "keyword" },  
            "skuTitle": {
                "type": "text",
                "analyzer": "ik_smart"  
            },
            "skuPrice": { "type": "keyword" },  
            "skuImg"  : { "type": "keyword" },  
            "saleCount":{ "type":"long" },
            "hasStock": { "type": "boolean" },
            "hotScore": { "type": "long"  },
            "brandId":  { "type": "long" },
            "catalogId": { "type": "long"  },
            "brandName": {"type": "keyword"}, 
            "brandImg":{
                "type": "keyword",
                "index": false,  
                "doc_values": false 
            },
            "catalogName": {"type": "keyword" }, 
            "attrs": {
                "type": "nested",
                "properties": {
                    "attrId": {"type": "long"  },
                    "attrName": {
                        "type": "keyword",
                        "index": false,
                        "doc_values": false
                    },
                    "attrValue": {"type": "keyword" }
                }
            }
        }
    }
}

一定要使用nested的原因:

ES在存储时不会按照对象存储,而是按照属性,把所有同一个字段的内容都聚合成一个数组(用来便于全文检索),而我们不希望我们的内嵌的对象被扁平化处理,希望能够精确查找,所以我们标注nested来让ES不对该属性进行扁平化处理

user.name=["aaa","bbb"]
user.addr=["ccc","ddd"]

这种存储方式,可能会发生如下错误:
错误检索到{aaa,ddd},这个组合是不存在的

操作,将数据在ES中保存

我们提前准备好所有要保存的信息,建立Vo类,使用这个VO类进行保存

在search包中创建一个controller:

    /**
     * 上架商品
     */
    @PostMapping("/product")
    public R productStatusUp(@RequestBody List<SkuEsModel> skuEsModelList) {

        // 由于我们在网络可能出现问题的部分Throw 了 IOException,故我们需要使用 try-catch 进行捕获
        try{
            productSaveService.productStatusUp(skuEsModelList);
        } catch (Exception e) {
            return R.error(PRODUCT_UP_EXCEPTION.getCode(), PRODUCT_UP_EXCEPTION.getMsg());
        };

        return R.ok();
    }
}

具体操作还得再Impl中做:

/**
     * 上架商品
     *
     * @param skuEsModelList
     * @return
     */
    @Override
    public boolean productStatusUp(List<SkuEsModel> skuEsModelList) throws IOException {
        /**
         * 向ES中保存数据.bulk(BulkRequest, RequestOptions)
         * BulkRequest中存储我们要保存的数据配置(索引、id等)
         * RequestOptions我们提前在Config中进行全局定义,用来配置我们发送请求的各种参数,我们也可以自己定义,但这里我们先使用全局的
         * 之后我们再新建IndexRequest对象,在IndexRequest中标注索引、id、具体的数据
         * 我们的id可以使用我们的skuId
         */
        BulkRequest bulkRequest = new BulkRequest();
        for (SkuEsModel model : skuEsModelList) {
            IndexRequest indexRequest = new IndexRequest(EsConstant.PRODUCT_INDEX);     // 传入索引
            indexRequest.id(model.getSkuId().toString());               // 传入id
            String s = JSON.toJSONString(model);        // 将对象转换成JSON对象
            indexRequest.source(s, XContentType.JSON);              // 传入具体的信息,第一个形参是String字符串,第二个形参是它对应的数据类型(这里是JSON)

            bulkRequest.add(indexRequest);              // 将信息传入BulkRequest,用于RestHighLevelClient进行调用
        }
        // 进行调用,向ES中进行最后的存储
        BulkResponse bulk = restHighLevelClient.bulk(bulkRequest, GulimailElasticSearchConfig.COMMON_OPTIONS);

        // 若发生添加错误
        boolean b = bulk.hasFailures();
        // 这个数组直接转Stream流的方式可以学一下
        List<String> collect = Arrays.stream(bulk.getItems()).map(item -> {
            return item.getId();
        }).collect(Collectors.toList());
        log.error("商品上架错误{}", collect);


        return b;
    }

操作Kibana,给ES建立好索引:

PUT product
{
    "mappings":{
        "properties": {
            "skuId":{ "type": "long" },
            "spuId":{ "type": "keyword" },  
            "skuTitle": {
                "type": "text",
                "analyzer": "ik_smart"  
            },
            "skuPrice": { "type": "keyword" },  
            "skuImg"  : { "type": "keyword" },  
            "saleCount":{ "type":"long" },
            "hasStock": { "type": "boolean" },
            "hotScore": { "type": "long"  },
            "brandId":  { "type": "long" },
            "catalogId": { "type": "long"  },
            "brandName": {"type": "keyword"}, 
            "brandImg":{
                "type": "keyword",
                "index": false,  
                "doc_values": false 
            },
            "catalogName": {"type": "keyword" }, 
            "attrs": {
                "type": "nested",
                "properties": {
                    "attrId": {"type": "long"  },
                    "attrName": {
                        "type": "keyword",
                        "index": false,
                        "doc_values": false
                    },
                    "attrValue": {"type": "keyword" }
                }
            }
        }
    }
}

在product模块中创建SearchFeignService接口:

@FeignClient("gulimail-search")
public interface SearchFeignService {

    /**
     *     @PostMapping("/product")
     *     public R productStatusUp(@RequestBody List<SkuEsModel> skuEsModelList) {
     */
    @PostMapping
    R productStatusUp(@RequestBody List<SkuEsModel> skuEsModelList);
}

注入并调用这个接口(商品上架的整体内容):

    /**
     * 商品上架
     * 上架是一个SPU类型的商品数据,而不是某一个具体的SKU
     * @param spuId
     */
    @Override
    public void up(Long spuId) {
        // 先查出当前spuid对应的所有sku的信息,包括品牌的名字也要查出来(一个spu对应多个sku,颜色不同、内存不同,等等)
        List<SkuInfoEntity> skuInfoEntities = skuInfoService.getSkusBySpuId(spuId);

        List<ProductAttrValueEntity> baseAttrs = productAttrValueService.baseAttrListforspu(spuId);
        List<Long> attrIds = baseAttrs.stream().map(item -> {
            return item.getAttrId();
        }).collect(Collectors.toList());

        // 查到所有可被检索信息的id
        List<Long> searchAttrIds = attrService.selectSearchAttrIds(attrIds);

        HashSet<Long> searchAttrIdsSet = new HashSet<>(searchAttrIds);

        List<SkuEsModel.Attrs> attrs = new ArrayList<>();

        List<SkuEsModel.Attrs> attrsList = baseAttrs.stream().filter(item -> baseAttrs.contains(item)).map(item -> {
            SkuEsModel.Attrs attrs1 = new SkuEsModel.Attrs();
            BeanUtils.copyProperties(item, attrs1);
            return attrs1;
        }).collect(Collectors.toList());

        //获取所有的id
        List<Long> skuIds = skuInfoEntities.stream().map(skuInfoEntity -> skuInfoEntity.getSkuId()).collect(Collectors.toList());
        Map<Long, Boolean> stockMap = null;
        try {
            R<List<SkuHasStockVo>> skusHasStock = wareFeignService.getSkusHasStock(skuIds);
            //若item -> item.getId(),则可以转化为:Item::getId(注意前面是类名,不是对象名)
            stockMap = skusHasStock.getData().stream().collect(Collectors.toMap(SkuHasStockVo::getSkuId, SkuHasStockVo::getHasStock));
        } catch (Exception e) {
            log.error("远程调用出现异常:{}", e);
        }

        Map<Long, Boolean> finalStockMap = stockMap;
        List<SkuEsModel> upProducts = skuInfoEntities.stream().map(item -> {
            // 组装需要的数据
            SkuEsModel esModel = new SkuEsModel();
            BeanUtils.copyProperties(item, esModel);

            // 设置其他没有被复制的字段
            esModel.setSkuPrice(item.getPrice());
            esModel.setSkuImg(item.getSkuDefaultImg());

            // 查询库存
            // TODO 远程调用仓库查库存
            // 利用toMap和R的泛型的方法进行的快速转换(好牛的技巧!!!)
            // Lambda中一般不能操作外部变量,这里有必要操作,在编译器的支持下做修改即可(Lambda操作的外部变量必须是以final修饰的)
            esModel.setHasStock(finalStockMap.get(item.getSkuId()));

            // 热度评分先设置为0
            esModel.setHotScore(0L);

            BrandEntity brand = brandService.getById(esModel.getBrandId());
            esModel.setBrandName(brand.getName());
            esModel.setBrandImg(brand.getLogo());

            CategoryEntity category = categoryService.getById(esModel.getCatalogId());
            esModel.setCatalogName(category.getName());

            // 设置里面那个数组
            esModel.setAttrs(attrsList);

            return esModel;
        }).collect(Collectors.toList());

        // 将数据发送给ES进行存储
        /**
         *     @PostMapping("/product")
         *     public R productStatusUp(@RequestBody List<SkuEsModel> skuEsModelList) {
         */
        R r = searchFeignService.productStatusUp(upProducts);
        if (r.getCode() == 0) {
            baseMapper.updateSpuStatus(spuId, ProductConstant.StatusEnum.SPU_UP.getCode());
        } else {
            log.error("商品上架失败");
        }
    }

Feign的调用流程:

  • 构造请求数据,将对象转为JSON数据,RequestTemplate template = buildTemplateFromArgs.create(argv)
  • 发送请求执行(执行成功会进行解码响应)executeAndDecode(template)
  • 这里的请求执行是有重试机制的,这个重试机制默认是关闭的,其逻辑大概是:执行出错 - 重试(多次重试) - 若再次出错,抛出异常,若没有再次出错 - 解码响应

尝试:

PUT gulimail_product
{
    "mappings":{
        "properties": {
            "skuId":{ "type": "long" },
            "spuId":{ "type": "keyword" },  
            "skuTitle": {
                "type": "text",
                "analyzer": "ik_smart"  
            },
            "skuPrice": { "type": "keyword" },  
            "skuImg"  : { "type": "keyword" },  
            "saleCount":{ "type":"long" },
            "hasStock": { "type": "boolean" },
            "hotScore": { "type": "long"  },
            "brandId":  { "type": "long" },
            "catalogId": { "type": "long"  },
            "brandName": {"type": "keyword"}, 
            "brandImg":{
                "type": "keyword",
                "index": false,  
            },
            "catalogName": {"type": "keyword" }, 
            "attrs": {
                "type": "nested",
                "properties": {
                    "attrId": {"type": "long"  },
                    "attrName": {
                        "type": "keyword",
                        "index": false,
                    },
                    "attrValue": {"type": "keyword" }
                }
            }
        }
    }
}

尝试成功,不加doc_values就可以进行聚合操作

文章来源:https://blog.csdn.net/weixin_41365204/article/details/135067745
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。