版本说明
Flink 版本 | 说明 |
1.11 | 不支持 |
1.13 | 支持 Source 和 Sink |
1.14 | 支持 Source 和 Sink |
1.16 | 支持 Source 和 Sink |
1.18 | 支持 Source 和 Sink |
1.20 | 支持 Source 和 Sink |
使用范围
可以作为 Source/Sink 使用。其中,作为 source 使用时,不支持 upsert 写入的源 Iceberg。
DDL 定义
用作数据目的:
CREATE TABLE `sink` (`id` bigint,`YCSB_KEY` string,`FIELD0` string,`FIELD1` string,`FIELD2` string,`database_name` string,`table_name` string,`op_ts` timestamp(3),`date` string) PARTITIONED BY (`date`) WITH ('connector' = 'iceberg-1.1', -- 在flink 1.13和flink 1.16使用iceberg-1.1,在flink 1.18和1.20使用iceberg'write.upsert.enabled'='false', -- 是否开启upsert'catalog-type' = 'hive','catalog-name'='xxx','catalog-database'='xxx','catalog-table'='xxx','warehouse' = 'hdfs://HDFS14979/usr/hive/warehouse',-- Hive metastore 的 thrift URI,可以从hive-site.xml配置文件中获取,对应的Key为:hive-metastore-uris'uri'='thrift://ip:port','engine.hive.enabled' = 'true','format-version' = '2');
作为数据源:
CREATE TABLE `icesource` (`id` bigint,`YCSB_KEY` string,`FIELD0` string,`FIELD1` string,`FIELD2` string,`database_name` string,`table_name` string,`op_ts` timestamp(3),PRIMARY KEY(id) NOT ENFORCED) WITH ('connector' = 'iceberg-1.1', -- 在flink 1.13和flink 1.16使用iceberg-1.1,在flink 1.18和1.20使用iceberg'catalog-name' = 'hive_catalog','catalog-type' = 'hive','catalog-database' = 'database_ta','catalog-table' = 't_p1_hive3_avro_3','warehouse'='hdfs://HDFS14979/usr/hive/warehouse','engine.hive.enabled' = 'true','format-version' = '2','streaming'='true','monitor-interval'='10',-- Hive metastore 的 thrift URI,可以从hive-site.xml配置文件中获取,对应的Key为:hive-metastore-uris'uri'='thrift://ip:port');
WITH 参数
通用参数
参数值 | 必填 | 默认值 | 描述 |
connector | 是 | 无 | 在 flink 1.13和 flink 1.16必须填 iceberg-1.1,在 flink 1.18和1.20必须填 iceberg |
warehouse | 是 | 无 | 数据的存储路径(如果存储到 HDFS,格式为 hdfs:// ;存储为 COS 为 COSN://$bucket/$path) |
catalog-name | 是 | 无 | 自定义的 catalog 名 |
catalog-type | 是 | 无 | catalog 类型,可选值为 Hadoop / Hive / custom |
catalog-database | 是 | 无 | Iceberg 数据库名称 |
catalog-table | 是 | 无 | Iceberg 表名称 |
catalog-impl | 否 | 无 | catalog-type 为 custom 时,必填 |
uri | 否 | 无 | Hive metastore 的 Thrift URI,可以从 hive-site.xml 配置文件中获取,对应的 Key 为:hive-metastore-uris; e.g. thrift://172.28.1.149:7004 |
format-version | 否 | 1 | Iceberg 格式 请参见 Iceberg Table Spec |
COS 配置
无需做额外配置,path 填写为对应的 COSN 路径即可。
HDFS 配置
获取 HDFS 连接配置 jar
Flink SQL 任务写 Iceberg,使用 HDFS 存储时需要使用包含 HDFS 配置信息的 jar 包来连接到 HDFS 集群。具体获取连接配置 jar 及其使用的步骤如下:
1. ssh 登录到对应 HDFS 集群节点。
2. 获取 hdfs-site.xml,EMR 集群中的配置文件在如下位置。
/usr/local/service/hadoop/etc/hadoop/hdfs-site.xml
3. 对获取到的配置文件 打 jar 包。
jar -cvf hdfs-xxx.jar hdfs-site.xml
4. 校验 jar 的结构(可以通过 vi 命令查看 ),jar 里面包含如下信息,请确保文件不缺失且结构正确。
vi hdfs-xxx.jar
META-INF/META-INF/MANIFEST.MFhdfs-site.xml
配置写入 HDFS 的用户
说明
containerized.taskmanager.env.HADOOP_USER_NAME: hadoopcontainerized.master.env.HADOOP_USER_NAME: hadoop
Kerberos 认证授权
1. 登录集群 Master 节点,获取 krb5.conf、emr.keytab、core-site.xml、hdfs-site.xml、hive-site.xml 文件,路径如下:
/etc/krb5.conf/var/krb5kdc/emr.keytab/usr/local/service/hadoop/etc/hadoop/core-site.xml/usr/local/service/hadoop/etc/hadoop/hdfs-site.xml/usr/local/service/hive/conf/hive-site.xml
2. 修改 hive-site.xml 文件。在 hive-site.xml 中增加如下配置,IP 的值取配置文件中
hive.server2.thrift.bind.host 的 value。<property><name>hive.metastore.uris</name><value>thrift://ip:7004</value></property>
3. 获取 hivemetastore-site.xml 和 hiveserver2-site.xml,点击文件名下载。
4. 对获取的配置文件打 jar 包。
jar cvf hive-xxx.jar krb5.conf emr.keytab core-site.xml hdfs-site.xml hive-site.xml hivemetastore-site.xml hiveserver2-site.xml
5. 校验 jar 的结构(可以通过 vim 命令查看 vim hdfs-xxx.jar),jar 里面包含如下信息,请确保文件不缺失且结构正确。
META-INF/META-INF/MANIFEST.MFemr.keytabkrb5.confhdfs-site.xmlcore-site.xmlhive-site.xmlhivemetastore-site.xmlhiveserver2-site.xml
6. 在 程序包管理 页面上传 jar 包,并在作业参数配置里引用该程序包。
7. 获取 kerberos principal,用于 作业高级参数 配置。
klist -kt /var/krb5kdc/emr.keytab# 输出如下所示,选取第一个即可:hadoop/172.28.28.51@EMR-OQPO48B9KVNO Timestamp Principal---- ------------------- ------------------------------------------------------2 08/09/2021 15:34:40 hadoop/172.28.28.51@EMR-OQPO48B92 08/09/2021 15:34:40 HTTP/172.28.28.51@EMR-OQPO48B92 08/09/2021 15:34:40 hadoop/VM-28-51-centos@EMR-OQPO48B92 08/09/2021 15:34:40 HTTP/VM-28-51-centos@EMR-OQPO48B9
8. 作业高级参数 配置。
containerized.taskmanager.env.HADOOP_USER_NAME: hadoopcontainerized.master.env.HADOOP_USER_NAME: hadoopsecurity.kerberos.login.principal: hadoop/172.28.28.51@EMR-OQPO48B9security.kerberos.login.keytab: emr.keytabsecurity.kerberos.login.conf: krb5.conf
说明
security.kerberos.login.keytab 和 security.kerberos.login.conf 的值为对应的文件名。
Oceanus 作业中使用 CHDFS 元数据加速桶的配置指南
以下内容旨在指导用户如何在腾讯云 Oceanus 流计算服务中配置 CHDFS 元数据加速桶,以实现对 CHDFS(腾讯云 Hadoop 分布式文件系统)的高效访问。本指南包括通过 OFS 协议访问 CHDFS 的配置方法、必要的依赖 JAR 包、以及如何在启用 COS Ranger 和 Kerberos 认证的情况下进行配置。
1. 通过 OFS 协议访问 CHDFS
1.1 通用参数配置
在 Oceanus 作业的作业配置中添加以下高级参数,以通过 OFS 协议访问 CHDFS:
# OFS 原生参数加上前缀 flink.hadoopflink.hadoop.fs.AbstractFileSystem.ofs.impl: com.qcloud.chdfs.fs.CHDFSDelegateFSAdapterflink.hadoop.fs.ofs.impl: com.qcloud.chdfs.fs.CHDFSHadoopFileSystemAdapterflink.hadoop.fs.ofs.tmp.cache.dir: /tmp/chdfs/flink.hadoop.fs.ofs.upload.flush.flag: trueflink.hadoop.fs.ofs.bucket.region: ${region} # 修改为对应的地域,例如 ap-guangzhouflink.hadoop.fs.ofs.user.appid: ${appid} # 修改为自己的appid
1.2 依赖 JAR 包
为确保 Oceanus 作业能够正常访问 CHDFS,请确保以下 JAR 包已被添加到作业的依赖中:
gson-2.9.1.jarchdfs_hadoop_plugin_network-2.8.jar2. 开启 COS Ranger 认证
如果 OFS 开启了 COS Ranger 认证,除了上述通用参数和依赖外,还需要进行以下配置:
2.1 参数配置
登录对应的 EMR 集群,确认 COS Ranger 的版本(以下适用于 5.X 版本),并添加以下参数:
# OFS 原生 Ranger 相关参数加上前缀 flink.hadoopflink.hadoop.fs.ofs.ranger.enable.flag: trueflink.hadoop.qcloud.object.storage.ranger.service.address: 172.28.68.126:9999 # 注意替换flink.hadoop.qcloud.object.storage.zk.address: 172.28.68.126:2181 # 注意替换flink.hadoop.qcloud.object.storage.zk.leader.ip.path: /cos/ranger_qcloud_object_storage_leader_ip
参数值获取方法:可以在对应的 EMR 集群的集群服务/HDFS 配置管理中搜索并获取。
2.2 依赖 JAR 包
确保以下 JAR 包已被添加到作业的依赖中:
hadoop-ranger-client-for-flink-2.8.5-5.0.jarcos-ranger-interface-1.0.4.jar3. 开启 Kerberos 认证
如果对应的 EMR 集群开启了 Kerberos 认证,需要进行以下配置:
3.1 参数配置
添加以下参数,以支持 Kerberos 认证:
# OFS 原生 Kerberos 相关参数加上前缀 flink.hadoopflink.hadoop.hadoop.security.authentication: kerberos# 替换成对应的 principal; 这里只能用 hadoop 的 principalflink.hadoop.qcloud.object.storage.kerberos.principal: hadoop/_HOST@EMR-XXXX
3.2 依赖 JAR 包和配置文件
确保 Kerberos 相关的配置文件已被正确获取和配置,具体制作步骤请参考腾讯云官方文档《流计算 Oceanus 数据湖 Iceberg -SQL 开发指南》。
4. 通过 COSN 协议访问元数据加速桶
如果是元数据加速桶,还可以通过 COSN 协议进行访问。此时,所有参数需要加上特定前缀:
flink.hadoop.fs.ofs.trsf
其中
flink.hadoop 为 Flink 转成 Hadoop configuration 需要的前缀,fs.ofs.trsf 为 COSN 访问元数据加速桶需要加的前缀。结语
通过遵循本指南中的步骤,用户可以在腾讯云 Oceanus 服务中配置并使用 CHDFS 元数据加速桶,以提高数据处理效率和稳定性。在配置过程中,请确保所有参数和依赖 JAR 包均已正确设置。