数据湖 Iceberg

最近更新时间:2026-06-17 15:13:17

我的收藏

版本说明

Flink 版本
说明
1.11
不支持
1.13
支持 Source 和 Sink
1.14
支持 Source 和 Sink
1.16
支持 Source 和 Sink
1.18
支持 Source 和 Sink
1.20
支持 Source 和 Sink

使用范围

可以作为 Source/Sink 使用。其中,作为 source 使用时,不支持 upsert 写入的源 Iceberg。

DDL 定义

用作数据目的:
CREATE TABLE `sink` (
`id` bigint,
`YCSB_KEY` string,
`FIELD0` string,
`FIELD1` string,
`FIELD2` string,
`database_name` string,
`table_name` string,
`op_ts` timestamp(3),
`date` string
) PARTITIONED BY (`date`) WITH (
'connector' = 'iceberg-1.1', -- 在flink 1.13和flink 1.16使用iceberg-1.1,在flink 1.18和1.20使用iceberg
'write.upsert.enabled'='false', -- 是否开启upsert
'catalog-type' = 'hive',
'catalog-name'='xxx',
'catalog-database'='xxx',
'catalog-table'='xxx',
'warehouse' = 'hdfs://HDFS14979/usr/hive/warehouse',
-- Hive metastore 的 thrift URI,可以从hive-site.xml配置文件中获取,对应的Key为:hive-metastore-uris
'uri'='thrift://ip:port',
'engine.hive.enabled' = 'true',
'format-version' = '2'
);
作为数据源:
CREATE TABLE `icesource` (
`id` bigint,
`YCSB_KEY` string,
`FIELD0` string,
`FIELD1` string,
`FIELD2` string,
`database_name` string,
`table_name` string,
`op_ts` timestamp(3),
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'iceberg-1.1', -- 在flink 1.13和flink 1.16使用iceberg-1.1,在flink 1.18和1.20使用iceberg
'catalog-name' = 'hive_catalog',
'catalog-type' = 'hive',
'catalog-database' = 'database_ta',
'catalog-table' = 't_p1_hive3_avro_3',
'warehouse'='hdfs://HDFS14979/usr/hive/warehouse',
'engine.hive.enabled' = 'true',
'format-version' = '2',
'streaming'='true',
'monitor-interval'='10',
-- Hive metastore 的 thrift URI,可以从hive-site.xml配置文件中获取,对应的Key为:hive-metastore-uris
'uri'='thrift://ip:port'
);

WITH 参数

通用参数

