前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Spring Batch 批处理(8) - JobLauncher和JobOperator

Spring Batch 批处理(8) - JobLauncher和JobOperator

作者头像
chenchenchen
发布于 2020-05-26 09:17:22
发布于 2020-05-26 09:17:22
3.6K00
代码可运行
举报
文章被收录于专栏:chenchenchenchenchenchen
运行总次数:0
代码可运行

在成功创建一个job后,Spring Batch 默认在项目启动时候执行配置的job。往往在正常业务处理中,需要我们手动或者定时去触发job,所以这边便引入了jobLauncher、jobOperator两个执行器。

JobLauncher作业调度

启动一个job

运行一个批处理任务至少有两点要求:一个 JobLauncher 和一个用来运行的 job 。它们都包含了相同或是不同的 context 。

Spring Boot默认支持自动启动已配置好的Job,我们可以通过配置项spring.batch.job.enabled=false来禁止Spring容器自动启动Job。

Spring Launch API它的核心就是 JobLauncher 接口。JobLauncher 需要2个参数:Job , JobParameters。

JobLauncher 的接口:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public interface JobLauncher {  
    public JobExecution run(Job job, JobParameters jobParameters) throws ();  
}  

正常情况下,当我们通过调度器调用Job时,从命令行来启动job,会为每一个job初始化一个JVM,因此每个job会有一个自己的 JobLauncher,整个流程如下:

从web容器的HttpRequest来启动job,一般只是用一个 JobLauncher 来异步启动job,http请求会调用这个 JobLauncher 来启动它们需要的job。通过web启动job的例子:

前端index.html

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<input type="text" id="msg">
<button onclick="runJob1()">Run Job 1</button>

<script type="text/javascript">
	var baseurl="http://localhost:8080";
	var xhttp=new XMLHttpRequest();
	
	function runJob1(){
		var msg = document.getElementById('msg').value;
		var url = baseurl + '/job/' +msg;
		xhttp.open('GET',url,true);
		xhttp.send();
		
		xhttp.onreadystatechange = function(){
			if(this.readyState == 4 && this.status ==200){
				console.log('Job status: '+this.responseText);
			}
		}
	}
	
</script>

JobLauncherController

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Controller
public class JobLauncherController {
    @Autowired
    JobLauncher jobLauncher;
    @Autowired
    JobLauncherDemoJob jobLauncherDemoJob;
    @RequestMapping("/job/{msg}")
    public void handle(@PathVariable String msg) throws Exception{
			// 把接收到的参数传给任务
			JobParameters parameters = new JobParametersBuilder().addString("msg",msg).toJobParameters();
			
			jobLauncher.run(jobLauncherDemoJob, parameters);
        return "JobLauncher success.";
    }
}

jobLauncher ``` @Bean public JobLauncher jobLauncher() { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(jobRepository()); jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor()); //转换为异步任务 jobLauncher.afterPropertiesSet(); return jobLauncher; } ```

