提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
批量写入Elasticsearch可以提高写入性能,减轻Elasticsearch的IO压力。
Elasticsearch是一个实时的分布式开放源代码全文本搜索和分析引擎。可从RESTful Web服务界面访问它,并使用无模式的JSON(JavaScript对象表示法)文档存储数据。它基于Java编程语言构建,因此Elasticsearch可以在不同平台上运行。它使用户能够以很高的速度浏览大量的数据。
BulkProcessor是一个线程安全的批量处理类,允许方便地设置每次写入ES的最大请求数量,以及刷新间隔(flush interval)。所谓刷新间隔,就是每隔规定的时间,不论期间是否有新的请求进来,都会把之前累积的请求直接写到ES,不必等待请求数量累积到你规定的最大数量。
代码如下(示例):
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.rest.RestStatus;
/**
 * {@link BulkProcessor.Listener} that reports the outcome of every bulk execution on
 * standard output.
 *
 * <p>It prints a marker before and after each bulk, and for every failed item prints the
 * failure message (plus the REST status and stack trace when a status is available).
 * Nothing is retried or rethrown; this listener only reports.
 */
public class BulkProListener implements BulkProcessor.Listener {

    private static final String UNSUPPORTED_REQUEST_MESSAGE =
            "The sink currently only supports ActionRequests";

    /**
     * Called right before a bulk is executed; prints a marker line.
     *
     * @param l           execution id assigned by the bulk processor
     * @param bulkRequest the bulk about to be sent
     */
    @Override
    public void beforeBulk(long l, BulkRequest bulkRequest) {
        System.out.println("执行前");
    }

    /**
     * Called after a bulk was executed and a response was received. Prints one line per
     * failed item that carries a cause.
     *
     * @param l        execution id assigned by the bulk processor
     * @param request  the bulk that was sent; item {@code i} of the response is matched
     *                 with request {@code i}
     * @param response the per-item results
     */
    @Override
    public void afterBulk(long l, BulkRequest request, BulkResponse response) {
        System.out.println("执行后");
        if (!response.hasFailures()) {
            return;
        }

        BulkItemResponse[] items = response.getItems();
        try {
            for (int i = 0; i < items.length; i++) {
                BulkItemResponse itemResponse = items[i];
                if (!itemResponse.isFailed()) {
                    continue;
                }
                Throwable failure = itemResponse.getFailure().getCause();
                if (failure == null) {
                    continue;
                }
                RestStatus restStatus = itemResponse.getFailure().getStatus();
                requireActionRequest(request.requests().get(i));

                String message = failureMessage(failure);
                if (restStatus == null) {
                    System.out.println("Failed Elasticsearch item request: " + message);
                } else {
                    System.out.println("Failed sink item request: " + message
                            + " status: " + restStatus.getStatus());
                    failure.printStackTrace();
                }
            }
        } catch (RuntimeException e) {
            // An unsupported request type stops the report for the remaining items.
            e.printStackTrace();
        }
    }

    /**
     * Called when the whole bulk failed before any response was produced. Prints one line
     * per buffered request and the stack trace of the failure.
     *
     * @param l       execution id assigned by the bulk processor
     * @param request the bulk that could not be executed
     * @param failure the error that prevented execution
     */
    @Override
    public void afterBulk(long l, BulkRequest request, Throwable failure) {
        System.out.println("有错误");
        // The failure is the same for every request, so its stack trace is printed once
        // rather than once per buffered request.
        failure.printStackTrace();
        try {
            for (DocWriteRequest<?> writeRequest : request.requests()) {
                requireActionRequest(writeRequest);
                System.out.println("Failed Elasticsearch item request: " + failure.getMessage());
            }
        } catch (RuntimeException e) {
            // An unsupported request type stops the report for the remaining items.
            e.printStackTrace();
        }
    }

    /**
     * Returns the message to print for a failed item: the nested cause's message when a
     * nested cause exists, otherwise the failure's own message. Falling back to the failure
     * itself avoids a NullPointerException for exceptions that have no nested cause.
     */
    private static String failureMessage(Throwable failure) {
        Throwable nested = failure.getCause();
        return (nested != null ? nested : failure).getMessage();
    }

