开发批作业

最近更新时间:2026-04-24 14:13:55

我的收藏
流计算 Oceanus 支持批作业开发,目前批作业支持以下作业类型:SQL 作业、JAR 作业、Python 作业。

SQL 作业

在 SQL 作业代码中配置批作业执行模式:
作业开发的高级参数中,添加如下参数,作业会按照批模式运行。
execution.runtime-mode: BATCH
说明:
批作业是为 有界流(Bounded Stream) 设计的,所以,Bounded Source 才能支持批作业运行。如果作业中引入了 Unbounded Source,启动作业时会报错。
作业示例 SQL 代码:
CREATE TABLE `datagen_source_table` (
id INT,
name STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '2',
-- 下面的参数能够将 datagen 作为 Bounded Source 运行作业
'number-of-rows' = '100'
);

CREATE TABLE `logger_sink_table` (
id INT,
name STRING
) WITH (
'connector' = 'logger',
'print-identifier' = 'DebugData'
);

INSERT INTO `logger_sink_table` SELECT * FROM `datagen_source_table`;
批作业能够支持分阶段运行。这个作业比较简单,Job Graph 算子会 chain 在一起运行,不能直接看到分阶段运行的这一特性。如果要在 Flink UI 中观察到这一特性,可在高级参数中添加参数 pipeline.operator-chaining: false ,重启作业后,chain 会断开,能够观察到作业在分阶段运行。

JAR 作业

在 JAR 作业代码中配置批作业执行模式:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
1. 只需将流作业中的 RuntimeExecutionMode.STREAMING 修改为 RuntimeExecutionMode.BATCH 即可实现批作业开发。
2. 把修改后的批作业重新打成 JAR 包上传至平台的依赖管理中。
3. 在平台的 JAR 作业开发中使用对应 JAR 包,完成 JAR 批作业开发。

Python 作业

在 Python 作业代码中配置批作业执行模式:
env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
table_env = TableEnvironment.create(env_settings)
1. 只需将流作业中的 EnvironmentSettings.new_instance().in_streaming_mode() 修改为 EnvironmentSettings.new_instance().in_batch_mode() 即可实现批作业开发。详情可查看 Python Table API 简介
2. 把修改后的批作业重新打成 py 文件上传至平台的依赖管理中。
3. 在平台的 Python 作业开发中使用对应 py 文件,完成 Python 批作业开发。