JobLauncherDemo

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class JobLauncherDemo implements StepExecutionListener{

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
	private Map<String,JobParameters> parameters;
	
    @Bean
    public Job jobLauncherDemoJob() {
        return jobBuilderFactory.get("jobLauncherDemoJob")
				.listener(this)
                .start(jobLauncherDemoStep())
                .build();
    }
		
	@Bean
    public Step jobLauncherDemoStep() {
        return stepBuilderFactory.get("jobLauncherDemoStep")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("jobLauncherDemoStep,msg: "+parameter.get("msg").getValue());
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }
	  
	@Override
	public void beforeStep(StepExecution stepExcution){
		parameter = stepExcution.getJobParameters().getParameters();
	}

运行结果

停止一个job

谁需要停止job

(1)、使用者可能由于某些原因,需要停止job的运行,比如发现job出现数据错误,或者抛出异常,需要停止job的工作。

(2)、开发者在开发程序的过程中,开发者明确的知道一些业务逻辑需要停止job。比如,一个job运行的时间不能超过早上8点,如果超过这个时间需要停止job的运行,等等的情况。

此时job中的程序在运行逻辑代码,只有当这些业务完成之后,程序的管理权交回到spring batch的时候,才会被终止。如果中间的业务运行需要很长的时间,则job不会马上停止。而一旦控制权还给了框架,它会立刻设置当前 StepExecution 为 BachStatus.STOPPED ,意为停止,然后保存,最后在完成前对JobExecution进行相同的操作。

如何停止job

(1)、运行的过程中抛出一个exception,造成job的停止。

(2)、利用StepExecution来设置一个标识,停止job的运行。

最好的办法是,利用StepExecution来设置一个标识,停止job的运行。

a、在Tasklet接口的方法中有StepExecution参数,可以进行调用。

示例:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class ProcessItemsTasklet implements Tasklet {  
    @Override  
    public RepeatStatus execute(StepContribution contribution,  
            ChunkContext chunkContext) throws Exception {  
        if(shouldStop()) {  
            chunkContext.getStepContext().getStepExecution().setTerminateOnly();  
        }  
        processItem();  
        if(moreItemsToProcess()) {  
            return RepeatStatus.CONTINUABLE;  
        } else {  
            return RepeatStatus.FINISHED;  
        }  
        }  
        (...)  
}  

b、面向“块”的step中,ItemReader, ItemProcessor, 和 ItemWriter 这3个接口中,它们中间没有 StepExecution。此时我们需要用到Listener中的StepExecution。

示例:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class StopListener {  
    private StepExecution stepExecution;  
    @BeforeStep  
    public void beforeStep(StepExecution stepExecution) {  
        this.stepExecution = stepExecution;  
    }  
      
    @AfterRead  
    public void afterRead() {  
        if(stopConditionsMet()) {  
            stepExecution.setTerminateOnly();  
        }  
    }  
    (...)  
}  

从业务逻辑的需要出发,停止job的最佳的方式,还是设置stepExecution.setTerminateOnly();这个job停止标识,来让job停止运行。

放弃一个job

一个job的执行过程当执行到FAILED状态之后,如果它是可重启的,它将会被重启。

如果任务的执行过程状态是ABANDONED,那么框架就不会重启它。ABANDONED状态也适用于执行步骤,使得它们可以被跳过,即便是在一个可重启的任务执行之中:如果任务执行过程中碰到在上一次执行失败后标记为ABANDONED的步骤,将会跳过该步骤直接到下一步(这是由任务流定义和执行步骤的退出码决定的)。

如果当前的系统进程死掉了(“kill -9”或系统错误),job自然也不会运行,但JobRepository是无法侦测到这个错误的,因为进程死掉之前没有对它进行任何通知。你必须手动的告诉它,你知道任务已经失败了还是说考虑放弃这个任务(设置它的状态为FAILED或ABANDONED)-这是业务逻辑层的事情,无法做到自动决策。

只有在不可重启的任务中才需要设置为FAILED状态,或者你知道重启后数据还是有效的。Spring Batch Admin中有一系列工具JobService,用以取消正在进行执行的任务。

失败一个job

失败的job是可以重新启动的,因为它的状态是FAILED,如果step2失败,则返回一个EARLY TERMINATION的返回码,step3也就不会执行。否则继续执行step3

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<step id="step1" parent="s1" next="step2">

<step id="step2" parent="s2">
    <fail on="FAILED" exit-code="EARLY TERMINATION"/>
    <next on="*" to="step3"/>
</step>

<step id="step3" parent="s3">

结束一个job

已经结束的job是不能重新启动的,因为它的状态是COMPLETED。如果step2失败了,则step3就不执行了,该job也就COMPLETED,结束了。如果step2成功了,则继续往下执行step3。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<step id="step1" parent="s1" next="step2">

<step id="step2" parent="s2">
    <end on="FAILED"/>
    <next on="*" to="step3"/>
</step>

<step id="step3" parent="s3">

JobOperator

一个JobLauncher使用一个JobRepository创建并运行新的JobExection对象,Job和Step实现随后使用相同的JobRepository在job运行期间去更新相同的JobExecution对象。

这些基本的操作能够满足简单场景的需要,但是对于有着数百个任务和复杂定时流程的大型批处理情况来说,就需要使用更高级的方式访问元数据:

JobRepository 提供了对元数据的 CRUD 操作,JobExplorer 提供了对元数据的只读操作。然而,这些操作最常用于联合使用诸多的批量操作类,来对任务进行监测,并完成相当多的任务控制功能,比如停止、重启或对任务进行汇总。在Spring Batch 中JobOperator 接口提供了这些操作类型:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public interface JobOperator {
      List<Long> getExecutions(long instanceId) throws NoSuchJobInstanceException;
      List<Long> getJobInstances(String jobName, int start, int count)throws NoSuchJobException;
      Set<Long> getRunningExecutions(String jobName) throws NoSuchJobException;
      String getParameters(long executionId) throws NoSuchJobExecutionException;
      Long start(String jobName, String parameters)throws NoSuchJobException, JobInstanceAlreadyExistsException;
      Long restart(long executionId)throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException,
                                 NoSuchJobException, JobRestartException;
      Long startNextInstance(String jobName)throws NoSuchJobException, JobParametersNotFoundException, JobRestartException,
                                 JobExecutionAlreadyRunningException, JobInstanceAlreadyCompleteException;
      boolean stop(long executionId)throws NoSuchJobExecutionException, JobExecutionNotRunningException;
      String getSummary(long executionId) throws NoSuchJobExecutionException;Map<Long, String> getStepExecutionSummaries(long executionId)
                                 throws NoSuchJobExecutionException;
      Set<String> getJobNames();
}

JobOperator 最常见的作用莫过于停止某个Job:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Set<Long> executions = jobOperator.getRunningExecutions("sampleJob");
jobOperator.stop(executions.iterator().next());

关闭不是立即发生的,因为没有办法将一个任务立刻强制停掉,尤其是当任务进行到开发人员自己的代码段时,框架在此刻是无能为力的,比如某个业务逻辑处理。而一旦控制权还给了框架,它会立刻设置当前 StepExecution 为 BachStatus.STOPPED ,意为停止,然后保存,最后在完成前对JobExecution进行相同的操作。 此处我们通过web的API接口去调用 jobOperator,通过接口传入job的参数。调用的Job 是根据 在创建job时候,Bean name去指定。

示例:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Controller
public class JobOperatorController {
	@Autowired
    private JobOperatorDemo jobOperatorDemo;

	//不需要注入Job对象
    //@Autowired
    //JobLauncherDemoJob jobLauncherDemoJob;
		
    @RequestMapping("/job/{msg}")
    public void handle(@PathVariable String msg) throws Exception{
			// 把接收到的参数传给任务
			System.out.println("Request to run job2 with param: " + job2param);
 
        jobOperatorDemo.start("jobOperatorDemoJob","msg="+msg);
 
        return "JobOperator success.";
    }
}

``` public class JobOperatorDemo implements StepExecutionListener,ApplicationContextAware{

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private Map<String,JobParameters> parameters;

@Autowired
private JobRepository jobRepository;

@Autowired
private JobExplorer jobExplorer;

@Autowired
private JobRegistry jobRegistry;

@Autowired
private JobLauncher jobLauncher;
	
	pricate ApplicationContext context;
	
	 
@Bean
public JobRepositoryBeanPostProcessor jobRegistrar() throws Exception{
	JobRepositoryBeanPostProcessor postProcessor = new JobRepositoryBeanPostProcessor;
	
	postProcessor.setJobRegistry(jobRegistry);
	postProcessor.setBeanFactory(context.getAutowireCapableBeanFactory());
	postProcessor.afterPropertiesSet();
	
	return postProcessor;
}
	
@Bean
public JobOperator jobOperator(){
    SimpleJobOperator operator = new SimpleJobOperator();

    operator.setJobLauncher(jobLauncher);
			// 参数转换
    operator.setJobParametersConverter(new DefaultJobParametersConverter());
			// 持久化方式
    operator.setJobRepository(jobRepository);
			// 任务相关信息
    operator.setJobExplorer(jobExplorer);
			// 注册:任务名字符串和任务对象关联
    operator.setJobRegistry(jobRegistry);
    return operator;
}

@Bean
public Job jobOperatorDemo() {
    return jobBuilderFactory.get("jobOperatorDemo")
			.listener(this)
            .start(jobOperatorDemoStep())
            .build();
}
	
@Bean
public Step jobOperatorDemoStep() {
    return stepBuilderFactory.get("jobOperatorDemoStep")
            .tasklet(new Tasklet() {
                @Override
                public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                    System.out.println("jobOperatorDemoStep,msg: "+parameter.get("msg").getValue());
                    return RepeatStatus.FINISHED;
                }
            }).build();
}
  
@Override
public void beforeStep(StepExecution stepExcution){
	parameter = stepExcution.getJobParameters().getParameters();
}



@Override
public void setApplicationContext(ApplicationContext applicationContext){
	this.context = applicationContext;
}
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<br/><br/>


<br/><br/>
 
 
 参考:
 
 https://blog.csdn.net/github_36849773/article/details/66968461
 
 https://www.iteye.com/blog/kanpiaoxue-1771208
 
 https://www.cnblogs.com/nizuimeiabc1/p/9409492.html
 
 https://blog.csdn.net/kangkanglou/article/details/82627799
 
 https://blog.csdn.net/wuzhiwei549/article/details/85394406
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/03/10 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Spring Batch 批处理(3) - Job、Flow、Split
在成功创建一个job后,Spring Batch 默认在项目启动时候执行配置的job。往往在正常业务处理中,需要我们手动或者定时去触发job,所以这边便引入了jobLauncher、jobOperator两个执行器。
chenchenchen
2020/05/26
1.9K0
SpringBatch文档
Spring Batch 是一个轻量级的、完善的批处理框架,旨在帮助企业建立健壮、高效的批处理应用。
全栈程序员站长
2022/09/01
5.5K0
Spring Batch多步骤任务、并行执行、任务决策器、任务嵌套
企业中经常会有需要批处理才能完成的业务操作,比如:自动化地处理大批量复杂的数据,如月结计算;重复性地处理大批量数据,如费率计算;充当内部系统和外部系统的数据纽带,中间需要对数据进行格式化,校验,转换处理等。
鱼找水需要时间
2023/02/16
3.5K0
Spring Batch多步骤任务、并行执行、任务决策器、任务嵌套
springbatch+mysql
1.必须导入的依赖。因为spirngbatch必须配置数据源dataSource所有引入了数据库的相关jar包
全栈程序员站长
2022/09/01
6950
Spring 官方批处理框架真香!Spring 全家桶永远滴神!
假期余额严重不足,难受呀!今天早上 8 点半出门坐车,晚上 9 点才到家,差不多坐了一天车才到家,屁股都坐疼了......
Guide哥
2021/10/11
7680
Spring Batch任务调度
在前面的例子中,我们配置的任务都是在项目启动的时候自动运行,我们也可以通过JobLauncher或者JobOperator手动控制任务的运行时机,这节记录下它们的用法。
技术从心
2020/04/21
2.8K0
Spring Batch任务调度
Spring Cloud Task 高级特性Task Batch Jobs
Spring Cloud Task是一个轻量级的框架,用于在Spring Boot应用程序中运行短期任务。它提供了一种简单的方式来管理和监控这些任务,同时还可以与Spring Batch集成,以支持批量处理任务。在本文中,我们将重点介绍Spring Cloud Task的高级特性之一:Task Batch Jobs。
堕落飞鸟
2023/04/17
6840
Spring Cloud Task 示例演示
假设我们有一个任务,需要将一些数据从数据库中提取出来,然后写入到文件中。为了完成这个任务,我们需要执行以下步骤:
堕落飞鸟
2023/04/17
5190
Spring Batch(3)——Step控制
批处理任务的主要业务逻辑都是在Step中去完成的。可以将Job理解为运行Step的框架,而Step理解为业务功能。
随风溜达的向日葵
2019/07/08
6.5K1
Spring Batch(3)——Step控制
Spring Batch(2)——Job配置与运行
在 Spring Batch(1)——数据批处理概念 文中介绍了批处理的概念以及Spring Batch相关的使用场景,后续将会陆续说明在代码层面如何使用。
随风溜达的向日葵
2019/07/08
4.5K1
Spring Batch 批处理(2) - 搭建及运行
private JobBuilderFactory jobBuilderFactory;
chenchenchen
2020/05/26
8410
Spring Batch实战(一)
虽然开源软件项目和相关社区把更多的注意力集中在基于web和微服务的体系结构框架上,但明显缺乏对可重用体系结构框架的关注,以适应基于java的批处理需求,尽管仍然需要在企业IT环境中处理此类处理。缺乏标准的、可重用的批处理体系结构导致了在客户企业IT功能中开发的许多一次性的内部解决方案的激增。
xdd
2022/07/12
1.8K0
Spring Batch实战(一)
Spring Batch(4): Job具体解释[通俗易懂]
Job的配置有3个必须的属性。name,jobRepository,steps。一个简单的Job配置例如以下:
全栈程序员站长
2022/07/29
8340
Spring Batch(4): Job具体解释[通俗易懂]
Spring Batch 核心概念Step示例
首先,我们需要创建一个用来存储数据的表,这里我们创建一个名为“person”的表,包含id、name和age三个字段:
堕落飞鸟
2023/04/16
3200
SpringBoot:使用Spring Batch实现批处理任务
在企业级应用中,批处理任务是不可或缺的一部分。它们通常用于处理大量数据,如数据迁移、数据清洗、生成报告等。Spring Batch是Spring框架的一部分,专为批处理任务设计,提供了简化的配置和强大的功能。本文将介绍如何使用Spring Batch与SpringBoot结合,构建和管理批处理任务。
E绵绵
2024/07/03
1.1K0
Spring Cloud Task查看任务状态示例
我们将演示如何使用上述方法查看任务状态和信息。我们将创建一个简单的Spring Cloud Task应用程序,该应用程序将读取一个文件并输出其内容。我们将使用命令行方式启动任务,并使用Actuator端点和任务执行监听器来跟踪任务的状态和信息。
堕落飞鸟
2023/04/17
4060
Spring Batch 批处理(1) - 简介及使用场景
Spring Batch 作为 Spring 的子项目,是一款基于 Spring 的企业批处理框架。通过它可以构建出健壮的企业批处理应用。Spring Batch 不仅提供了统一的读写接口、丰富的任务处理方式、灵活的事务管理及并发处理,同时还支持日志、监控、任务重启与跳过等特性,大大简化了批处理应用开发,将开发人员从复杂的任务配置管理过程中解放出来,使他们可以更多地去关注核心的业务处理过程。
chenchenchen
2020/05/27
5.6K0
Spring Cloud Task 集成Spring Cloud Task Batch(二)
下一步是创建Spring Cloud Task,它将用于运行我们的Spring Batch作业。为此,我们需要定义一个TaskConfigurer和一个TaskLauncher:
堕落飞鸟
2023/04/17
4010
Spring Batch(1)——数据批处理概念
Spring Batch为批处理提供了一个轻量化的解决方案,它根据批处理的需要迭代处理各种记录,提供事物功能。但是Spring Batch仅仅适用于"脱机"场景,在处理的过程中不能和外部进行任何交互,也不允许有任何输入。
随风溜达的向日葵
2019/07/04
2K0
Spring Batch(1)——数据批处理概念
Spring Batch 详解
调用这个Job Launcher方法:可以通过java程序来通过JobLauncher来启动,也可以通过定时任务例如Quartz scheduler来启动.
全栈程序员站长
2022/08/24
8960
相关推荐
Spring Batch 批处理(3) - Job、Flow、Split
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验