首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何使用Nifi/HDF从MS读取增量记录

如何使用Nifi/HDF从MS读取增量记录
EN

Stack Overflow用户
提问于 2017-12-01 10:30:55
回答 1查看 921关注 0票数 1

在MS中有几个表,这些表每秒钟更新一次,查询大致如下所示

代码语言:javascript
运行
复制
SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID 
WHERE table2.UpdateTime >= ${lastUpdateTime} AND table2.G_ID > ${lastID}

假设select内部连接查询结果5条记录如下所示。

如果查询第一次运行,则${lastUpdateTime}${lastG_ID}设置为0,并且它将返回5条以下的记录。在处理记录之后,查询将将max(G_ID) (即5)和max(UpdateTime) (即1512010479 )存储在etl_stat表中。

代码语言:javascript
运行
复制
 G_ID       UpdateTime   ID            Name             T_NAME 
-------------------------------------------------------------------
 1          1512010470  12591225      DUMMY_DATA       DUMMY_ID    
 2          1512096873  12591538      DUMMY_DATA       DUMMY_ID    
 3          1512096875  12591539      DUMMY_DATA       DUMMY_ID    
 4          1512010477  12591226      DUMMY_DATA       DUMMY_ID    
 5          1512010479  12591227      DUMMY_DATA       DUMMY_ID    

如果表中还添加5条新记录,如下所示:

代码语言:javascript
运行
复制
 G_ID       UpdateTime   ID            Name             T_NAME 
-------------------------------------------------------------------
 1          1512010470  12591225      DUMMY_DATA       DUMMY_ID    
 2          1512096873  12591538      DUMMY_DATA       DUMMY_ID    
 3          1512096875  12591539      DUMMY_DATA       DUMMY_ID    
 4          1512010477  12591226      DUMMY_DATA       DUMMY_ID    
 5          1512010479  12591227      DUMMY_DATA       DUMMY_ID 
 6          1512010480  12591230      DUMMY_DATA       DUMMY_ID 
 7          1512010485  12591231      DUMMY_DATA       DUMMY_ID 
 8          1512010490  12591232      DUMMY_DATA       DUMMY_ID 
 9          1512010493  12591233      DUMMY_DATA       DUMMY_ID 
 10         1512010500  12591234      DUMMY_DATA       DUMMY_ID 

该查询将首先从max(G_ID)max(UpdateTime)中读取etl_stat table,并按如下所示对查询进行帧化,以便该查询只返回5条增量记录,如下所示。

代码语言:javascript
运行
复制
G_ID        UpdateTime   ID            Name             T_NAME 
-------------------------------------------------------------------
 6          1512010480  12591230      DUMMY_DATA       DUMMY_ID 
 7          1512010485  12591231      DUMMY_DATA       DUMMY_ID 
 8          1512010490  12591232      DUMMY_DATA       DUMMY_ID 
 9          1512010493  12591233      DUMMY_DATA       DUMMY_ID 
 10         1512010500  12591234      DUMMY_DATA       DUMMY_ID 

因此,每次查询运行时,它都应该首先从max(G_ID)表中读取etl_statmax(UpdateTime),并按上面所示对select内部连接查询进行框架化,并获得增量更改。

使用SPARK 的体系结构

我已经按照以下方式实现了上述用例:

1) Spark读取菲尼克斯表,从max(G_ID)表中获取max(UpdateTime)etl_stat表。

2) Spark框架选择内部连接查询,如下面的SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID WHERE table2.UpdateTime >= 1512010479 AND table2.G_ID > 5

3) Spark运行步骤2的内连接查询,从messages读取增量消息,处理记录并插入到HBase中。

4)成功插入HBase后,用最新的G_ID ( 10 )和UpdateTime ( 1512010500 )更新etl_stat表。

5)这项工作计划每1分钟运行一次。

使用NIFI的体系结构

我想将这个用例移到Nifi,我想使用NiFi从MS读取记录,并将此记录发送给Kafka。

在成功发布到Kafka之后,NiFi将把G_ID和UpdateTime保存在数据库中。

一旦消息到达卡夫卡,火花流将读取来自卡夫卡的消息,并使用现有的业务逻辑保存到HBase。

在每次运行时,Nifi处理器都应该使用max(G_ID)max(UpdateTime)来设置帧、选择内部连接查询,以便获取增量记录并发布到Kafka。

我是Nifi/HDF的新手。我需要您的帮助和指导,以便使用Nifi/HDF来实现这一点。如果您对这个用例有更好的解决方案/架构,请建议。

为这么长的职位感到抱歉。

EN

回答 1

Stack Overflow用户

发布于 2017-12-01 11:38:46

您所描述的是JDBC Kafka连接连接器所做的一切。设置您的配置文件,加载它,离开。好了。卡夫卡连接是阿帕奇卡夫卡的一部分。不需要额外的工具和技术。

您可能还需要考虑适当的更改数据捕获(CDC)。对于专有的关系数据库管理系统(Oracle、DB2、MS等),您有一些商业工具,如GoldenGate、Attunity、DBVisit等等。对于开放源码关系数据库管理系统(例如MySQL、PostgreSQL),您应该查看开源德贝兹工具。所有这些CDC工具都直接与Kafka集成。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/47591418

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档