摘要:本文介绍了在 Dinky 中扩展 Phoenix 的 Flink 连接器使用分享。内容包括:
Tips:历史传送门~
《Dinky 0.6.1 已发布,优化 Flink 应用体验》
《Dlink 在 FinkCDC 流式入湖 Hudi 的实践分享》
GitHub 地址
https://github.com/DataLinkDC/dlink
https://gitee.com/DataLinkDC/Dinky
欢迎大家关注 Dinky 的发展~
一、Phoenix 连接器编译
https://github.com/DataLinkDC/dlink
2.参考官网文档进行编译打包
http://www.dlink.top/#/zh-CN/deploy/build
3.找到 connector 包
二、Phoenix 连接器部署
使用方式:
参考官网进行 http://www.dlink.top/#/zh-CN/deploy/deploy?id=%e5%8a%a0%e8%bd%bd%e4%be%9d%e8%b5%96
将phoenix-4.14.2-HBase-1.4-client.jar 中的 servlet ,gson 依赖项排除。
flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar 也是同样删除 servlet 包中的类。
如果遇到新版本其他依赖冲突问题,可参考此方式进行排除。
在这里直接删除了 phoenix-4.14.2-HBase-1.4-client.jar 包中的上述冲突类的包。如下所示:
将排除冲突依赖的 phoenix-4.14.2-HBase-1.4-client.jar 和 phoenix-core-4.14.2-HBase-1.4.jar 放入 Dinky/plugins 目录下。
并确认 Dinky/lib 目录下存在 dlink-connector-phoenix-1.13-0.6.1-SNAPSHOT.jar 的 connector 包
这里使用的Flink版本为1.13.5 dinky plugins 目录下的依赖如下所示:
三、Phoenix 连接器的使用
新建 flink sql studio
创建 flink phoenix 表
CREATE TABLE pv (
sid INT,
ucount BIGINT,
PRIMARY KEY (sid) NOT ENFORCED
) WITH (
'connector.type' = 'phoenix',
'connector.url' = 'jdbc:phoenix:xxxxxx:2181',
'connector.table' = 'TEST.PV',
'connector.driver' = 'org.apache.phoenix.jdbc.PhoenixDriver',
'connector.username' = '',
'connector.password' = '',
'phoenix.schema.isnamespacemappingenabled' = 'true',
'phoenix.schema.mapsystemtablestonamespace' = 'true',
'connector.write.flush.max-rows' = '1'
);
参数解释:
phoenix-connector中拓展了
2.原生 Flink 使用
在Flink中只需要将打包后的phoenix connector 和 原生的phoenix-client , phoenix-core包放入 flink/lib 目录下即可在 sql-client中 使用。
lib下文件如下图所示:
四、Phoenix 连接器 Demo
1.Demo1 求实时 PV 数据
通过模拟数据源,将关联 mysql 维表数据,然后将数据写入 phoenix 中。
由于 phonix 中的 insert 语义是 upsert ,相同的主键数据会覆盖。实现了实时 pv 数据的变动。
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5 s';
SET 'table.exec.mini-batch.size' = '5000';
SET 'table.optimizer.agg-phase-strategy' = 'TWO_PHASE';
--模拟数据源
CREATE TABLE datagen (
userid int,
proctime as PROCTIME()
) WITH (
'connector' = 'datagen',
'rows-per-second'='10',
'fields.userid.kind'='random',
'fields.userid.min'='1',
'fields.userid.max'='100'
);
--创建mysql lookup维表
CREATE TABLE student (
sid INT,
name STRING,
PRIMARY KEY (sid) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://xxxx:3306/test',
'table-name' = 'student',
'username' = 'xxxx',
'password' = 'xxxxx',
'lookup.max-retries' = '1',
'lookup.cache.max-rows' = '1000',
'lookup.cache.ttl' = '60s'
);
--创建 phoenix pv表
CREATE TABLE pv (
sid INT,
ucount BIGINT,
PRIMARY KEY (sid) NOT ENFORCED
) WITH (
'connector.type' = 'phoenix',
'connector.url' = 'jdbc:phoenix:zxbd-test-hbase:2181',
'connector.table' = 'TEST.PV',
'connector.driver' = 'org.apache.phoenix.jdbc.PhoenixDriver',
'connector.username' = '',
'connector.password' = '',
'phoenix.schema.isnamespacemappingenabled' = 'true',
'phoenix.schema.mapsystemtablestonamespace' = 'true',
'connector.write.flush.max-rows' = '30'
);
insert into pv select student.sid as sid ,count(student.sid) as ucount from datagen left join student FOR SYSTEM_TIME AS OF datagen.proctime on student.sid = datagen.userid group by student.sid having student.sid is not null;
保存提交任务,在 sql-client 同样可以执行。
dinky任务提交方式,可以参考官网使用文档:http://www.dlink.top/#/zh-CN/administrator_guide/studio/job_dev/flinksql_guide/flinksql_job_submit
2.Demo2 Mysql CDC 数据实时同步 Phoenix
通过 flink 的 cdc 能力,将 mysql 中的数据实时同步至 phoenix 中。
SET 'execution.checkpointing.interval' = '30s';
CREATE TABLE student (
sid INT,
name STRING,
PRIMARY KEY (sid) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'xxxxx',
'port' = 'xxx',
'username' = 'xxxx',
'password' = 'xxxxx',
'database-name' = 'test',
'table-name' = 'student');
CREATE TABLE pstudent (
sid INT,
name STRING,
PRIMARY KEY (sid) NOT ENFORCED
) WITH (
'connector.type' = 'phoenix',
'connector.url' = 'jdbc:phoenix:xxxx:2181',
'connector.table' = 'TEST.PSTUDENT',
'connector.driver' = 'org.apache.phoenix.jdbc.PhoenixDriver',
'connector.username' = '',
'connector.password' = '',
'phoenix.schema.isnamespacemappingenabled' = 'true',
'phoenix.schema.mapsystemtablestonamespace' = 'true',
'connector.write.flush.max-rows' = '1'
);
insert into pstudent select * from student;
保存提交任务。