前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Dinky 扩展 Phoenix 连接器使用分享

Dinky 扩展 Phoenix 连接器使用分享

作者头像
文末丶
发布2022-05-19 10:35:57
1K0
发布2022-05-19 10:35:57
举报
文章被收录于专栏:DataLink数据中台

摘要:本文介绍了在 Dinky 中扩展 Phoenix 的 Flink 连接器使用分享。内容包括:

  1. Phoenix 连接器编译
  2. Phoenix 连接器部署
  3. Phoenix 连接器使用
  4. Phoenix 连接器 Demo

Tips:历史传送门~

Dinky 0.6.1 已发布,优化 Flink 应用体验

Dinky在Kubernetes的实践分享

Dinky在IDEA远程调试实践分享

Dlink 在 FinkCDC 流式入湖 Hudi 的实践分享

GitHub 地址

https://github.com/DataLinkDC/dlink

https://gitee.com/DataLinkDC/Dinky

欢迎大家关注 Dinky 的发展~

一、Phoenix 连接器编译

1.下载源码

https://github.com/DataLinkDC/dlink

2.参考官网文档进行编译打包

http://www.dlink.top/#/zh-CN/deploy/build

3.找到 connector 包

二、Phoenix 连接器部署

使用方式:

  • 2.1 Flink 中使用 通过 flink 启动的 flink 任务,例如 flink session 任务,需要将 dlink-connector-phoenix-1.13-xxx.jar 和 phoenix-4.14.2-HBase-1.4-client.jar 、phoenix-core-4.14.2-HBase-1.4.jar 放入flink/lib 目录下,可以直接在sql中建表使用。
  • 2.2 dinky 中使用集成 可用于 yarn-perjob 等方式集群任务提交等,也就是通过 dinky 提交任务,此方式需要排除 phoenix-4.14.2-HBase-1.4-client.jar 与 dinky 中冲突的依赖 servlet , gson类。

1.排除冲突依赖

参考官网进行 http://www.dlink.top/#/zh-CN/deploy/deploy?id=%e5%8a%a0%e8%bd%bd%e4%be%9d%e8%b5%96

将phoenix-4.14.2-HBase-1.4-client.jar 中的 servlet ,gson 依赖项排除。

flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar 也是同样删除 servlet 包中的类。

如果遇到新版本其他依赖冲突问题,可参考此方式进行排除。

在这里直接删除了 phoenix-4.14.2-HBase-1.4-client.jar 包中的上述冲突类的包。如下所示:

将排除冲突依赖的 phoenix-4.14.2-HBase-1.4-client.jar 和 phoenix-core-4.14.2-HBase-1.4.jar 放入 Dinky/plugins 目录下。

并确认 Dinky/lib 目录下存在 dlink-connector-phoenix-1.13-0.6.1-SNAPSHOT.jar 的 connector 包

这里使用的Flink版本为1.13.5 dinky plugins 目录下的依赖如下所示:

三、Phoenix 连接器的使用

1.Dinky 中使用

新建 flink sql studio

创建 flink phoenix 表

代码语言:javascript
复制
CREATE TABLE pv (
    sid INT,
    ucount BIGINT,
    PRIMARY KEY (sid) NOT ENFORCED
) WITH (
  'connector.type' = 'phoenix', 
  'connector.url' = 'jdbc:phoenix:xxxxxx:2181', 
  'connector.table' = 'TEST.PV',  
  'connector.driver' = 'org.apache.phoenix.jdbc.PhoenixDriver', 
  'connector.username' = '', 
  'connector.password' = '',
  'phoenix.schema.isnamespacemappingenabled' = 'true',
  'phoenix.schema.mapsystemtablestonamespace' = 'true',
  'connector.write.flush.max-rows' = '1'
 );

参数解释:

phoenix-connector中拓展了

  • 'phoenix.schema.isnamespacemappingenabled' = 'true',
  • 'phoenix.schema.mapsystemtablestonamespace' = 'true'两个参数 用于连接开启 schema 配置的 phoenix ,如果未开启则设置为 false。
  • 'connector.write.flush.max-rows' 参数为写入的数据批次条数,如果想提升写入 phoenix 性能可以设置较大。
  • 'connector.table' = 'TEST.PV', TEST为 phoenix 中 schema 名 PV 为phoenix 表名 3.2 原生 Flink 使用**

