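The Bulk API lets you perform index, create, delete, and update operations on many documents in a single request. Batching like this can significantly improve CRUD throughput, because many separate network round trips are merged into one request.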
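The Bulk API request body is NDJSON (newline-delimited JSON): every line, including the last one, must end with a newline character \n. When the body is kept in a file, many text editors append the final newline automatically on save, but when the body is built programmatically the trailing newline has to be added explicitly. In addition, the Content-Type value in the HTTP request header should be set to application/x-ndjson.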
operation_and_meta_data
optional_source
operation_and_meta_data
optional_source
operation_and_meta_data
optional_source
...
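To make the framing rules concrete, here is a minimal Java sketch, assuming each action/metadata line and each source line has already been serialized as compact, single-line JSON. The class NdjsonBody and its frame method are hypothetical names used only for illustration. The sketch appends \n after every line, the last one included, and rejects pretty-printed JSON, which would otherwise split one entry across several NDJSON lines.

```java
import java.util.List;

// Hypothetical helper: frames compact JSON lines into a _bulk request body.
final class NdjsonBody {
    private NdjsonBody() {}

    static String frame(List<String> lines) {
        StringBuilder body = new StringBuilder();
        for (String line : lines) {
            // A pretty-printed (multi-line) JSON object would be split across several
            // NDJSON lines, so reject anything containing a line break.
            if (line.indexOf('\n') >= 0 || line.indexOf('\r') >= 0) {
                throw new IllegalArgumentException("not single-line JSON: " + line);
            }
            // Terminate every line, the last one included, with '\n'.
            body.append(line).append('\n');
        }
        return body.toString();
    }
}
```

For an index action, frame(List.of(meta, source)) yields two newline-terminated lines that can be sent as the request body unchanged.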
POST /_bulk
POST /<target>/_bulk
target
Optional. Name of the data stream, index, or index alias that the actions apply to.
pipeline
Optional, no default. ID of the ingest pipeline used to preprocess the documents.
_source
Optional, defaults to false. If true, the Bulk API response includes the document's _source field.
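refresh
Optional, defaults to false.
Value | Description |
---|---|
true | Refresh the affected shards immediately so the changes are visible to search |
false | Do not trigger a refresh; the changes become visible after the next periodic refresh |
wait_for | Do not force a refresh; wait until a refresh makes the changes visible, then respond |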
routing
Optional, no default. Custom routing value used to send the operations to a specific primary shard.
timeout
Optional, defaults to 1m. This is not a timeout for the whole Bulk API response; it is how long each individual action waits for prerequisites such as automatic index creation, dynamic mapping updates, and waiting for active shards.
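wait_for_active_shards
Before an index operation actually executes, it usually waits until a certain number of shard copies are active; that number is set by wait_for_active_shards. The default is 1, meaning only the primary shard has to be active. The maximum is the total number of copies of the shard, i.e. the primary plus all of its replicas (number_of_replicas + 1), which can also be requested with the value all. If fewer shard copies are active than required, the index operation waits and retries until enough copies become active or the timeout expires.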
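All of the parameters above travel in the URL query string. The helper below, BulkUrl.build, is hypothetical (it is not part of any Elasticsearch client): a minimal sketch of assembling POST /<target>/_bulk, assuming the caller passes the parameters as a map in the order they should appear.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper: builds the path and query string for a _bulk request.
final class BulkUrl {
    private BulkUrl() {}

    // Returns "/_bulk" or "/<target>/_bulk", plus "?k=v&..." when params is non-empty.
    static String build(String target, Map<String, String> params) {
        // Without a target, every action line must name its own _index.
        String path = (target == null || target.isEmpty())
                ? "/_bulk"
                : "/" + encode(target) + "/_bulk";
        if (params.isEmpty()) {
            return path;
        }
        StringJoiner query = new StringJoiner("&", "?", "");
        for (Map.Entry<String, String> e : params.entrySet()) {
            // e.g. refresh=wait_for, timeout=1m, wait_for_active_shards=2
            query.add(encode(e.getKey()) + "=" + encode(e.getValue()));
        }
        return path + query;
    }

    private static String encode(String s) {
        return URLEncoder.encode(s, StandardCharsets.UTF_8);
    }
}
```

With a LinkedHashMap holding refresh=wait_for and timeout=1m, build("bulk_test", params) returns /bulk_test/_bulk?refresh=wait_for&timeout=1m.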
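Action | Description |
---|---|
create | Indexes the document only if it does not exist yet; if a document with the same _id exists, the action fails with a 409 conflict |
delete | Deletes the document; no source line follows the metadata line |
update | Partially updates an existing document; the source line carries the partial document under doc |
index | Creates the document if it does not exist, or replaces the existing document entirely if it does (upsert-like behavior) |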
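The differences between the four actions are easiest to see side by side. The following is a toy in-memory model, not Elasticsearch code: BulkActionModel is a hypothetical class whose return values imitate the per-item status codes (201 created, 200 updated or deleted, 409 conflict, 404 not found) that appear in the items array of a bulk response. It deliberately ignores versioning, the upsert options of update, and the recursive merging Elasticsearch applies to nested objects.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (NOT Elasticsearch): how each bulk action treats existing vs. missing documents.
final class BulkActionModel {
    // document id -> document fields
    final Map<String, Map<String, Object>> docs = new HashMap<>();

    int create(String id, Map<String, Object> source) {
        if (docs.containsKey(id)) {
            return 409; // already exists: the item fails with a version conflict
        }
        docs.put(id, new HashMap<>(source));
        return 201;
    }

    int index(String id, Map<String, Object> source) {
        // Creates or fully replaces the document; fields absent from `source` are dropped.
        boolean existed = docs.containsKey(id);
        docs.put(id, new HashMap<>(source));
        return existed ? 200 : 201;
    }

    int update(String id, Map<String, Object> partialDoc) {
        Map<String, Object> current = docs.get(id);
        if (current == null) {
            return 404; // missing document; upsert options are not modeled here
        }
        current.putAll(partialDoc); // shallow merge of the partial document
        return 200;
    }

    int delete(String id) {
        return docs.remove(id) != null ? 200 : 404;
    }
}
```

Indexing document "1" and then sending create for the same ID returns 409 in this model, while a second index simply replaces the document.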
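Metadata field | Description |
---|---|
_index | Name of the index (or data stream or alias) to act on; required if the request path has no target |
_id | Document ID; optional for index and create (generated automatically if omitted), required for update and delete |
require_alias | If true, the action's target must be an index alias; defaults to false |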
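Field | Data type | Description |
---|---|---|
doc | object | Partial document to merge into the existing one; used by update |
(document itself) | object | Full document source, sent as-is; used by index and create |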
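Putting the two tables together, each item is one metadata line plus, except for delete, one source line. The sketch below is dependency-free and uses hypothetical names (BulkItemLines.meta, BulkItemLines.updateSource); its hand-written escaping stands in for a real JSON library and only covers quotes, backslashes, and control characters.

```java
// Hypothetical helper: renders the metadata line and the update source line of one bulk item.
final class BulkItemLines {
    private BulkItemLines() {}

    // action is a fixed keyword (index/create/update/delete), so it is not escaped.
    // index may be null when the request path already names a target; id may be null for auto IDs.
    static String meta(String action, String index, String id, boolean requireAlias) {
        StringBuilder sb = new StringBuilder("{\"").append(action).append("\":{");
        String sep = "";
        if (index != null) {
            sb.append("\"_index\":").append(quote(index));
            sep = ",";
        }
        if (id != null) {
            sb.append(sep).append("\"_id\":").append(quote(id));
            sep = ",";
        }
        if (requireAlias) {
            sb.append(sep).append("\"require_alias\":true");
        }
        return sb.append("}}").toString();
    }

    // update wraps the partial document in "doc"; index/create send the document unchanged.
    static String updateSource(String partialDocJson) {
        return "{\"doc\":" + partialDocJson + "}";
    }

    // Minimal JSON string escaping: quotes, backslashes and control characters.
    private static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"') {
                sb.append("\\\"");
            } else if (c == '\\') {
                sb.append("\\\\");
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.append('"').toString();
    }
}
```

meta("update", "bulk_test", "1", false) followed by updateSource("{\"field\":\"update_value\"}") is equivalent to the update entry in the example request below.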
cat request_body_ndjson
-----------------------
{"index":{"_index":"bulk_test", "_id":"1"}}
{"field1":"index_value1","field2":"index_value2","field3":"index_value3","field4":"index_value4"}
{"index":{"_index":"bulk_test", "_id":"2"}}
{"field1":"index_value1"}
{"update":{"_index":"bulk_test", "_id":"1"}}
{"doc":{"field":"update_value"}}
{"delete":{"_index":"bulk_test", "_id":"2"}}
curl --request POST http://localhost:9200/_bulk \
  --header "Content-Type: application/x-ndjson" \
  --header "Authorization: Basic ZWxhc3RpYzpRd2UxMjMhQGNtc3M=" \
  --data-binary "@/apps/dukui/request_body_ndjson"
-----------------------
{
  "took": 311,
  "errors": false,
  "items": [
    {
      "index": {
        "_index": "bulk_test",
        "_type": "_doc",
        "_id": "1",
        "_version": 1,
        "result": "created",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "_seq_no": 12,
        "_primary_term": 1,
        "status": 201
      }
    },
    {
      "index": {
        "_index": "bulk_test",
        "_type": "_doc",
        "_id": "2",
        "_version": 1,
        "result": "created",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "_seq_no": 13,
        "_primary_term": 1,
        "status": 201
      }
    },
    {
      "update": {
        "_index": "bulk_test",
        "_type": "_doc",
        "_id": "1",
        "_version": 2,
        "result": "updated",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "_seq_no": 14,
        "_primary_term": 1,
        "get": {
          "_seq_no": 14,
          "_primary_term": 1,
          "found": true,
          "_source": {
            "field1": "index_value1",
            "field2": "index_value2",
            "field3": "index_value3",
            "field4": "index_value4",
            "field": "update_value"
          }
        },
        "status": 200
      }
    },
    {
      "delete": {
        "_index": "bulk_test",
        "_type": "_doc",
        "_id": "2",
        "_version": 2,
        "result": "deleted",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "_seq_no": 15,
        "_primary_term": 1,
        "status": 200
      }
    }
  ]
}
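For comparison, the same request can be sent from Java 11+ with the JDK's built-in java.net.http client. This is a sketch, assuming a cluster at http://localhost:9200 with basic authentication; BulkHttp is a hypothetical class, the NDJSON body is passed as a string rather than read from a file, and the credentials are parameters instead of a hard-coded Base64 string. Like curl's --data-binary, BodyPublishers.ofString sends the newlines unchanged.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper mirroring the curl call with java.net.http.
final class BulkHttp {
    private BulkHttp() {}

    // Builds POST {baseUrl}/_bulk with the NDJSON body and a Basic auth header.
    static HttpRequest request(String baseUrl, String user, String password, String ndjsonBody) {
        String token = Base64.getEncoder()
                .encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8));
        return HttpRequest.newBuilder(URI.create(baseUrl + "/_bulk"))
                .header("Content-Type", "application/x-ndjson")
                .header("Authorization", "Basic " + token)
                // Sends the string's bytes as-is, newlines included
                .POST(HttpRequest.BodyPublishers.ofString(ndjsonBody, StandardCharsets.UTF_8))
                .build();
    }

    // Sends the request and returns the raw JSON response body.
    static String send(HttpRequest request) throws IOException, InterruptedException {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}
```

The returned body is the JSON document shown above; a top-level "errors": true would mean at least one item in "items" failed.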
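@Resource
private RestHighLevelClient restHighLevelClient;

BulkRequest bulkRequest = new BulkRequest();
// Per-action wait timeout (the timeout query parameter)
bulkRequest.timeout(TimeValue.timeValueSeconds(5));
// NONE is equivalent to refresh=false
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
for (int j = 0; j < 10; j++) {
    // One index action per document, IDs "1" to "10", in index "bulk"
    IndexRequest indexRequest = new IndexRequest("bulk")
        .id(String.valueOf(j + 1))
        .source(
            XContentType.JSON,
            "title", "Elasticsearch Bulk API",
            "author", "optimus prime",
            "date", LocalDateTime.now()
        );
    bulkRequest.add(indexRequest);
}
// Synchronous execution; throws IOException if the request itself fails
BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
// A successful call can still contain failed items
if (bulkResponse.hasFailures()) {
    System.err.println(bulkResponse.buildFailureMessage());
}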
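// Asynchronous alternative to bulk(): returns immediately and reports the result to the listener
restHighLevelClient.bulkAsync(
    bulkRequest,
    RequestOptions.DEFAULT,
    new ActionListener<BulkResponse>() {
        @Override
        public void onResponse(BulkResponse bulkResponse) {
            // Request completed; check bulkResponse.hasFailures() for failed items
        }

        @Override
        public void onFailure(Exception e) {
            // The request as a whole failed, e.g. the cluster was unreachable
        }
    });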
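BulkProcessor lets you configure when a flush is triggered using different policies (number of actions, total request size, or a fixed time interval). It also makes it easy to cap how many BulkRequests execute concurrently, and it is thread-safe.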
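The count and size triggers can be illustrated with a toy batcher. ToyBatcher is hypothetical and single-threaded; it is not how BulkProcessor is implemented and leaves out the flush interval, concurrency, and retries. It only mirrors the convention that -1 disables a trigger.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model (NOT BulkProcessor) of count- and size-based flush triggers.
final class ToyBatcher {
    private final int maxActions;               // -1 disables the count trigger
    private final long maxBytes;                // -1 disables the size trigger
    private final Consumer<List<String>> sink;  // receives each flushed batch
    private final List<String> buffer = new ArrayList<>();
    private long bufferedBytes;

    ToyBatcher(int maxActions, long maxBytes, Consumer<List<String>> sink) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
        this.sink = sink;
    }

    void add(String action) {
        buffer.add(action);
        bufferedBytes += action.getBytes(StandardCharsets.UTF_8).length;
        boolean countReached = maxActions != -1 && buffer.size() >= maxActions;
        boolean sizeReached = maxBytes != -1 && bufferedBytes >= maxBytes;
        if (countReached || sizeReached) {
            flush();
        }
    }

    // Hands the buffered actions to the sink as one batch and resets the counters.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(buffer));
        buffer.clear();
        bufferedBytes = 0;
    }
}
```

With maxActions = 3, adding seven actions produces two batches of three, and the seventh stays buffered until flush() is called explicitly, which is the same role bulkProcessor.flush() plays for leftover requests.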
@Bean
public BulkProcessor bulkProcessor(RestHighLevelClient restHighLevelClient) {
    BulkProcessor.Builder builder = BulkProcessor.builder(
        // How each accumulated BulkRequest is executed: asynchronously via the client
        (bulkRequest, bulkResponseActionListener) ->
            restHighLevelClient.bulkAsync(
                bulkRequest,
                RequestOptions.DEFAULT,
                bulkResponseActionListener
            ),
        new BulkProcessor.Listener() {
            /*
             * Called before each BulkRequest is executed
             */
            @Override
            public void beforeBulk(long executionId, BulkRequest bulkRequest) {
            }

            /*
             * Called after each BulkRequest completes with a response;
             * individual items in the response may still have failed
             */
            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse bulkResponse) {
                for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) {
                    if (bulkItemResponse.isFailed()) {
                        // Per-item failure, e.g. a version conflict or a mapping error
                        BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
                        continue;
                    }
                    DocWriteResponse itemResponse = bulkItemResponse.getResponse();
                    switch (bulkItemResponse.getOpType()) {
                        case INDEX:
                        case CREATE:
                            IndexResponse indexResponse = (IndexResponse) itemResponse;
                            break;
                        case UPDATE:
                            UpdateResponse updateResponse = (UpdateResponse) itemResponse;
                            break;
                        case DELETE:
                            DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                            break;
                    }
                }
            }

            /*
             * Called when a BulkRequest fails as a whole (e.g. it could not be sent)
             */
            @Override
            public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable failure) {
            }
        });
    /*
     * Flush when the buffered actions reach 1000; default 1000, -1 disables this trigger
     */
    builder.setBulkActions(1000);
    /*
     * Flush when the buffered actions reach 5 MB; default 5 MB, -1 disables this trigger
     */
    builder.setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB));
    /*
     * Number of BulkRequests allowed to execute concurrently, which can help keep requests
     * from piling up; default 1 (one request may execute while new actions accumulate);
     * 0 means each flush executes synchronously, one request at a time
     */
    builder.setConcurrentRequests(1);
    /*
     * Flush every 10 seconds regardless of the other triggers; not set by default
     */
    builder.setFlushInterval(TimeValue.timeValueSeconds(10L));
    /*
     * How BulkProcessor retries when bulk items are rejected with EsRejectedExecutionException
     * (the node is overloaded); here: wait 1 second and retry up to 3 times.
     * Default: BackoffPolicy.exponentialBackoff(); BackoffPolicy.noBackoff() disables retries
     */
    builder.setBackoffPolicy(
        BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3)
    );
    /*
     * Build the BulkProcessor instance
     */
    return builder.build();
}
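BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3) describes one initial attempt plus at most three retries, each after the same one-second delay. The generic retry loop below is a sketch of that schedule only; ConstantBackoffRetry and its retryable predicate are hypothetical stand-ins for BulkProcessor's internal handling of rejected items, not its actual code.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical sketch of a constant-backoff retry schedule.
final class ConstantBackoffRetry {
    private ConstantBackoffRetry() {}

    // One initial attempt, then up to maxRetries retries separated by the same delay.
    static <T> T call(Supplier<T> attempt, Predicate<T> retryable, long delayMillis, int maxRetries) {
        T result = attempt.get();
        for (int retry = 0; retry < maxRetries && retryable.test(result); retry++) {
            try {
                Thread.sleep(delayMillis); // constant delay between attempts
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop retrying
                return result;
            }
            result = attempt.get();
        }
        // A result that is still retryable here means the retries were exhausted.
        return result;
    }
}
```

An exponential policy differs only in how the delay grows between attempts; the attempt-count logic stays the same.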
@Resource
private BulkProcessor bulkProcessor;

for (int j = 0; j < 10; j++) {
    IndexRequest indexRequest = new IndexRequest("bulk")
        .id(String.valueOf(j + 1))
        .source(
            XContentType.JSON,
            "title", "Elasticsearch Bulk API",
            "author", "optimus prime",
            "date", LocalDateTime.now()
        );
    // Buffered by BulkProcessor; sent automatically once a flush trigger fires
    bulkProcessor.add(indexRequest);
}
// Send whatever is still buffered without waiting for a trigger
bulkProcessor.flush();
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved.