I want to create a Cloud Function (HTTP) that submits a Spark job to a GCP Dataproc cluster. I built a sample microservice that does this by following this link: How do you use the Google DataProc Java Client to submit spark jobs using jar files and classes in associated GS bucket?. Now I want to implement the same approach inside a GCP Cloud Function handler. This is the code from my microservice:
public static void main(String a[]) throws IOException {
    GoogleCredential credential = GoogleCredential.fromStream(new FileInputStream("My credential file location"))
            .createScoped(java.util.Arrays.asList("https://www.googleapis.com/auth/cloud-platform"));
    Dataproc dataproc = new Dataproc.Builder(new NetHttpTransport(), new JacksonFactory(), credential)
            .setApplicationName("my-webabb/1.0")
            .build();

    String curJobId = "spark-job-" + UUID.randomUUID().toString();
    Job jobSnapshot = null;
    jobSnapshot = dataproc.projects().regions().jobs().submit(
            "gcp-project-name", "cluster-region", new SubmitJobRequest()
                    .setJob(new Job()
                            .setReference(new JobReference()
                                    .setJobId(curJobId))
                            .setPlacement(new JobPlacement()
                                    .setClusterName("cluster-name"))
                            .setSparkJob(new SparkJob()
                                    .setMainClass("MainMethod")
                                    .setJarFileUris(ImmutableList.of("jarfilelocation")))))
            .execute();
}
When I execute this code, the job is created in the cluster, which works fine. My doubt is: if I want to do the same from a Cloud Function, which credentials do I need to pass? If I put both the Cloud Function and the cluster in the same network, will it work without credentials? In other words, is it possible to build the Dataproc client below without supplying credentials?
Dataproc dataproc = new Dataproc.Builder(new NetHttpTransport(), new JacksonFactory(), credential)
        .setApplicationName("my-webabb/1.0")
        .build();
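For reference, this is roughly the shape I have in mind for the Cloud Function. It is only a sketch: I am assuming the function's runtime service account already has permission to submit Dataproc jobs, so Application Default Credentials are used instead of a key file (the class name SubmitSparkJobFunction and the hard-coded values are placeholders):

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.dataproc.Dataproc;
import com.google.cloud.functions.HttpFunction;
import com.google.cloud.functions.HttpRequest;
import com.google.cloud.functions.HttpResponse;

public class SubmitSparkJobFunction implements HttpFunction {
    @Override
    public void service(HttpRequest request, HttpResponse response) throws Exception {
        // No key file: pick up the credentials of the function's runtime service account
        // (Application Default Credentials) and scope them for the Cloud APIs.
        GoogleCredential credential = GoogleCredential.getApplicationDefault()
                .createScoped(java.util.Arrays.asList("https://www.googleapis.com/auth/cloud-platform"));
        Dataproc dataproc = new Dataproc.Builder(new NetHttpTransport(), new JacksonFactory(), credential)
                .setApplicationName("my-webabb/1.0")
                .build();
        // ... same job submission code as in the microservice above ...
        response.getWriter().write("job submitted");
    }
}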
I also tried using the Cloud Client Library:
<dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-dataproc</artifactId>
    <version>1.5.2</version>
</dependency>
and this is the code:
public static void main(String a[]) throws IOException, InterruptedException {
    String myEndpoint = String.format("%s-dataproc.googleapis.com:443", "us-central1");

    // Configure the settings for the job controller client.
    JobControllerSettings jobControllerSettings =
            JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();

    // Create a job controller client with the configured settings. The try-with-resources
    // closes the client, but this can also be done manually with the .close() method.
    try (JobControllerClient jobControllerClient =
            JobControllerClient.create(jobControllerSettings)) {

        // Configure cluster placement for the job.
        JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName("myclusterName").build();

        // Configure Spark job settings.
        SparkJob sparkJob =
                SparkJob.newBuilder()
                        .setMainClass("mymain")
                        .addJarFileUris("myJarFile")
                        .build();

        Job job = Job.newBuilder().setPlacement(jobPlacement).setSparkJob(sparkJob).build();

        // Submit an asynchronous request to execute the job and wait for it to finish.
        OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
                jobControllerClient.submitJobAsOperationAsync("myProjectId", "us-central1", job);
        Job response = submitJobAsOperationAsyncRequest.get();

        // Extract the bucket and object names of the driver output URI in Google Cloud Storage.
        Matcher matches =
                Pattern.compile("gs://(.*?)/(.*)").matcher(response.getDriverOutputResourceUri());
        matches.matches();
    } catch (ExecutionException e) {
        // If the job does not complete successfully, print the error message.
        System.err.println(String.format("submitJob: %s ", e.getMessage()));
    }
}
With the dependency above I am not able to set a job ID. How can I set the JobId with this client?
Can anyone suggest a workable approach?
Thanks in advance :)
Posted on 2021-06-13 18:40:56
You actually can set the job ID when you submit a job to the Dataproc cluster. You can find it in the Dataproc API if you look closely at the Job object that you set in the request parameters.
There you have a job UUID field, and its comment is clear: output only; set the job ID via reference.job_id.
So, going to the reference object (JobReference), you can define a job ID and a project ID.
Now, let's do this with the library: in the Job object, add a reference object that carries the project ID and the job ID:
Job job = Job.newBuilder()
        .setReference(JobReference.newBuilder().setJobId("123").setProjectId("myProjectId").build())
        .setPlacement(jobPlacement)
        .setSparkJob(sparkJob)
        .build();
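Because the job ID is now chosen up front, it can also be used later to look the job up again. As a small follow-up sketch (the project ID, region, and job ID are the same placeholders as above, and this assumes the jobControllerClient created earlier is still open):

// Look the job up again by the ID we set ourselves in the JobReference.
Job submitted = jobControllerClient.getJob("myProjectId", "us-central1", "123");
System.out.println("Job state: " + submitted.getStatus().getState());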
https://stackoverflow.com/questions/67908728