任务编排

最近更新时间:2024-10-09 16:44:41

我的收藏

操作场景

为了提升用户搭建数据流任务的效率,CKafka 连接器支持任务编排功能。对于只有一个数据源、一个数据目标的场景,用户可以使用预设模板一键搭建数据流任务,将数据从源快速集成到指定目标。

前提条件

已创建好数据源和数据目标连接。

新建链路

CKafka 连接器当前提供了四种预设模板(订阅 MySQL 数据到 ES、订阅 PostgreSQL 数据到 ES、订阅 TDSQL-C MySQL 数据到 ES、订阅 TDSQL-C PostgreSQL 数据到 ES),您可以选择使用。
接下来以订阅 MySQL 数据到 ES 为例为您介绍新建链路的操作步骤。
1. 登录 CKafka 控制台
2. 在左侧导航栏单击连接器 > 任务编排,选择好地域后,单击新建链路
3. 填写链路名称,模板类型选择订阅 MySQL 数据到 ES,单击下一步
4. 填写数据源配置信息。
参数
说明
数据源
选择提前创建好的 MySQL 源数据连接
选择数据库表
支持三种选择方式:
全部库表:订阅该连接关联的所有数据库表。
批量选择:支持手动勾选要订阅的数据库和表,支持订阅多个数据库、多个表。
正则匹配:支持使用正则匹配筛选订阅符合条件的表。



5. (可选)设置高级参数。
参数
说明
复制存量数据
开启后将复制源 MySQL 数据库中的存量数据,开关一经打开,无论后续是否关闭,都无法新增需要监听的库。
订阅结构更新
订阅结构更新将订阅整个数据库实例所有对象的结构创建,删除以及修改。若数据目标配置选择分发到多个Topic 则不支持订阅结构更新。
包含原始 SQL 查询
是否包含生成变更事件的原始 SQL 查询。需要 MySQL 的配置项"binlog_rows_query_log_events"的值为"ON"。
包含 Schema
消息输出时,KEY 和 VALUE 内容是否包含 schema。
分区策略
订阅数据写入,默认情况下根据主键 hash 到不同的分区。可以手动指定表的 hash 字段。
数据格式
默认采用 Debezium 格式,同时提供了兼容其他消息格式的能力。
Canal 格式:详情介绍请参见 MySQL 订阅消息 Canal 格式说明
官方格式一:详情介绍请参见 MySQL 订阅消息官方格式说明



6. 单击下一步,配置数据目标信息。
源数据:点击拉取源 Topic 数据。
数据目标:选择提前创建好的数据流出的目标 Elasticsearch Service 连接。
索引名称:填写索引名称,索引名称必须全部为小写,支持 jsonpath 语法。
按日期拆分索引名称:可选,开启后需选择日期格式,写入 ES 的索引为%(索引名称)_%(日期)。
保留非 JSON 数据:如果保留非 JSON 数据开启,则会指定 key 进行组装投递,关闭则丢弃。
KEY:源 topic 内数据不是 JSON 格式时,可以指定 key 组装为 JSON 投递到 ES 中。
数据库同步模式:本选项仅用于 CKafka 连接器订阅 MySQL,PostgreSQL 数据库到 Topic(仅支持1分区的 Topic)里面的数据(增删改)同步更新到 ES。会识别数据库的增删改,保持 ES 的数据与源表的数据一致。
ES 文档 ID 字段:未开启数据库同步模式时,指定该字段内的值作为 ES 文档 ID 的值。
失败消息处理:选择投递失败的消息的处理方式,支持丢弃保留投递至 CLS (需指定投递到的日志集和日志主题并授权访问日志服务CLS)三种方式。
保留:适合用于测试环境,任务运行失败时将会终止任务不会重试,并且在事件中心中记录失败原因。
丢弃:适合用于生产环境,任务运行失败时将会忽略当前失败消息。建议使用 "保留" 模式测试无误后,再将任务编辑成 "丢弃" 模式用于生产。
投递至 CLS:适合用于严格生产环境,任务运行失败时会将失败消息及元数据和失败原因上传到指定 CLS 主题中。



7. 单击提交,完成链路创建。

删除链路

在 链路列表页面,单击目标链路的操作栏的删除,在二次确认弹窗中单击确认,可删除链路。
说明
删除链路表示停止数据转储并删除任务记录,不会影响到已经转储的数据和相关的 CKafka 实例。
链路一旦删除不可恢复,请您谨慎操作。

查看链路

单击链路的“ID”,进入链路基本信息页面,可以查看链路的详细信息。
基本信息、链路拓扑图。
关联资源:链路关联的 Topic 和连接。
关联任务:链路关联的数据接入和数据流出任务。