为了提高操作效率,java api提供一个比较有用的批量操作功能,可以一次性提交多种操作,减少与节点的交互的时间。
总的来说,写法也非常简单,有点类似java中的list。
// 静态引入jar包import static org.elasticsearch.common.xcontent.XContentFactory.*;// 生成容器BulkRequestBuilder bulkRequest = client.prepareBulk();// 第一个操作bulkRequest.add(client.prepareIndex("twitter", "tweet", "1") .setSource(jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "trying out Elasticsearch") .endObject() ) );// 第二个操作bulkRequest.add(client.prepareIndex("twitter", "tweet", "2") .setSource(jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "another post") .endObject() ) );// 执行BulkResponse bulkResponse = bulkRequest.execute().actionGet();if (bulkResponse.hasFailures()) {//如果有失败的 // process failures by iterating through each bulk response item}
bulkProcessor
第一步先生成实例
// 静态引入import org.elasticsearch.action.bulk.BulkProcessor;BulkProcessor bulkProcessor = BulkProcessor.builder( client, // 使用elasticsearch client,具体其它章节 new BulkProcessor.Listener() { //这个方法是在bulk执行前触发的。你可以在方法内request.numberOfActions() @Override public void beforeBulk(long executionId, BulkRequest request) { ... } //这个方法在bulk执行成功后触发的。你可以在方法内使用response.hasFailures() @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { ... } //这个方法在bulk执行失败后触发的。 @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { ... } }) .setBulkActions(10000) //分批,每10000条请求当成一批请求。默认值为1000 // We want to flush the bulk every 1gb。默认为5m .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) //每5秒一定执行,不管已经队列积累了多少。默认不设置这个值 .setFlushInterval(TimeValue.timeValueSeconds(5)) //设置并发请求数,如果是0,那表示只有一个请求就可以被执行,如果为1,则可以积累并被执行。默认为1. .setConcurrentRequests(1) .build();
第二步添加操作
bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));
第三步关闭(两种情况的区别待研究)
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);//在规定时间内等待?比如在10分钟内处理完成,那就返回true,//如果没能处理完,那返回false?//或者bulkProcessor.close();//马上关闭,不管还有多少没有处理完??