前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >认识 TapFlow,以编程方式运行 TapData

认识 TapFlow,以编程方式运行 TapData

作者头像
Tapdata
发布2025-02-28 14:48:30
发布2025-02-28 14:48:30
6500
代码可运行
举报
文章被收录于专栏:TapdataTapdata
运行总次数:0
代码可运行

什么是TapFlow?

TapFlow 是 TapData Live Data Platform 最新推出的一个面向编程的API 框架。TapFlow 可以让开发者和数据工程师用一个简单易用而又强大的编程语言来进行数据管道和数据模型的开发工作。

这次的发布包括一个 Python 的SDK。TapFlow 需要连接一个 TapData Cluster(可以是企业版,云版或社区版)才能运行。

为何需要编程式的方式?

TapData 目前提供的是一个以可视化拖拉拽方式来构建数据管道,数据开发的UI界面。UI界面在易使用和易运维上有很大的优势,但是在不少地方也有一些局限性。TapFlow 希望能够对这些局限性提供一个有效的补充。

1. 满足开发者的深度定制需求,复杂处理逻辑需要JS和Python代码

在一些复杂脱敏逻辑,或者定制化的数据字段值标准化的时候需要使用不少代码。这个时候如果有频繁的改动,UI界面的代码操作就不会很方便。

2. 更好地支持 CI/CD 和自动化

团队需要定期部署和更新多个环境中的数据集成任务(如开发、测试、生产)。使用编程式 API,可以通过脚本自动完成任务的生成和迁移,轻松与 Git 等版本管理工具集成,而无需人工在 GUI 中操作。

3. 降低复杂场景下的操作成本

经常有用户需要同步 100+ 个数据库表,且每张表有不同的字段映射规则。在 GUI 中手动设置这些规则不仅繁琐,而且易出错,而通过编程式 API 可以实现规则的自动化生成。

4. 面向开发者与技术团队的友好性,更容易集成到工作流

提供面向开发者的工具,更贴近他们的工作方式,更容易和其他业务模块进行集成。

开放代码能力,产品更容易扩展,如增加企业内部的可复用组件等。

TapFlow 使用示例

我们假设我们有一个CRM 应用运行在MySQL 数据库上,下面是这个 MySQL 库的schema:

由于查询性能的考量和一些特定的宽表需求,我们需要将订单数据复制到MongoDB里面来发布一个订单查询API。我们会用 TapFlow 把数据从MySQL 里面复制到MongoDB, 过程中对数据做一些加工处理和合并的操作。

安装 TapFlow

代码语言:javascript
代码运行次数:0
复制
# pip3 install tapflow

TapFlow 的Python SDK 支持两种模式: 以程序方式执行,或在交互模式下运行。接下来我们以交互模式下来展现如何使用 TapFlow API。

代码语言:javascript
代码运行次数:0
复制
# tap

--- Please configure TapData cluster ---

Tap Flow requires TapData Live Data Platform(LDP) cluster to run. 
If you would like to use with TapData Enterprise or TapData Community, type L to continue. 
If you would like to use TapData Cloud, or you are new to TapData, type C or press ENTER to continue.
Please type L or C (L/[C]):

在你输入相应的 TapData 连接信息或者密钥后,就可以开始在交互模式下使用

更多安装信息可以参见 Quick Start 文档: https://docs.tapdata.net/tapflow/quick-start

创建一个源库和目标库连接

代码语言:javascript
代码运行次数:0
复制
tap> mysql_conn = DataSource('mysql', 'MySQL_ECommerce', 
                    {
                    'database': 'ECommerceData',  
                    'port': 3306,                 
                    'host': 'demo.tapdata.io',      
                    'username': 'demouser',  
                    'password': 'demopass'   
                    })
                    .type('source')
                    .save()                  

tap> mongodb_conn = DataSource("mongodb", "MongoDB_ECommerce",
                    {
                        "uri": "mongodb://your_username:your_passwd@192.168.1.18:27017/ecommerce?authSource=admin"
                    })
                    .type("target")
                    .save()

创建一个简单的订单表的复制任务(Data Flow)

目的:将 MySQL 的订单数据同步到 MongoDB 中做统一订单查询

代码语言:javascript
代码运行次数:0
复制
tap> myflow = Flow("simple_data_replication_flow") \
       .read_from("MySQL_ECommerce.ecom_orders") \
       .write_to("MongoDB_ECommerce.orders_replica_collection") \
       .save()
tap> myFlow.start()

过滤一些字段

目标表不需要一些字段

代码语言:javascript
代码运行次数:0
复制
tap> myflow = Flow("order_fields_exclude_flow") \
       .read_from("MySQL_ECommerce.ecom_orders") \
       .exclude("order_delivered_carrier_date", "order_delivered_customer_date")  \
       .write_to("MongoDB_ECommerce.orders_renamed_fields_collection") \
       .save()
tap> myFlow.start()

字段改名

