介绍
Elasticsearch Connector 提供了对 Elasticsearch 的写入和读取支持。目前 Oceanus 支持 Elasticsearch 6.x 和 7.x 版本。
版本说明
Flink 版本 | 说明 |
1.11 | 支持 |
1.13 | 支持(写入、批数据源) |
1.14 | 支持 |
1.16 | 支持 |
使用范围
Elasticsearch 支持写入,可以作为 Tuple 数据流的目的表(Sink),也可以作为 Upsert 数据流的目的表(Sink,自动以文档
_id
字段生成主键,并更新之前的文档版本)。如果希望将 JDBC 数据库的变动记录,将其作为流式源表消费,可以使用 Debezium、Canal 等,对 JDBC 数据库的变更进行捕获和订阅,然后 Flink 即可对这些变更事件进行进一步的处理。可参见 Kafka。
Oceanus 支持 Elasticsearch 的批模式读,目前只支持 Elasticsearch 7。
DDL 定义
用作 Elasticsearch 6 数据目的(Sink)
CREATE TABLE elasticsearch6_sink_table (`id` INT,`name` STRING,PRIMARY KEY (`id`) NOT ENFORCED -- 对应 Elasticsearch 中的 _id) WITH ('connector' = 'elasticsearch-6', -- 输出到 Elasticsearch 6'username' = '$username', -- 选填 用户名'password' = '$password', -- 选填 密码'hosts' = 'http://10.28.28.94:9200', -- Elasticsearch 的连接地址'index' = 'my-index', -- Elasticsearch 的 Index 名'document-type' = '_doc', -- Elasticsearch 的 Document 类型'format' = 'json' -- 输出数据格式,目前只支持 'json');
用作 Elasticsearch 7 数据目的(Sink)
CREATE TABLE elasticsearch7_sink_table (`id` INT,`name` STRING,PRIMARY KEY (`id`) NOT ENFORCED -- 对应 Elasticsearch 中的 _id) WITH ('connector' = 'elasticsearch-7', -- 输出到 Elasticsearch 7'username' = '$username', -- 选填 用户名'password' = '$password', -- 选填 密码'hosts' = 'http://10.28.28.94:9200', -- Elasticsearch 的连接地址'index' = 'my-index', -- Elasticsearch 的 Index 名'format' = 'json' -- 输出数据格式,目前只支持 'json');
作为 Elasticsearch 7 批数据源(Source)
CREATE TABLE elasticsearch7_source_table (`id` bigint,`event_date` int,`app` int,primary key (`id`) not enforced) with (-- 必填参数'connector' = 'es-source','endPoint' = '127.0.0.1', -- Elasticsearch 的连接 ip'accessId' = 'elastic', -- 用户名'accessKey' = 'PASSWORD', -- 密码'indexName' = 'my-index', -- Elasticsearch 的 Index 名'format' = 'json', -- 数据格式,只支持 'json'-- 可选参数'scheme' = 'http', -- 连接协议'port' = '9200', -- 端口'batchSize' = '2000', -- 每个 scroll 请求从 Elasticsearch 集群获取的最大文档数'keepScrollAliveSecs' = '60' -- scroll 上下文保留的最长时间,单位为分钟);
WITH 参数
作为数据目的
参数值 | 必填 | 默认值 | 描述 |
connector | 是 | 无 | 当写入 Elasticsearch 6.x 版本时,取值 elasticsearch-6 。当写入 Elasticsearch 7.x 及以上版本时,取值elasticsearch-7 。 |
username | 否 | 无 | 用户名。 |
password | 否 | 无 | 密码。 |
hosts | 是 | 无 | Elasticsearch 的连接地址。 |
index | 是 | 无 | 数据要写入的 Index。支持固定 Index(例如 'myIndex' ),也支持动态 Index(例如'index-{log_ts|yyyy-MM-dd}' )。 |
document-type | 6.x 版本:必填 7.x 版本:不需要 | 无 | Elasticsearch 文档的 Type 信息。当选择 elasticsearch-7 时,不能填写这个字段,否则会报错。 |
document-id.key-delimiter | 否 | _ | 为复合键生成 _id 时的分隔符 (默认是 "_")。例如有 a、b、c 三个主键,某条数据的 a 字段为 "1",b 字段为 "2",c 字段为 "3",使用默认分隔符,则最终写入 Elasticsearch 的 _id 是 "1_2_3"。 |
drop-delete | 否 | false | 是否过滤上游传来的 DELETE(删除)消息。 此外,在多表 LEFT JOIN 且 JOIN Key 非主键的场景下,启用该选项后,可以解决 Elasticsearch 收到较多临时 null 值数据的问题。需要注意的是,JOIN 左右表的字段不能含有 null 值,否则可能会丢失部分数据。 |
failure-handler | 否 | fail | 指定请求 Elasticsearch 失败时,错误处理策略。选项为: fail :抛出一个异常。ignore :忽略错误,直接继续。retry-rejected :重试写入该条记录。另外也支持自定义错误处理器,这里可以填写用户自己编写的 Handler 的类全名(需要上传自定义程序包)。 |
sink.flush-on-checkpoint | 否 | true | Flink 进行快照时,是否等待现有记录完全写入 Elasticsearch 。如果设置为 false,则可能造成恢复时部分数据丢失或者重复等异常情况,但快照速度会提升。 |
sink.bulk-flush.max-actions | 否 | 1000 | 批量写入的最大条数。设置为 0 则禁用批量功能。 |
sink.bulk-flush.max-size | 否 | 2mb | 批量写入缓存的最大容量,必须以 mb 为单位。设置为 0 则禁用批量功能。 |
sink.bulk-flush.interval | 否 | 1s | 批量写入的刷新周期。设置为 0 则禁用批量功能。 |
sink.bulk-flush.backoff.strategy | 否 | DISABLED | 批量写入时,失败重试的策略。 DISABLED :不重试。CONSTANT :等待 sink.bulk-flush.backoff.delay 选项设置的毫秒后重试。EXPONENTIAL :一开始等待 sink.bulk-flush.backoff.delay 选项设置的毫秒后重试,每次失败后将指数增加下次的等待时间。 |
sink.bulk-flush.backoff.max-retries | 否 | 8 | 批量写入时,最多失败重试的次数。 |
sink.bulk-flush.backoff.delay | 否 | 50ms | 批量写入失败时,每次重试之间的等待间隔(对于 CONSTANT 策略而言)或间隔的初始基数(对于 EXPONENTIAL 策略而言)。 |
connection.max-retry-timeout | 否 | 无 | 重试请求的最大超时时间,例如:"20 s"。 |
connection.path-prefix | 否 | 无 | 指定每个 REST 请求的前缀,例如 '/v1' 。通常不需要设置该选项。 |
format | 否 | json | 指定输出的格式,默认是内置的 json 格式,可以使用 前文(Kafka)描述过的 JSON 格式选项,例如 json.fail-on-missing-field 、json.ignore-parse-errors 、json.timestamp-format.standard 等。 |
retry-on-conflict | 否 | 无 |
更新操作中,允许因版本冲突异常而重试的最大次数。超过该次数后将抛出异常导致作业失败。 说明:暂时只支持 Flink-1.13。 |
作为数据源
参数值 | 必填 | 默认值 | 描述 |
connector | 是 | 无 | 固定值 es-source |
endPoint | 是 | 无 | Elasticsearch 的连接 IP,示例 127.0.0.1 |
accessId | 是 | 无 | 用户名 |
accessKey | 是 | 无 | 密码 |
indexName | 是 | 无 | 要读取的 Index |
format | 是 | 无 | 指定读取的格式,只支持内置的 json 格式,可以使用 前文(Kafka)描述过的 JSON 格式选项,例如 json.fail-on-missing-field 、json.ignore-parse-errors 、json.timestamp-format.standard 等。 |
scheme | 否 | http |
Elasticsearch 连接模式,例如 http 、https |
port | 否 | 9200 | Elasticsearch 连接端口 |
batchSize | 否 | 2000 | 每个 scroll 请求从 Elasticsearch 集群获取的最大文档数 |
keepScrollAliveSecs | 否 | 60 | scroll上下文保留的最长时间,单位为分钟 |
代码示例
作为数据目的
CREATE TABLE datagen_source_table (id INT,name STRING) WITH ('connector' = 'datagen','rows-per-second'='1' -- 每秒产生的数据条数);CREATE TABLE elasticsearch7_sink_table (`id` INT,`name` STRING) WITH ('connector' = 'elasticsearch-7', -- 输出到 Elasticsearch 7'username' = '$username', -- 选填 用户名'password' = '$password', -- 选填 密码'hosts' = 'http://10.28.28.94:9200', -- Elasticsearch 的连接地址'index' = 'my-index', -- Elasticsearch 的 Index 名'sink.bulk-flush.max-actions' = '1000', -- 数据刷新频率'sink.bulk-flush.interval' = '1s' -- 数据刷新周期'format' = 'json' -- 输出数据格式,目前只支持 'json');INSERT INTO elasticsearch7_sink_table select * from datagen_source_table;
作为批数据源
CREATE TABLE elasticsearch7_source_table (`id` bigint,`event_date` int,`app` int,primary key (`id`) not enforced) with (-- 必填参数'connector' = 'es-source','endPoint' = '127.0.0.1', -- Elasticsearch 的连接 ip'accessId' = 'elastic', -- 用户名'accessKey' = 'PASSWORD', -- 密码'indexName' = 'my-index', -- Elasticsearch 的 Index 名'format' = 'json', -- 数据格式,只支持 'json'-- 可选参数'scheme' = 'http', -- 连接协议'port' = '9200', -- 端口'batchSize' = '2000', -- 每个 scroll 请求从 Elasticsearch 集群获取的最大文档数'keepScrollAliveSecs' = '60' -- scroll 上下文保留的最长时间,单位为分钟);CREATE TABLE logger_sink (`id` bigint,`event_date` int,`app` int,primary key (`id`) not enforced) with ('connector' = 'logger');INSERT INTO logger_sink SELECT * from elasticsearch7_source_table;
注意事项
如果您希望连接其他版本的 Elasticsearch,请通过附加自定义程序包的方式,上传相应的 Elasticsearch Sink 的 JAR 包。
监控指标说明
numberOfInsertRecords:获取输出的 +I 消息数。
numberOfDeleteRecords:获取输出的 -D 消息数。
numberOfUpdateBeforeRecords:获取输出的 -U 消息数。
numberOfUpdateAfterRecords:获取输出的 +U 消息数。