介绍
Upsert Kafka 连接器支持以 upsert 方式从 Kafka topic 中读取数据并将数据写入 Kafka topic。
作为 Source,Upsert Kafka 连接器生产 changelog 流,其中每条数据记录代表一个更新或删除事件。更准确地说,数据记录中的 value 被解释为同一 key 的最后一个 value 的 UPDATE,如果有这个 key(如果不存在相应的 key,则该更新被视为 INSERT)。用表来类比,changelog 流中的数据记录被解释为 UPSERT,也称为 INSERT/UPDATE,因为任何具有相同 key 的现有行都被覆盖。另外,value 为空的消息将会被视作为 DELETE 消息。
作为 Sink,Upsert Kafka 连接器可以消费 changelog 流。它会将 INSERT/UPDATE_AFTER 数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入(表示对应 key 的消息被删除)。Flink 将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。
版本说明
Flink 版本 | 说明 |
1.11 | 不支持 |
1.13 | 支持 |
1.14 | 支持 |
1.16 | 支持 |
DDL 定义
CREATE TABLE kafka_upsert_sink_table (id INT,name STRING,PRIMARY KEY (id) NOT ENFORCED) WITH (-- 定义 Upsert Kafka 参数'connector' = 'upsert-kafka', -- 选择 connector'topic' = 'topic', -- 替换为您要写入的 Topic'properties.bootstrap.servers' = '...', -- 替换为您的 Kafka 连接地址'key.format' = 'json', -- 定义 key 数据格式'value.format' = 'json' -- 定义value 数据格式);
说明
Upsert Kafka 确保在 DDL 中定义主键。
WITH 参数
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
connector | 必选 | (none) | String | 指定要使用的连接器,Upsert Kafka 连接器使用: 'upsert-kafka' 。 |
topic | 必选 | (none) | String | 用于读取和写入的 Kafka topic 名称。 |
properties.bootstrap.servers | 必选 | (none) | String | 以逗号分隔的 Kafka brokers 列表。 |
properties.* | 可选 | (none) | String | Flink 会自动移除选项名中的 "properties." 前缀,并将转换后的键名以及值传入 KafkaClient。例如,您可以通过 'properties.allow.auto.create.topics' = 'false
来禁止自动创建 topic。 但是,某些选项,例如 'key.deserializer' 和 'value.deserializer' 是不允许通过该方式传递参数,因为 Flink 会重写这些参数的值。 |
key.format | 必选 | (none) | String | 用于对 Kafka 消息中 key 部分序列化和反序列化的格式。key 字段由 PRIMARY KEY 语法指定。支持的格式包括 'csv' 、'json' 、'avro' 。 |
key.fields-prefix | optional | (none) | String | 为'key.fields'的所有字段定义自定义前缀,以避免与 'value.fields' 字段名称冲突。默认情况下,前缀为空。如果定义了自定义前缀,则表 schema 和 'key.fields' 将使用前缀名称。构建'key.fields'格式的数据类型时候,将删除前缀并使用 key format 中非前缀名称。请注意,此选项要求 'value.fields-include' 必须设置为 'EXCEPT_KEY' 。 |
value.format | 必选 | (none) | String | 用于对 Kafka 消息中 value 部分序列化和反序列化的格式。支持的格式包括 'csv' 、'json' 、'avro' 。 |
value.fields-include | 可选 | 'ALL' | String | 控制哪些字段应该出现在 value 中。可取值: ALL:消息的 value 部分将包含 schema 中所有的字段,包括定义为主键的字段。 EXCEPT_KEY:记录的 value 部分包含 schema 的所有字段,定义为主键的字段除外。 |
sink.parallelism | 可选 | (none) | Integer | 定义 upsert-kafka sink 算子的并行度。默认情况下,由框架确定并行度,与上游链接算子的并行度保持一致。 |
sink.buffer-flush.max-rows | 可选 | 0 | Integer | 缓存刷新前,最多能缓存多少条记录。当 sink 收到很多同 key 上的更新时,缓存将保留同 key 的最后一条记录,因此 sink 缓存能帮助减少发往 Kafka topic 的数据量,以及避免发送潜在的 tombstone 消息。 可以通过设置为 '0' 来禁用它。默认,该选项是未开启的。注意,如果要开启 sink 缓存,需要同时设置 'sink.buffer-flush.max-rows' 和 'sink.buffer-flush.interval' 两个选项为大于零的值。 |
sink.buffer-flush.interval | 可选 | 0 | Duration | 缓存刷新的间隔时间,超过该时间后异步线程将刷新缓存数据。当 sink 收到很多同 key 上的更新时,缓存将保留同 key 的最后一条记录,因此 sink 缓存能帮助减少发往 Kafka topic 的数据量,以及避免发送潜在的 tombstone 消息。 可以通过设置为 '0' 来禁用它。默认,该选项是未开启的。注意,如果要开启 sink 缓存,需要同时设置 'sink.buffer-flush.max-rows' 和 'sink.buffer-flush.interval' 两个选项为大于零的值。 |
代码示例
CREATE TABLE `kafka_json_source_table` (`id` INT,`name` STRING) WITH (-- 定义 Kafka 参数'connector' = 'kafka','topic' = 'Data-Input', -- 替换为您要消费的 Topic'scan.startup.mode' = 'latest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一种'properties.bootstrap.servers' = '172.28.28.13:9092', -- 替换为您的 Kafka 连接地址'properties.group.id' = 'testGroup', -- 必选参数, 一定要指定 Group ID-- 定义数据格式 (JSON 格式)'format' = 'json','json.fail-on-missing-field' = 'false', -- 如果设置为 false, 则遇到缺失字段不会报错。'json.ignore-parse-errors' = 'true' -- 如果设置为 true,则忽略任何解析报错。);CREATE TABLE kafka_upsert_sink_table (id INT,name STRING,PRIMARY KEY (id) NOT ENFORCED) WITH (-- 定义 Upsert Kafka 参数'connector' = 'upsert-kafka', -- 选择 connector'topic' = 'topic', -- 替换为您要消费的 Topic'properties.bootstrap.servers' = '...', -- 替换为您的 Kafka 连接地址'key.format' = 'json', -- 定义 key 数据格式'value.format' = 'json' -- 定义value 数据格式);-- 计算 pv、uv 并插入到 upsert-kafka sinkINSERT INTO kafka_upsert_sink_tableSELECT * FROM kafka_json_source_table;
SASL 认证授权
SASL/PLAIN 用户名密码认证授权
1. 参考 消息队列 CKafka - 配置 ACL 策略,设置 Topic 按用户名密码访问的 SASL_PLAINTEXT 认证方式。
2. 参考 消息队列 CKafka - 添加路由策略,选择 SASL_PLAINTEXT 接入方式,并以该接入方式下的网络地址访问 Topic。
3. 作业配置 with 参数。
CREATE TABLE `YourTable` (...) WITH (...'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="ckafka-xxxxxxxx#YourUserName" password="YourPassword";','properties.security.protocol' = 'SASL_PLAINTEXT','properties.sasl.mechanism' = 'PLAIN',...);
说明
username
是实例 ID
+ #
+ 刚配置的用户名
,password
是刚配置的用户密码。SASL/GSSAPI Kerberos 认证授权
腾讯云 CKafka 暂时不支持 Kerberos 认证,您的自建 Kafka 如果开启了 Kerberos 认证,可参考如下步骤配置作业。
1. 获取您的自建 Kafka 集群的 Kerberos 配置文件,如果您基于腾讯云 EMR 集群自建,获取 krb5.conf、emr.keytab 文件,路径如下。
/etc/krb5.conf/var/krb5kdc/emr.keytab
2. 对步骤1中获取的文件打 jar 包。
jar cvf kafka-xxx.jar krb5.conf emr.keytab
3. 校验 jar 的结构(可以通过 vim 命令查看 vim kafka-xxx.jar),jar 里面包含如下信息,请确保文件不缺失且结构正确。
META-INF/META-INF/MANIFEST.MFemr.keytabkrb5.conf
4. 在 程序包管理 页面上传 jar 包,并在作业参数配置里引用该程序包。
5. 获取 kerberos principal,用于作业 高级参数 配置。
klist -kt /var/krb5kdc/emr.keytab# 输出如下所示,选取第一个即可:hadoop/172.28.28.51@EMR-OQPO48B9KVNO Timestamp Principal---- ------------------- ------------------------------------------------------2 08/09/2021 15:34:40 hadoop/172.28.28.51@EMR-OQPO48B92 08/09/2021 15:34:40 HTTP/172.28.28.51@EMR-OQPO48B92 08/09/2021 15:34:40 hadoop/VM-28-51-centos@EMR-OQPO48B92 08/09/2021 15:34:40 HTTP/VM-28-51-centos@EMR-OQPO48B9
6. 作业 with 参数配置。
CREATE TABLE `YourTable` (...) WITH (...'properties.security.protocol' = 'SASL_PLAINTEXT','properties.sasl.mechanism' = 'GSSAPI','properties.sasl.kerberos.service.name' = 'hadoop',...);
说明:
参数
properties.sasl.kerberos.service.name
的值必须与您选取的 principal 匹配,如果您选择的为 hadoop/${IP}@EMR-OQPO48B9
,那么取值为 hadoop。7. 作业 高级参数 配置。
security.kerberos.login.principal: hadoop/172.28.2.13@EMR-4K3VR5FDsecurity.kerberos.login.keytab: emr.keytabsecurity.kerberos.login.conf: krb5.confsecurity.kerberos.login.contexts: KafkaClientfs.hdfs.hadoop.security.authentication: kerberos