2.原生 Flink 使用

在Flink中只需要将打包后的phoenix connector 和 原生的phoenix-client , phoenix-core包放入 flink/lib 目录下即可在 sql-client中 使用。

lib下文件如下图所示:

四、Phoenix 连接器 Demo

1.Demo1 求实时 PV 数据

通过模拟数据源,将关联 mysql 维表数据,然后将数据写入 phoenix 中。

由于 phonix 中的 insert 语义是 upsert ,相同的主键数据会覆盖。实现了实时 pv 数据的变动。

代码语言:javascript
复制
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5 s';
SET 'table.exec.mini-batch.size' = '5000';
SET 'table.optimizer.agg-phase-strategy' = 'TWO_PHASE';

--模拟数据源
CREATE TABLE datagen (
 userid int,
 proctime as PROCTIME()
) WITH (
 'connector' = 'datagen',
 'rows-per-second'='10',
 'fields.userid.kind'='random',
 'fields.userid.min'='1',
 'fields.userid.max'='100'
);

--创建mysql lookup维表
CREATE TABLE student (
    sid INT,
    name STRING,
    PRIMARY KEY (sid) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://xxxx:3306/test',
   'table-name' = 'student',
   'username' = 'xxxx',
   'password' = 'xxxxx',
   'lookup.max-retries' = '1',
   'lookup.cache.max-rows' = '1000',
   'lookup.cache.ttl' = '60s' 
);

--创建 phoenix pv表
CREATE TABLE pv (
    sid INT,
    ucount BIGINT,
    PRIMARY KEY (sid) NOT ENFORCED
) WITH (
  'connector.type' = 'phoenix', 
  'connector.url' = 'jdbc:phoenix:zxbd-test-hbase:2181', 
  'connector.table' = 'TEST.PV',  
  'connector.driver' = 'org.apache.phoenix.jdbc.PhoenixDriver', 
  'connector.username' = '', 
  'connector.password' = '',
  'phoenix.schema.isnamespacemappingenabled' = 'true',
  'phoenix.schema.mapsystemtablestonamespace' = 'true',
  'connector.write.flush.max-rows' = '30'
 );

insert into pv select student.sid as sid ,count(student.sid) as ucount from datagen left join student FOR SYSTEM_TIME AS OF datagen.proctime on student.sid = datagen.userid group by student.sid having student.sid is not null;

保存提交任务,在 sql-client 同样可以执行。

dinky任务提交方式,可以参考官网使用文档:http://www.dlink.top/#/zh-CN/administrator_guide/studio/job_dev/flinksql_guide/flinksql_job_submit

2.Demo2 Mysql CDC 数据实时同步 Phoenix

通过 flink 的 cdc 能力,将 mysql 中的数据实时同步至 phoenix 中。

代码语言:javascript
复制
SET 'execution.checkpointing.interval' = '30s'; 

CREATE TABLE student (
    sid INT,
    name STRING,
    PRIMARY KEY (sid) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'xxxxx',
'port' = 'xxx',
'username' = 'xxxx',
'password' = 'xxxxx',
'database-name' = 'test',
'table-name' = 'student');


CREATE TABLE pstudent (
    sid INT,
    name STRING,
    PRIMARY KEY (sid) NOT ENFORCED
) WITH (
  'connector.type' = 'phoenix', 
  'connector.url' = 'jdbc:phoenix:xxxx:2181', 
  'connector.table' = 'TEST.PSTUDENT',  
  'connector.driver' = 'org.apache.phoenix.jdbc.PhoenixDriver', 
  'connector.username' = '', 
  'connector.password' = '',
  'phoenix.schema.isnamespacemappingenabled' = 'true',
  'phoenix.schema.mapsystemtablestonamespace' = 'true',
  'connector.write.flush.max-rows' = '1'
 );

insert into pstudent select * from student;

保存提交任务。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-04-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Dinky开源 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.下载源码
  • 1.排除冲突依赖
  • 1.Dinky 中使用
相关产品与服务
TDSQL MySQL 版
TDSQL MySQL 版(TDSQL for MySQL)是腾讯打造的一款分布式数据库产品,具备强一致高可用、全球部署架构、分布式水平扩展、高性能、企业级安全等特性,同时提供智能 DBA、自动化运营、监控告警等配套设施,为客户提供完整的分布式数据库解决方案。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档