ElasticSearch 7.x 版本使用 BulkProcessor 实现批量添加数据

发布时间:2024年01月11日

ElasticSearch

1、ElasticSearch学习随笔之基础介绍
2、ElasticSearch学习随笔之简单操作
3、ElasticSearch学习随笔之java api 操作
4、ElasticSearch学习随笔之SpringBoot Starter 操作
5、ElasticSearch学习随笔之嵌套操作
6、ElasticSearch学习随笔之分词算法
7、ElasticSearch学习随笔之高级检索
8、ELK技术栈介绍
9、Logstash部署与使用
10、ElasticSearch 7.x 版本使用 BulkProcessor 实现批量添加数据

ElasticSearch,创始人 Shay Banon(谢巴农)



前言

本文主要应用 Rest High Level Client 来进行对 ElasticSearch 进行操作,虽说官方已经不推荐,但是 ES 升级带来的代价也是相当大的,所以,此处略去一万字。

  • 那什么是 BulkProcessor 呢?
    BulkProcessorElasticSearch 客户端中的一个功能,用于批量执行索引、更新或删除操作,BulkProcessor 运行将多个操作打包成一个请求进行发送,以提高效率和性能。

批量操作索引的好处:

  • 性能优势:将多个操作打包成一个请求,这样可以减少网络开销,提高数据传输效率,从而可以加快数据写入索引速度。
  • 减少开销:较少的网络开销和较少的服务器的交互,减少服务器开销,尤其是大规模写入数据时。
  • 原子性:批量操作可以保证一组操作要么全部成功,要么全部失败,报错数据的一致性。
  • 减少开发成本:批量操作,可以简化客户端代码,减少请求和管理连接的操作。

当然,批量操作也是有缺点的:

  • 内存消耗:在执行批量操作时,首先会将数据写入内存,这样会消耗更多的内存。
  • 错误处理复杂性:单条数据上传,如果出错可以重试或者进行记录操作等,但是批量操作中的某个请求失败,需要额外来处理,比单条操作复杂。
  • 延迟响应:批量操作可能导致请求排队等待,会产生一些延迟。

多余的不说,来上代码。

一:引入 pom

首先引入客户端依赖,我的测试 ES 服务是 8.7.0 版本的,这里对应 High Level REST Client 客户端 7.3.2 版本的。

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.3.2</version>
</dependency>

之所以不用更高版本,是因为版本高了会报如下错误:

java.io.IOException: Unable to parse response body for Response{requestLine=POST /devintcompany@1562219164186/_doc?timeout=1m HTTP/1.1, host=http://192.168.*。*:9200, response=HTTP/1.1 201 Created}
	at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1473)
	at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1424)
	at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1394)
	at org.elasticsearch.client.RestHighLevelClient.index(RestHighLevelClient.java:836)
	at com.example.es.EsTest.addIndex(EsTest.java:97)
	at com.example.es.EsTest.main(EsTest.java:36)
Caused by: java.lang.NullPointerException
	at java.util.Objects.requireNonNull(Objects.java:203)
	at org.elasticsearch.action.DocWriteResponse.<init>(DocWriteResponse.java:127)
	at org.elasticsearch.action.index.IndexResponse.<init>(IndexResponse.java:50)
	at org.elasticsearch.action.index.IndexResponse.<init>(IndexResponse.java:39)
	at org.elasticsearch.action.index.IndexResponse$Builder.build(IndexResponse.java:103)
	at org.elasticsearch.action.index.IndexResponse.fromXContent(IndexResponse.java:85)
	at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1727)
	at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAndParseEntity$8(RestHighLevelClient.java:1395)
	at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1471)
	... 5 more

亲自测试过的,应该还是版本不兼容的缘故,但是数据已经插入到 Index 了,就很奇怪。

二:创建 ES Client

这里初始化客户端,需要用户名密码进行认证的。

private static RestHighLevelClient createClient(){
    String hostname = "192.168.*.*";
    int port = 9200;
    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
    credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("your username", "your password"));
    RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(hostname, port))
            .setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
    return new RestHighLevelClient(restClientBuilder);
}

三:创建 BulkProcessor

这里创建 BulkProcessor 批量操作对象,通过 High Level REST Client 来绑定,加入监听器 BulkProcessor.Listener,如果批量操作失败或发生异常,在 afterBulk() 方法中处理。
批量处理需要设置的参数代码中已有注释,一般就设置这些参数就可以了,可根据自己的使用场景进行调节。

