首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用Java代码将作业提交给Flink集群?

如何使用Java代码将作业提交给Flink集群?
EN

Stack Overflow用户
提问于 2020-01-28 21:35:48
回答 2查看 2.5K关注 0票数 4

我已经将包含应用程序代码的胖jar上载到Flink集群中所有节点的/lib文件夹中。我正在尝试从一个单独的java应用程序启动Flink作业,但是找不到一个很好的方法。

与我目前找到的解决方案最接近的是具有运行作业API的监控Rest。但是,这只允许您运行通过作业上载函数提交的作业。

我在flink客户端模块中看到了ClusterClient.java,但没有看到如何使用它的任何示例。

如果有人通过java代码成功地提交了作业,我们将不胜感激!

EN

回答 2

Stack Overflow用户

发布于 2020-01-29 18:43:37

您可以使用RestClusterClient运行指向Flink作业的PackagedProgram。如果你的工作接受一些论点,你可以传递它们。

下面是运行在localhost:8081上的独立集群的示例:

代码语言:javascript
复制
// import org.apache.flink.api.common.JobSubmissionResult;
// import org.apache.flink.client.deployment.StandaloneClusterId;
// import org.apache.flink.client.program.PackagedProgram;
// import org.apache.flink.client.program.rest.RestClusterClient;
// import org.apache.flink.configuration.Configuration;
// import org.apache.flink.configuration.JobManagerOptions;
// import org.apache.flink.configuration.RestOptions;

String clusterHost = "localhost";
int clusterPort = 8081;

Configuration config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, clusterHost);
config.setInteger(RestOptions.PORT, clusterPort);

String jarFilePath = "/opt/flink/examples/streaming/SocketWindowWordCount.jar";
String[] args = new String[]{ "--port", "9000" };
PackagedProgram packagedProgram = new PackagedProgram(new File(jarFilePath), args);

RestClusterClient<StandaloneClusterId> client =
         new RestClusterClient<StandaloneClusterId>(config, StandaloneClusterId.getInstance());

int parallelism = 1;
JobSubmissionResult result = client.run(packagedProgram,  parallelism);
票数 2
EN

Stack Overflow用户

发布于 2020-11-02 13:11:14

这似乎适用于1.10版

代码语言:javascript
复制
private static final int PARALLELISM = 8;
private static final Configuration FLINK_CONFIG = new Configuration();

void foo() throws Exception {
    FLINK_CONFIG.setString(JobManagerOptions.ADDRESS, "localhost");
    FLINK_CONFIG.setInteger(RestOptions.PORT, 8081);
    FLINK_CONFIG.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 3);

    RestClusterClient<StandaloneClusterId> flinkClient = new RestClusterClient<>(FLINK_CONFIG, StandaloneClusterId.getInstance());

    String jar = "/path/to/jar";
    String[] args = new String[]{"..."};
    PackagedProgram program = PackagedProgram.newBuilder()
            .setJarFile(new File(jar))
            .setArguments(args)
            .build();

    JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, FLINK_CONFIG, PARALLELISM, false);

    JobID jobId = flinkClient.submitJob(jobGraph).get();
    ...
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59957387

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档