代码语言:javascript
代码运行次数:0
复制
tap> myflow = Flow("order_fields_rename_flow") \
       .read_from("MySQL_ECommerce.ecom_orders") \
       .rename_fields({'order_purchase_timestamp': 'purchase_timestamp'})  \
       .write_to("MongoDB_ECommerce.orders_exclude_fields_collection") \
       .save()
tap> myFlow.start()

用Python 脚本做复杂自定义处理

代码语言:javascript
代码运行次数:0
复制
# 标准化 order_status 的字段值
tap> def pyfunc(record):
    if not record['order_status'] :
        record['order_status']  = 'invalid'
    if record['order_status'] == 'SendError' :
        record['order_status'] = 'undeliverable'
    return record # 返回处理后的记录

# 创建数据流任务,应用 Python 函数,并将结果写入目标数据库
tap> myflow = Flow("orders_complex_data_processing_flow") \
       .read_from("MySQL_ECommerce.ecom_orders") \
       .py(pyfunc)
       .write_to("MongoDB_ECommerce.orders_processed_collection") \
       .save()
tap> myFlow.start()

在订单表里通过 lookup 补齐客户信息

代码语言:javascript
代码运行次数:0
复制
tap> myflow = Flow("orders_lookup_flow") \
       .read_from("MySQL_ECommerce.ecom_orders") \
       .lookup("MySQL_ECommerce.ecom_customers", relation=[["customer_id", "customer_id"]])
       .write_to("MongoDB_ECommerce.wide_orders_collection") \
       .save()
tap> myFlow.start()

最后的结果

代码语言:javascript
代码运行次数:0
复制
tap> use MongoDB_ECommerce
datasource switch to: MongoDB_ECommerce

tap> peek wide_orders_collection
table wide_orders_collection has 12641 records
{
  'order_status': 'unavailable',
  'customer_state': 'SP',
  'customer_unique_id': 'a77550dd00887c5bb24100ccbd08cbe9',
  'order_estimated_delivery_date': '2017-11-03 00:00:00',
  '_id': '676154705338b293ccd85c80',
  'customer_id': '3a92efdb6e6163dc1734d44f2f5f6d04',
  'order_id': '0010dedd556712d7bb69a19cb7bbd37a',
  'purchase_timestamp': '2017-10-21 19:32:06',
  'order_approved_at': '2017-10-24 03:25:32',
  'customer_city': 'sao paulo',
  'customer_zip_code_prefix': '04851'
}
{
  'order_status': 'shipped',
  'customer_state': 'BA',
  'customer_unique_id': '205d5aa158338f2b733a07326aae8c87',
  'order_estimated_delivery_date': '2018-04-16 00:00:00',
  '_id': '676154705338b293ccd85c82',
  'customer_id': '7fa80efb1ef15ca4104627910c29791c',
  'order_id': '002f19a65a2ddd70a090297872e6d64e',
  'purchase_timestamp': '2018-03-21 13:05:30',
  'order_approved_at': '2018-03-21 13:15:27',
  'customer_city': 'camacari',
  'customer_zip_code_prefix': '42804'
}
{
  'order_status': 'canceled',
  'customer_state': 'MG',
  'customer_unique_id': 'ec979208947bbba310f2ad8e50963725',
  'order_estimated_delivery_date': '2018-08-29 00:00:00',
  '_id': '676154705338b293ccd85c84',
  'customer_id': '0dad07848c618cc5a4679a1bfe1db8d2',
  'order_id': '00310b0c75bb13015ec4d82d341865a4',
  'purchase_timestamp': '2018-08-15 14:29:08',
  'order_approved_at': '2018-08-15 15:04:25',
  'customer_city': 'belo horizonte',
  'customer_zip_code_prefix': '31160'
}
{
  'order_status': 'unavailable',
  'customer_state': 'BA',
  'customer_unique_id': 'cd88b962adbc4b174353217f99dc6174',
  'order_estimated_delivery_date': '2018-01-08 00:00:00',
  '_id': '676154705338b293ccd85c85',
  'customer_id': '3d2f26eab3f79dd1fe9977f615e70c2f',
  'order_id': '00a500bc03bc4ec968e574c2553bed4b',
  'purchase_timestamp': '2017-11-23 10:53:01',
  'order_approved_at': '2017-11-25 10:54:38',
  'customer_city': 'salvador',
  'customer_zip_code_prefix': '41180'
}
{
  'order_status': 'shipped',
  'customer_state': 'BA',
  'customer_unique_id': '60ec651482858c327c177cf9360cc0a2',
  'order_estimated_delivery_date': '2018-09-18 00:00:00',
  '_id': '676154705338b293ccd85c87',
  'customer_id': '7a399396442d5601cbedfbd0a3cf1da4',
  'order_id': '00a99c50fdff7e36262caba33821875a',
  'purchase_timestamp': '2018-08-17 16:25:04',
  'order_approved_at': '2018-08-17 16:35:18',
  'customer_city': 'teixeira de freitas',
  'customer_zip_code_prefix': '45990'
}

