博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
elasticsearch Java API 之Bulk API(批量操作)
阅读量:5920 次
发布时间:2019-06-19

本文共 2777 字,大约阅读时间需要 9 分钟。

hot3.png

为了提高操作效率,java api提供一个比较有用的批量操作功能,可以一次性提交多种操作,减少与节点的交互的时间。

总的来说,写法也非常简单,有点类似java中的list。

// 静态引入jar包import static org.elasticsearch.common.xcontent.XContentFactory.*;// 生成容器BulkRequestBuilder bulkRequest = client.prepareBulk();// 第一个操作bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")        .setSource(jsonBuilder()                    .startObject()                        .field("user", "kimchy")                        .field("postDate", new Date())                        .field("message", "trying out Elasticsearch")                    .endObject()                  )        );// 第二个操作bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")        .setSource(jsonBuilder()                    .startObject()                        .field("user", "kimchy")                        .field("postDate", new Date())                        .field("message", "another post")                    .endObject()                  )        );// 执行BulkResponse bulkResponse = bulkRequest.execute().actionGet();if (bulkResponse.hasFailures()) {//如果有失败的    // process failures by iterating through each bulk response item}

bulkProcessor

第一步先生成实例

// 静态引入import org.elasticsearch.action.bulk.BulkProcessor;BulkProcessor bulkProcessor = BulkProcessor.builder(        client,  // 使用elasticsearch client,具体其它章节        new BulkProcessor.Listener() {            //这个方法是在bulk执行前触发的。你可以在方法内request.numberOfActions()            @Override            public void beforeBulk(long executionId,                                   BulkRequest request) { ... }             //这个方法在bulk执行成功后触发的。你可以在方法内使用response.hasFailures()            @Override            public void afterBulk(long executionId,                                  BulkRequest request,                                  BulkResponse response) { ... }             //这个方法在bulk执行失败后触发的。            @Override            public void afterBulk(long executionId,                                  BulkRequest request,                                  Throwable failure) { ... }         })        .setBulkActions(10000) //分批,每10000条请求当成一批请求。默认值为1000        // We want to flush the bulk every 1gb。默认为5m        .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))         //每5秒一定执行,不管已经队列积累了多少。默认不设置这个值        .setFlushInterval(TimeValue.timeValueSeconds(5))          //设置并发请求数,如果是0,那表示只有一个请求就可以被执行,如果为1,则可以积累并被执行。默认为1.        .setConcurrentRequests(1)         .build();

第二步添加操作

bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));

第三步关闭(两种情况的区别待研究)

bulkProcessor.awaitClose(10, TimeUnit.MINUTES);//在规定时间内等待?比如在10分钟内处理完成,那就返回true,//如果没能处理完,那返回false?//或者bulkProcessor.close();//马上关闭,不管还有多少没有处理完??

转载于:https://my.oschina.net/claireliu/blog/464538

你可能感兴趣的文章
HorizontalScrollView
查看>>
XenServer 6.5实战系列之十:Create VMs from a VM Template
查看>>
python接口自动化测试(四)-Cookie&Sessinon
查看>>
C#进阶系列——WebApi 接口参数不再困惑:传参详解
查看>>
css中元素的定位:position
查看>>
异步编程
查看>>
[Android Pro] Gradle tip #3-Task顺序
查看>>
[GIT]
查看>>
Android -- View
查看>>
【oracle官网】 Restoring a Database on a New Host
查看>>
关于表联结方法_sort-merge join
查看>>
Vagrant基础简要记录
查看>>
Python探索记(09)——字符串(上)
查看>>
HDOJ 2028 Lowest Common Multiple Plus(n个数的最小公倍数)
查看>>
[20170518]不同事务能使用相同回滚段吗.txt
查看>>
物理卷操作命令:pvcreate,pvscan,pvdisplay.卷组操作命令:vgcreate,vgdisplay. (转)
查看>>
Swift语言实战晋级-第9章 游戏实战-跑酷熊猫-2 创建熊猫类
查看>>
iptables设置时,要注意规则的顺序
查看>>
InputStream中read()与read(byte[] b)
查看>>
JS里写入(混写)php asp
查看>>