    /**
     * Rejects any request that is not an {@link ActionRequest}.
     *
     * @throws UnsupportedOperationException if {@code writeRequest} is of another type
     */
    private static void requireActionRequest(DocWriteRequest<?> writeRequest) {
        if (!(writeRequest instanceof ActionRequest)) {
            throw new UnsupportedOperationException(UNSUPPORTED_REQUEST_MESSAGE);
        }
    }
}
在es中建立索引batch,类型my_type,结构为"user_name"、"user_id"、"age"、"user_note"
// Flush after 5000 buffered actions, with a flush interval of 10 seconds.
bulkProcessor.setBulkActions(5000).setFlushInterval(TimeValue.timeValueSeconds(10)).build();
代码如下(示例):
import com.alibaba.fastjson.JSONObject;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.Netty3Plugin;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * Demo program that writes a handful of JSON documents into the {@code batch} index
 * (type {@code my_type}) through the transport client, either with a
 * {@link BulkProcessor} ({@link #batch2}) or with a hand-rolled bulk request
 * ({@link #batch}).
 */
public class EsBatchWriterTest {

    /** Number of index actions sent per bulk request by {@link #batch}. */
    private static final int BATCH_SIZE = 1000;

    /**
     * Connects to the cluster node, runs {@link #batch2} and closes the client.
     *
     * @param args unused
     * @throws Exception if the host cannot be resolved or the write is interrupted
     */
    public static void main(String[] args) throws Exception {
        Settings settings = Settings.builder()
                .put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME)
                .put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME)
                .build();
        InetSocketTransportAddress address =
                new InetSocketTransportAddress(InetAddress.getByName("10.68.8.60"), 9300);

        // try-with-resources closes the client even when the batch write throws.
        try (TransportClient client = new PreBuiltTransportClient(settings)) {
            client.addTransportAddress(address);
            batch2(client);
        }
    }

    /**
     * Indexes the sample documents through a {@link BulkProcessor} configured with 5000
     * bulk actions and a 10 second flush interval, then waits up to two minutes for the
     * processor to close.
     *
     * @param client connected transport client
     * @throws InterruptedException if interrupted while waiting for the processor to close
     */
    public static void batch2(TransportClient client) throws InterruptedException {
        BulkProcessor processor = BulkProcessor.builder(client, new BulkProListener())
                .setBulkActions(5000)
                .setFlushInterval(TimeValue.timeValueSeconds(10))
                .build();

        List<JSONObject> list = getData();
        System.out.println(list.size());
        for (JSONObject obj : list) {
            System.out.println(obj.toJSONString());
            IndexRequestBuilder builder = client.prepareIndex("batch", "my_type")
                    .setId(obj.getString("user_id"))
                    .setSource(obj);
            processor.add(builder.request());
        }

        // awaitClose returns false when the timeout elapsed first; report it instead of
        // silently dropping that information.
        boolean completed = processor.awaitClose(2, TimeUnit.MINUTES);
        if (!completed) {
            System.out.println("Bulk processor did not finish within 2 minutes");
        }
    }

    /**
     * Builds the sample documents. Two of them carry a non-numeric {@code age}
     * ({@code "54b"}, {@code "34a"}).
     */
    private static List<JSONObject> getData() {
        List<JSONObject> list = new ArrayList<>();
        list.add(newUser("name7", "7", "34"));
        list.add(newUser("name8", "8", "24"));
        list.add(newUser("name9", "9", "24"));
        list.add(newUser("name10", "10", "14"));
        list.add(newUser("name11", "11", "54b"));
        list.add(newUser("name20", "20", "34a"));
        list.add(newUser("name30", "30", "30"));
        return list;
    }

    /** Creates a document with only {@code user_name} and {@code user_id}. */
    private static JSONObject newUser(String userName, String userId) {
        JSONObject user = new JSONObject();
        user.put("user_name", userName);
        user.put("user_id", userId);
        return user;
    }

    /** Creates a full document; {@code user_note} is always {@code "note"}. */
    private static JSONObject newUser(String userName, String userId, String age) {
        JSONObject user = newUser(userName, userId);
        user.put("age", age);
        user.put("user_note", "note");
        return user;
    }

    /**
     * Indexes three sample documents with a manually managed bulk request, sending one
     * bulk per {@value #BATCH_SIZE} documents plus a final bulk for the remainder.
     *
     * @param client connected transport client
     */
    public static void batch(TransportClient client) {
        List<JSONObject> list = new ArrayList<>();
        list.add(newUser("name1", "1"));
        list.add(newUser("name3", "3"));
        list.add(newUser("name2", "2"));

        int count = 1;
        BulkRequestBuilder bulkRequest = client.prepareBulk();
        for (JSONObject obj : list) {
            IndexRequestBuilder builder = client.prepareIndex("batch", "my_type")
                    .setId(obj.getString("user_id"))
                    .setSource(obj);
            bulkRequest.add(builder);
            if (count % BATCH_SIZE == 0) {
                bulkRequest.execute().actionGet();
                System.out.println("提交了:" + count);
                // Start a fresh builder; otherwise the next flush would re-send every
                // action that has already been written.
                bulkRequest = client.prepareBulk();
            }
            count++;
        }

        // Only send the remainder if there is one: an empty bulk request is rejected.
        if (bulkRequest.numberOfActions() > 0) {
            bulkRequest.execute().actionGet();
        }
    }
}
执行文档批量请求时,首先需要初始化 Elasticsearch Client,其次创建 BulkProcessor ,
设置 BulkProcessor 参数,最后关闭processor。