在MS中有几个表,这些表每秒钟更新一次,查询大致如下所示
SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID
WHERE table2.UpdateTime >= ${lastUpdateTime} AND table2.G_ID > ${lastID}
假设select内部连接查询结果5条记录如下所示。
如果查询第一次运行,则${lastUpdateTime}
和${lastG_ID}
设置为0,并且它将返回5条以下的记录。在处理记录之后,查询将将max(G_ID)
(即5)和max(UpdateTime)
(即1512010479 )存储在etl_stat
表中。
G_ID UpdateTime ID Name T_NAME
-------------------------------------------------------------------
1 1512010470 12591225 DUMMY_DATA DUMMY_ID
2 1512096873 12591538 DUMMY_DATA DUMMY_ID
3 1512096875 12591539 DUMMY_DATA DUMMY_ID
4 1512010477 12591226 DUMMY_DATA DUMMY_ID
5 1512010479 12591227 DUMMY_DATA DUMMY_ID
如果表中还添加5条新记录,如下所示:
G_ID UpdateTime ID Name T_NAME
-------------------------------------------------------------------
1 1512010470 12591225 DUMMY_DATA DUMMY_ID
2 1512096873 12591538 DUMMY_DATA DUMMY_ID
3 1512096875 12591539 DUMMY_DATA DUMMY_ID
4 1512010477 12591226 DUMMY_DATA DUMMY_ID
5 1512010479 12591227 DUMMY_DATA DUMMY_ID
6 1512010480 12591230 DUMMY_DATA DUMMY_ID
7 1512010485 12591231 DUMMY_DATA DUMMY_ID
8 1512010490 12591232 DUMMY_DATA DUMMY_ID
9 1512010493 12591233 DUMMY_DATA DUMMY_ID
10 1512010500 12591234 DUMMY_DATA DUMMY_ID
该查询将首先从max(G_ID)
和max(UpdateTime)
中读取etl_stat table
,并按如下所示对查询进行帧化,以便该查询只返回5条增量记录,如下所示。
G_ID UpdateTime ID Name T_NAME
-------------------------------------------------------------------
6 1512010480 12591230 DUMMY_DATA DUMMY_ID
7 1512010485 12591231 DUMMY_DATA DUMMY_ID
8 1512010490 12591232 DUMMY_DATA DUMMY_ID
9 1512010493 12591233 DUMMY_DATA DUMMY_ID
10 1512010500 12591234 DUMMY_DATA DUMMY_ID
因此,每次查询运行时,它都应该首先从max(G_ID)
表中读取etl_stat
和max(UpdateTime)
,并按上面所示对select内部连接查询进行框架化,并获得增量更改。
使用SPARK 的体系结构
我已经按照以下方式实现了上述用例:
1) Spark读取菲尼克斯表,从max(G_ID)
表中获取max(UpdateTime)
和etl_stat
表。
2) Spark框架选择内部连接查询,如下面的SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID WHERE table2.UpdateTime >= 1512010479 AND table2.G_ID > 5
3) Spark运行步骤2的内连接查询,从messages读取增量消息,处理记录并插入到HBase中。
4)成功插入HBase后,用最新的G_ID
( 10 )和UpdateTime
( 1512010500 )更新etl_stat
表。
5)这项工作计划每1分钟运行一次。
使用NIFI的体系结构
我想将这个用例移到Nifi,我想使用NiFi从MS读取记录,并将此记录发送给Kafka。
在成功发布到Kafka之后,NiFi将把G_ID和UpdateTime保存在数据库中。
一旦消息到达卡夫卡,火花流将读取来自卡夫卡的消息,并使用现有的业务逻辑保存到HBase。
在每次运行时,Nifi处理器都应该使用max(G_ID)
和max(UpdateTime)
来设置帧、选择内部连接查询,以便获取增量记录并发布到Kafka。
我是Nifi/HDF的新手。我需要您的帮助和指导,以便使用Nifi/HDF来实现这一点。如果您对这个用例有更好的解决方案/架构,请建议。
为这么长的职位感到抱歉。
发布于 2017-12-01 11:38:46
您所描述的是JDBC Kafka连接连接器所做的一切。设置您的配置文件,加载它,离开。好了。卡夫卡连接是阿帕奇卡夫卡的一部分。不需要额外的工具和技术。
您可能还需要考虑适当的更改数据捕获(CDC)。对于专有的关系数据库管理系统(Oracle、DB2、MS等),您有一些商业工具,如GoldenGate、Attunity、DBVisit等等。对于开放源码关系数据库管理系统(例如MySQL、PostgreSQL),您应该查看开源德贝兹工具。所有这些CDC工具都直接与Kafka集成。
https://stackoverflow.com/questions/47591418
复制相似问题