参数值
必填
默认值
描述
connector
在 flink 1.13和 flink 1.16必须填 iceberg-1.1,在 flink 1.18和1.20必须填 iceberg
warehouse
数据的存储路径(如果存储到 HDFS,格式为 hdfs:// ;存储为 COS 为 COSN://$bucket/$path)
catalog-name
自定义的 catalog 名
catalog-type
catalog 类型,可选值为 Hadoop / Hive / custom
catalog-database
Iceberg 数据库名称
catalog-table
Iceberg 表名称
catalog-impl
catalog-type 为 custom 时,必填
uri
Hive metastore 的 Thrift URI,可以从 hive-site.xml 配置文件中获取,对应的 Key 为:hive-metastore-uris; e.g. thrift://172.28.1.149:7004
format-version
1
Iceberg 格式 请参见 Iceberg Table Spec
更多参数请参见 Configuration

COS 配置

无需做额外配置,path 填写为对应的 COSN 路径即可。

HDFS 配置

获取 HDFS 连接配置 jar

Flink SQL 任务写 Iceberg,使用 HDFS 存储时需要使用包含 HDFS 配置信息的 jar 包来连接到 HDFS 集群。具体获取连接配置 jar 及其使用的步骤如下:
1. ssh 登录到对应 HDFS 集群节点。
2. 获取 hdfs-site.xml,EMR 集群中的配置文件在如下位置。
/usr/local/service/hadoop/etc/hadoop/hdfs-site.xml
3. 对获取到的配置文件 打 jar 包。
jar -cvf hdfs-xxx.jar hdfs-site.xml
4. 校验 jar 的结构(可以通过 vi 命令查看 ),jar 里面包含如下信息,请确保文件不缺失且结构正确。
vi hdfs-xxx.jar
META-INF/
META-INF/MANIFEST.MF
hdfs-site.xml

配置写入 HDFS 的用户

说明
Flink 作业默认以 flink 用户操作 HDFS,若没有 HDFS 路径的写入权限,可通过 作业高级参数 设置为有权限的用户,或者设置为超级用户 Hadoo。
containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
containerized.master.env.HADOOP_USER_NAME: hadoop

Kerberos 认证授权

1. 登录集群 Master 节点,获取 krb5.conf、emr.keytab、core-site.xml、hdfs-site.xml、hive-site.xml 文件,路径如下:
/etc/krb5.conf
/var/krb5kdc/emr.keytab
/usr/local/service/hadoop/etc/hadoop/core-site.xml
/usr/local/service/hadoop/etc/hadoop/hdfs-site.xml
/usr/local/service/hive/conf/hive-site.xml
2. 修改 hive-site.xml 文件。在 hive-site.xml 中增加如下配置,IP 的值取配置文件中 hive.server2.thrift.bind.host 的 value。
<property>
<name>hive.metastore.uris</name>
<value>thrift://ip:7004</value>
</property>
3. 获取 hivemetastore-site.xmlhiveserver2-site.xml,点击文件名下载。
4. 对获取的配置文件打 jar 包。
jar cvf hive-xxx.jar krb5.conf emr.keytab core-site.xml hdfs-site.xml hive-site.xml hivemetastore-site.xml hiveserver2-site.xml
5. 校验 jar 的结构(可以通过 vim 命令查看 vim hdfs-xxx.jar),jar 里面包含如下信息,请确保文件不缺失且结构正确。
META-INF/
META-INF/MANIFEST.MF
emr.keytab
krb5.conf
hdfs-site.xml
core-site.xml
hive-site.xml
hivemetastore-site.xml
hiveserver2-site.xml
6. 程序包管理 页面上传 jar 包,并在作业参数配置里引用该程序包。
7. 获取 kerberos principal,用于 作业高级参数 配置。
klist -kt /var/krb5kdc/emr.keytab

# 输出如下所示,选取第一个即可:hadoop/172.28.28.51@EMR-OQPO48B9
KVNO Timestamp Principal
---- ------------------- ------------------------------------------------------
2 08/09/2021 15:34:40 hadoop/172.28.28.51@EMR-OQPO48B9
2 08/09/2021 15:34:40 HTTP/172.28.28.51@EMR-OQPO48B9
2 08/09/2021 15:34:40 hadoop/VM-28-51-centos@EMR-OQPO48B9
2 08/09/2021 15:34:40 HTTP/VM-28-51-centos@EMR-OQPO48B9
8. 作业高级参数 配置。
containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
containerized.master.env.HADOOP_USER_NAME: hadoop
security.kerberos.login.principal: hadoop/172.28.28.51@EMR-OQPO48B9
security.kerberos.login.keytab: emr.keytab
security.kerberos.login.conf: krb5.conf
说明
security.kerberos.login.keytab 和 security.kerberos.login.conf 的值为对应的文件名。

Oceanus 作业中使用 CHDFS 元数据加速桶的配置指南

以下内容旨在指导用户如何在腾讯云 Oceanus 流计算服务中配置 CHDFS 元数据加速桶,以实现对 CHDFS(腾讯云 Hadoop 分布式文件系统)的高效访问。本指南包括通过 OFS 协议访问 CHDFS 的配置方法、必要的依赖 JAR 包、以及如何在启用 COS Ranger 和 Kerberos 认证的情况下进行配置。

1. 通过 OFS 协议访问 CHDFS

1.1 通用参数配置

在 Oceanus 作业的作业配置中添加以下高级参数,以通过 OFS 协议访问 CHDFS:
# OFS 原生参数加上前缀 flink.hadoop
flink.hadoop.fs.AbstractFileSystem.ofs.impl: com.qcloud.chdfs.fs.CHDFSDelegateFSAdapter
flink.hadoop.fs.ofs.impl: com.qcloud.chdfs.fs.CHDFSHadoopFileSystemAdapter
flink.hadoop.fs.ofs.tmp.cache.dir: /tmp/chdfs/
flink.hadoop.fs.ofs.upload.flush.flag: true
flink.hadoop.fs.ofs.bucket.region: ${region} # 修改为对应的地域,例如 ap-guangzhou
flink.hadoop.fs.ofs.user.appid: ${appid} # 修改为自己的appid

1.2 依赖 JAR 包

为确保 Oceanus 作业能够正常访问 CHDFS,请确保以下 JAR 包已被添加到作业的依赖中:
gson-2.9.1.jar
chdfs_hadoop_plugin_network-2.8.jar

2. 开启 COS Ranger 认证

如果 OFS 开启了 COS Ranger 认证,除了上述通用参数和依赖外,还需要进行以下配置:

2.1 参数配置

登录对应的 EMR 集群,确认 COS Ranger 的版本(以下适用于 5.X 版本),并添加以下参数:
# OFS 原生 Ranger 相关参数加上前缀 flink.hadoop
flink.hadoop.fs.ofs.ranger.enable.flag: true
flink.hadoop.qcloud.object.storage.ranger.service.address: 172.28.68.126:9999 # 注意替换
flink.hadoop.qcloud.object.storage.zk.address: 172.28.68.126:2181 # 注意替换
flink.hadoop.qcloud.object.storage.zk.leader.ip.path: /cos/ranger_qcloud_object_storage_leader_ip
参数值获取方法:可以在对应的 EMR 集群的集群服务/HDFS 配置管理中搜索并获取。

2.2 依赖 JAR 包

确保以下 JAR 包已被添加到作业的依赖中:
hadoop-ranger-client-for-flink-2.8.5-5.0.jar
cos-ranger-interface-1.0.4.jar

3. 开启 Kerberos 认证

如果对应的 EMR 集群开启了 Kerberos 认证,需要进行以下配置:

3.1 参数配置

添加以下参数,以支持 Kerberos 认证:
# OFS 原生 Kerberos 相关参数加上前缀 flink.hadoop
flink.hadoop.hadoop.security.authentication: kerberos
# 替换成对应的 principal; 这里只能用 hadoop 的 principal
flink.hadoop.qcloud.object.storage.kerberos.principal: hadoop/_HOST@EMR-XXXX

3.2 依赖 JAR 包和配置文件

确保 Kerberos 相关的配置文件已被正确获取和配置,具体制作步骤请参考腾讯云官方文档《流计算 Oceanus 数据湖 Iceberg -SQL 开发指南》。

4. 通过 COSN 协议访问元数据加速桶

如果是元数据加速桶,还可以通过 COSN 协议进行访问。此时,所有参数需要加上特定前缀:
flink.hadoop.fs.ofs.trsf
其中 flink.hadoop 为 Flink 转成 Hadoop configuration 需要的前缀,fs.ofs.trsf 为 COSN 访问元数据加速桶需要加的前缀。

结语

通过遵循本指南中的步骤,用户可以在腾讯云 Oceanus 服务中配置并使用 CHDFS 元数据加速桶,以提高数据处理效率和稳定性。在配置过程中,请确保所有参数和依赖 JAR 包均已正确设置。