public static BulkProcessor getBulkProcessor(RestHighLevelClient client) {
    BulkProcessor.Listener listener = new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            System.out.println("开始执行批量操作,ID: " + executionId);
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            if (response.hasFailures()) {
                System.out.println("批量操作完成,ID: " + executionId);
            }
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            System.out.println("批量操作失败,ID: " + executionId);
            failure.printStackTrace();
        }
    };
    BulkProcessor.Builder builder = BulkProcessor.builder(((bulkRequest, bulkResponseActionListener) -> {
        bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        bulkRequest.timeout(TimeValue.timeValueSeconds(100));
        client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener);
    }), listener);
    // 当达到1000个操作时触发批量请求
    builder.setBulkActions(1000);
    // 当达到5MB大小时触发批量请求
    builder.setBulkSize(new ByteSizeValue(1, ByteSizeUnit.MB));
    // 每5秒触发一次批量请求,无论大小和操作数如何
    builder.setFlushInterval(TimeValue.timeValueSeconds(5));
    // 设置退避策略,以防服务器过载或拒绝请求
    builder.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 3));
    // 设置并发请求的数量为1,即同时只有一个批量请求在执行
    builder.setConcurrentRequests(1);
    return builder.build();
}

四:批量推数据

我们在 main 方法中进行测试,代码如下:

public static void main(String[] args) throws IOException {
    RestHighLevelClient client = createClient();
    BulkProcessor bulkProcessor = getBulkProcessor(client);
    for (int i = 0; i < 10; i++) {
        String source = "{\"ApplianceType\":[{\"ApplianceTypeCn\":\"国产\",\"ApplianceTypeEn\":\"Domestic\",\"ApplianceTypeId\":\"1\"}],\"ApplicationCount\":0,\"ClassICount\":17,\"ClassIICount\":1,\"ClassIIICount\":0,\"Classification\":[{\"Cn\":\"2002版分类\",\"En\":\"2002 reg. category of relevant app.\",\"Id\":\"Class2002\",\"Items\":[{\"Cn\":\"Ⅰ类\",\"En\":\"Class Ⅰ\",\"Id\":\"1\",\"Id2\":\"I\",\"Items\":[{\"Cn\":\"进口第一类医疗器械(含第一类体外诊断试剂)备案信息\",\"En\":\"Information on imported ClassⅠmedical devices (including ClassⅠ IVD reagents)\",\"Id\":\"100\"}]},{\"Cn\":\"Ⅱ类\",\"En\":\"Class Ⅱ\",\"Id\":\"2\",\"Id2\":\"II\",\"Items\":[{\"Cn\":\"妇产科、辅助生殖和避孕器械\",\"En\":\"Obstetrics and gynecology, assisted reproductive and contraceptive devices\",\"Id\":\"201818\"}]}]},{\"Class1Code\":[{\"Id\":\"02\"}],\"Class2Code\":[{\"Id\":\"03\"}],\"DataType\":[{\"Id\":\"1\"},{\"Id\":\"3\"}],\"ProductClassificationCode\":[{\"Id\":\"09\"}],\"ProductClassificationNameCode\":[]}],\"Company\":{\"Cn\":\"海南创鑫医药科技发展有限公司\",\"En\":\"Hainan Chuangxin Pharmaceutical Technology Development Co. Ltd.\",\"Id\":\"1000002388\"},\"CompanyAliasCn\":[\"海南创鑫医药科技发展有限公司\"],\"CompanyAliasEn\":[\"Hainan Chuangxin Pharmaceutical Technology Development Co. Ltd.\"],\"CompanyCn\":\"海南创鑫医药科技发展有限公司\",\"CompanyCnSearch\":\"海南创鑫医药科技发展有限公司\",\"CompanyEn\":\"Hainan Chuangxin Pharmaceutical Technology Development Co. Ltd.\",\"CompanyEnSearch\":\"Hainan Chuangxin Pharmaceutical Technology Development Co. Ltd.\",\"CompanyId\":\"1000002388\",\"CompanyType\":{\"Cn\":\"国内公司\",\"En\":\"Domestic company\",\"Id\":\"Domestic company\"},\"CompanyTypeCn\":\"国内公司\",\"CompanyTypeEn\":\"Domestic company\",\"CompanyTypeId\":\"Domestic company\",\"DomesticCount\":18,\"EffectiveRegistrationCount\":18,\"FirstApplicationYear\":null,\"FirstRegistrationYear\":\"2017\",\"IVD\":\"0\",\"ImportCount\":0,\"LatestApplicationYear\":null,\"LatestRegistrationYear\":\"2020\",\"Listing\":{\"Cn\":null,\"En\":null,\"Id\":null},\"ListingCn\":null,\"ListingEn\":null,\"ListingId\":null,\"TotalCount\":18,\"company_registration_relation\":{\"name\":\"company\"},\"website_url\":\"\"}";
        bulkProcessor.add(new IndexRequest("devintcompany@1562219164186").source(source, XContentType.JSON));
        System.out.println("添加第 " + i + "条数据!");
    }
    try {
        bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
        client.close();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("添加完成!");
}
文章来源:https://blog.csdn.net/qq_19283249/article/details/135352497
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。