更多的交互式命令参考可以看这个文档:https://docs.tapdata.net/tapflow/tapcli-reference

以 Python 程序方式来运行上面的代码

代码语言:javascript
代码运行次数:0
复制
# cat order_mview.py
from tapflow.lib import *
from tapflow.cli.cli import init
                    
mysql_conn = DataSource('mysql', 'MySQL_ECommerce', 
                    {
                    'database': 'ECommerceData',  
                    'port': 3306,                 
                    'host': 'demo.tapdata.io',      
                    'username': 'demouser',  
                    'password': 'demopass'   
                    })
                    .type('source')
                    .save()   
                    
mongodb_conn = DataSource("mongodb", "MongoDB_ECommerce",
                    {
                        "uri": "mongodb://your_username:your_passwd@192.168.1.18:27017/ecommerce?authSource=admin"
                    })
                    .type("target")
                    .save()

def pyfunc(record):
    if not record['order_status'] :
        record['order_status']  = 'invalid'
    if record['order_status'] == 'SendError' :
        record['order_status'] = 'undeliverable'
    return record
         
myflow = Flow("mysql_order_flow") \
       .read_from("MySQL_ECommerce.ecom_orders") \
       .exclude("order_delivered_carrier_date", "order_delivered_customer_date")  \
       .rename_fields({'order_purchase_timestamp': 'purchase_timestamp'})  \
       .py(pyfunc)
       .lookup("MySQL_ECommerce.ecom_customers", relation=[["customer_id", "customer_id"]])
       .write_to("MongoDB_ECommerce.wide_orders_collection") \
       .save()

myflow.start() # Start the flow
       
# python order_mview.py

已知问题

  • Lookup 目前只能用 MongoDB 作为目标

TapFlow 路线图

我们将持续改进 TapFlow的能力,以下是一些中长期的路线图功能:

  • Lookup 支持除了 MongoDB 以外的更多目标库
  • 支持项目工程 Project
  • 支持 inner join
  • 提供 Java SDK / RESTful API

关于 TapData Live Data Platform:

如果你还不是很熟悉,TapData 是为企业内部的实时数据集成和实时数据服务专门打造的一个实时数据平台。它具有以下特点:

1. 专为实时数据管道而设计的框架: 基于CDC技术,数据采集及处理延迟可在亚秒级

2. 高性能:单节点每秒可处理数十万条记录

3. 内置丰富的 CDC(Change Data Capture)连接器:快速对接 Oracle, DB2, Sybase, MySQL, PostgreSQL, MSSQL 等

4. 丰富的实时数据处理功能:过滤,改名,增删字段

5. 多表关联: 构建持续更新的物化视图,数据汇聚

6. UDF: Javascript 或 Python 自定义function,处理复杂逻辑

7. 至少一次和精确一次的一致性保障

8. 完善的数据校验: 全量/增量校验,哈希校验,二次校验等

9. 国产数据库支持:Dameng, Kingbase, GaussDB, OceanBase, GBase, VastBase

10. Kafka 支持:作为生产者把数据库事件直接推送给Kafka,或从Kafka 队列消费事件

11. 支持私有化和全托管:可以开源版线下部署,或直接使用TapData Cloud的全托管服务

TapData 的常见应用场景包括:

  • 替代 Oracle Golden Gate 进行数据库间实时同步

传统数据库复制工具往往成本高昂且复杂,TapFlow 提供了一种轻量级且易用的替代方案,帮助您在不同的数据库之间高效同步数据,支持从 MySQL 到 PostgreSQL,甚至是 NoSQL 数据库。

  • 替代 Kafka 构建实时数据管道

对于那些需要实时传输数据的场景,TapFlow 是一个强有力的替代方案。它无需部署复杂的 Kafka 集群,而是直接通过轻量化的方式提供同等甚至更高效的数据管道构建能力。

  • 创建持续刷新的物化视图,用来做查询加速,读写分离等

当业务需要实时查询最新的数据结果时,物化视图是一种高效的方式。TapFlow 可以持续刷新物化视图,保证数据的实时性,从而支持实时分析与决策。

  • 数据实时入仓入湖

现代数据分析的趋势是实时化,TapFlow 可以将数据实时写入数据仓库或数据湖(如Apache Doris, Clickhouse, 或者云数仓如 Ali Cloud ADB, SelectDB, BigQuery)

  • 通用流式 ETL 数据处理

TapFlow 同样支持复杂的 ETL(抽取、转换、加载)任务,借助 Python 的灵活性和内置的处理能力,开发者可以轻松处理复杂的数据转换需求。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-02-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Tapdata 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 什么是TapFlow?
  • 为何需要编程式的方式?
  • TapFlow 使用示例
    • 字段改名
    • 用Python 脚本做复杂自定义处理
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档