介绍
HBase Connector 提供了对 HBase 集群的读写支持。Oceanus 已经提供了内置的
flink-connector-hbase
Connector 组件,具体使用可参考 使用 MySQL 关联 HBase 维表数据到 ClickHouse 进一步了解。版本说明
Flink 版本 | 说明 |
1.11 | 支持 hbase 版本为:1.4.x |
1.13 | 支持 hbase 版本为:1.4.x、2.2.x、2.3.x |
1.14 | 支持 hbase 版本为:1.4.x、2.2.x |
1.16 | 支持 hbase 版本为: 1.4.x、2.2.x |
适用范围
可以作为源表,维表,以及Tuple、Upsert 数据流的目的表。
DDL 定义
CREATE TABLE hbase_table (rowkey INT,cf ROW < school_name STRING >,PRIMARY KEY (rowkey) NOT ENFORCED) WITH ('connector' = 'hbase-1.4', -- Flink 1.13 支持 hbase-2.2'table-name' = 'hbase_sink_table', -- Hbase 表名'zookeeper.quorum' = 'ip:port,ip:port,ip:port' -- Hbase 的 zookeeper 地址);
WITH 参数
参数 | 说明 | 是否必填 | 备注 |
connector | 表类型 | 是 | hbase-1.4 或者 hbase-2.2 如果您用了 hbase 2.3.x 版本,那么,connector 参数值需要替换为 hbase-2.2 |
table-name | HBase 表名 | 是 | - |
zookeeper.quorum | HBase 的 zookeeper 地址 | 是 | 查看 hbase-site.xml 确定参数值 |
zookeeper.znode.parent | HBase 在 zookeeper 中的根目录 | 否 | 查看 hbase-site.xml 确定参数值 |
null-string-literal | HBase 字段类型为字符串时,如果 Flink 字段数据为 null,则将该字段赋值为 null-string-literal,并写入 HBase | 否 | 默认为 null |
sink.buffer-flush.max-size | 写入 HBase 前,内存中缓存的数据量(字节)大小。调大该值有利于提高 HBase 写入性能,但会增加写入延迟和内存使用。仅作为 Sink 时使用 | 否 | 默认值为2MB,支持字节单位 B、KB、MB 和 GB,不区分大小写。设置为0表示不进行缓存 |
sink.buffer-flush.max-rows | 写入 HBase 前,内存中缓存的数据条数。调大该值有利于提高 HBase 写入性能,但会增加写入延迟和内存使用。仅作为 Sink 时使用 | 否 | 默认值为1000,设置为0表示不进行缓存 |
sink.buffer-flush.interval | 将缓存数据周期性写入到 HBase 的间隔,可以控制写入 HBase 的延迟。仅作为 Sink 时使用。 | 否 | 默认值为1秒,支持时间单位 ms、s、min、h 和 d。设置为0表示关闭定期写入 |
类型映射
HBase 将所有的数据存为字节数组。读写操作时需要将数据进行序列化和反序列化。Flink 与 HBase 的数据转换关系如下:
Flink 字段类型 | HBase 转换 |
CHAR / VARCHAR / STRING | byte[] toBytes(String s) String toString(byte[] b) |
BOOLEAN | byte[] toBytes(boolean b)boolean toBoolean(byte[] b) |
BINARY / VARBINARY | byte[] |
DECIMAL | byte[] toBytes(BigDecimal v)BigDecimal toBigDecimal(byte[] b) |
TINYINT | new byte[] { val } bytes[0] |
SMALLINT | byte[] toBytes(short val)short toShort(byte[] bytes) |
INT | byte[] toBytes(int val)int toInt(byte[] bytes) |
BIGINT | byte[] toBytes(long val)long toLong(byte[] bytes) |
FLOAT | byte[] toBytes(float val)float toFloat(byte[] bytes) |
DOUBLE | byte[] toBytes(double val)double toDouble(byte[] bytes) |
DATE | 将日期转换成自1970.01.01以来的天数,用 int 表示,并通过 byte[] toBytes(int val) 转换成字节数组 |
TIME | 将时间转换成自00:00:00以来的毫秒数,用 int 表示,并通过 byte[] toBytes(int val) 转换成字节数组 |
TIMESTAMP | 将时间戳转换成自1970-01-01 00:00:00以来的毫秒数,用 long 表示,并通过 byte[] toBytes(long val) 转换成字节数组 |
ARRAY | 不支持 |
MAP / MULTISET | 不支持 |
ROW | 不支持 |
代码示例
包含 HBase 维表的实时计算作业代码,示例如下:
CREATE TABLE datagen_source_table (id INT,name STRING,`proc_time` AS PROCTIME()) with ('connector'='datagen','rows-per-second'='1');CREATE TABLE hbase_table (rowkey INT,cf ROW < school_name STRING >,PRIMARY KEY (rowkey) NOT ENFORCED) WITH ('connector' = 'hbase-1.4', -- Flink 1.13 支持 hbase-2.2'table-name' = 'hbase_sink_table', -- Hbase 表名'zookeeper.quorum' = 'ip:port,ip:port,ip:port' -- Hbase 的 zookeeper 地址);CREATE TABLE blackhole_sink(id INT,name STRING) with ('connector' = 'blackhole');INSERT INTO blackhole_sinkSELECT id, cf.school_name as name FROM datagen_source_table srcJOIN hbase_table FOR SYSTEM_TIME AS OF src.`proc_time` as h ON src.id = h.rowkey;
注意事项
HBase Connector 一般会使用 DDL 语句中定义的主键,以
upsert
模式工作,与外部系统交换变更日志信息。因此,必须在 HBase 的 rowkey 字段上定义主键(必须声明 rowkey 字段)。如果未声明 PRIMARY KEY 子句,则 HBase 连接器默认将 rowkey 作为主键。Kerberos 认证授权
1. 登录集群 Master 节点,获取 krb5.conf、emr.keytab、core-site.xml、hdfs-site.xml、hbase-site.xml 文件,路径如下。
/etc/krb5.conf/var/krb5kdc/emr.keytab/usr/local/service/hadoop/etc/hadoop/core-site.xml/usr/local/service/hadoop/etc/hadoop/hdfs-site.xml/usr/local/service/hbase/conf/hbase-site.xml
2. 对获取的配置文件构建 jar 包。
jar cvf hbase-xxx.jar krb5.conf emr.keytab core-site.xml hdfs-site.xml hbase-site.xml
3. 校验 jar 的结构(可以通过 vim 命令查看),jar 里面包含如下信息,请确保文件不缺失且结构正确。
META-INF/META-INF/MANIFEST.MFkrb5.confemr.keytabcore-site.xmlhdfs-site.xmlhbase-site.xml
4. 在 程序包管理 页面上传 jar 包,并在作业参数配置里引用该程序包。
5. 获取 kerberos principal,用于 作业高级参数 配置。
klist -kt /var/krb5kdc/emr.keytab# 输出如下所示,选取第一个即可:hadoop/172.28.28.51@EMR-OQPO48B9KVNO Timestamp Principal---- ------------------- ------------------------------------------------------2 08/09/2021 15:34:40 hadoop/172.28.28.51@EMR-OQPO48B92 08/09/2021 15:34:40 HTTP/172.28.28.51@EMR-OQPO48B92 08/09/2021 15:34:40 hadoop/VM-28-51-centos@EMR-OQPO48B92 08/09/2021 15:34:40 HTTP/VM-28-51-centos@EMR-OQPO48B9
6. 作业高级参数 配置。
containerized.taskmanager.env.HADOOP_USER_NAME: hadoopcontainerized.master.env.HADOOP_USER_NAME: hadoopsecurity.kerberos.login.principal: hadoop/172.28.28.51@EMR-OQPO48B9security.kerberos.login.keytab: emr.keytabsecurity.kerberos.login.conf: krb5.conf
注意:
历史 Oceanus 集群可能不支持该功能,您可以联系我们升级集群管控服务,以支持 Kerberos 访问。