参考官网:https://www.yuque.com/powerjob/guidence/olgyf0
https://github.com/KFCFans/PowerJob
推荐使用3.3.0版本
导入需要的jar包,servie也需要使用3.3.0版本
<dependency>
<groupId>com.github.kfcfans</groupId>
<artifactId>powerjob-client</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>com.github.kfcfans</groupId>
<artifactId>powerjob-worker-spring-boot-starter</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>3.14.9</version>
</dependency>
一. 使用openAPI 开发一次性的任务,保证任务只调度一次就好
后台访问地址:http://192.168.2.11:7700/
powerjob:
worker:
akka-port: 27777 # akka 工作端口,可选,默认 27777
app-name: sass-openapi # 接入应用名称,用于分组隔离,推荐填写 本 Java 项目名称
server-address: 192.168.2.11:7700 # 调度服务器地址,IP:Port 或 域名,多值逗号分隔
password: shiye9527 # 密码
store-strategy: disk # 持久化方式,可选,默认 disk
2. 编写config配置文件 PowerJobConfig
package com.un.common.utils.job;
import com.github.kfcfans.powerjob.client.OhMyClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author shiye
* @create 2020-10-09 16:33
*/
@Configuration
public class PowerJobConfig {
/**
* # akka 工作端口,可选,默认 27777
*/
@Value("${powerjob.worker.akka-port}")
private Integer akkaPort;
/**
* 接入应用名称,用于分组隔离,推荐填写 本 Java 项目名称
*/
@Value("${powerjob.worker.app-name}")
private String appName;
/**
* 调度服务器地址,IP:Port 或 域名,多值逗号分隔
*/
@Value("${powerjob.worker.server-address}")
private String serverAddress;
@Value("${powerjob.worker.password}")
private String password;
/**
* 持久化方式,可选,默认 disk
*/
@Value("${powerjob.worker.store-strategy}")
private String storeStrategy;
@Bean
public OhMyClient getOhMyClient() {
return new OhMyClient(serverAddress, appName, password);
}
}
3. 编写util工具类 PowerJobUtil
package com.un.common.utils.job;
import com.github.kfcfans.powerjob.client.OhMyClient;
import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.common.ProcessorType;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.un.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
* @author shiye
* @create 2020-10-09 16:55
*/
@Configuration
public class PowerJobUtil {
protected final Logger logger = LoggerFactory.getLogger(PowerJobUtil.class);
@Autowired
private OhMyClient ohMyClient;
@Bean
public PowerJobUtil getPowerJobUtil() {
return new PowerJobUtil();
}
/**
* 创建一个单核一年只执行一次得任务
*
* @param StartTime 任务开始时间
* @param params 任务参数
* @param processorInfo 回调得类全类名如:com.un.framework.task.SysNoticeScheduProcessor
* @param jobId 任务id:如果为空就是创建任务
* @return 返回结果
* @throws Exception
*/
public ResultDTO<Long> saveJob(Date StartTime, String params, String processorInfo, String jobId, String jobName, String jobDescription) throws Exception {
logger.info("saveJob .......{},{},{},{}", StartTime, params, processorInfo, jobId);
SaveJobInfoRequest request = new SaveJobInfoRequest();
if (StringUtils.isNotEmpty(jobId)) {
request.setId(Long.valueOf(jobId));
}
//任务名称
request.setJobName(jobName);
//任务描述
request.setJobDescription(jobDescription);
//任务参数,Processor#process方法入参TaskContext对象的jobParams字段
request.setJobParams(params);
//时间表达式类型,枚举值
request.setTimeExpressionType(TimeExpressionType.CRON);
//时间表达式,填写类型由timeExpressionType决定,比如CRON需要填写CRON表达式
request.setTimeExpression(getCron(StartTime));
//执行类型,枚举值
request.setExecuteType(ExecuteType.STANDALONE);
//处理器类型,枚举值
request.setProcessorType(ProcessorType.EMBEDDED_JAVA);
//处理器参数,填写类型由processorType决定,如Java处理器需要填写全限定类名,如:com.github.kfcfans.oms.processors.demo.MapReduceProcessorDemo
request.setProcessorInfo(processorInfo);
//最大实例数,该任务同时执行的数量(任务和实例就像是类和对象的关系,任务被调度执行后被称为实例)
request.setMaxInstanceNum(1);
//单机线程并发数,表示该实例执行过程中每个Worker使用的线程数量
request.setConcurrency(1);
//任务实例运行时间限制,0代表无任何限制,超时会被打断并判定为执行失败
request.setInstanceTimeLimit(0l);
//任务实例重试次数,整个任务失败时重试,代价大,不推荐使用
request.setMaxInstanceNum(0);
//Task重试次数,每个子Task失败后单独重试,代价小,推荐使用
request.setTaskRetryNum(2);
//最小可用CPU核心数,CPU可用核心数小于该值的Worker将不会执行该任务,0代表无任何限制
request.setMinCpuCores(0);
//最小内存大小(GB),可用内存小于该值的Worker将不会执行该任务,0代表无任何限制
request.setMinMemorySpace(0);
//最小磁盘大小(GB),可用磁盘空间小于该值的Worker将不会执行该任务,0代表无任何限制
request.setMinDiskSpace(0);
//指定机器执行,设置该参数后只有列表中的机器允许执行该任务,空代表不指定机器
request.setDesignatedWorkers(null);
//最大执行机器数量,限定调动执行的机器数量,0代表无限制
request.setMaxWorkerCount(1);
//是否启用该任务,未启用的任务不会被调度
request.setEnable(true);
ResultDTO<Long> resultDTO = ohMyClient.saveJob(request);
return resultDTO;
}
/**
* 禁用某个任务
*
* @param jobId
* @return
* @throws Exception
*/
public ResultDTO<Void> disableJob(Long jobId) {
logger.info("disableJob .......{}", jobId);
try {
TimeUnit.MINUTES.sleep(5);
return ohMyClient.disableJob(jobId);
} catch (Exception e) {
logger.error("disableJob error.......{},{}", e, jobId);
}
return null;
}
/**
* 删除某个任务
*
* @param jobId
* @return
* @throws Exception
*/
public ResultDTO<Void> deleteJob(Long jobId) throws Exception {
return ohMyClient.deleteJob(jobId);
}
/**
* 通过输入指定日期时间生成cron表达式
*
* @param date
* @return cron表达式
*/
public String getCron(Date date) {
logger.info("create cron 接收到得时间", date.toString());
String dateFormat = "ss mm HH dd MM ? yyyy-yyyy";
SimpleDateFormat sdf = new SimpleDateFormat(dateFormat);
String formatTimeStr = null;
if (date != null) {
formatTimeStr = sdf.format(date);
}
System.out.println("当前时间得CRON:" + formatTimeStr);
return formatTimeStr;
}
}
4. 在业务代码中集成进去
Date startTimeCron = new Date(Long.parseLong(String.valueOf(startTime * 1000)));
ResultDTO<Long> resultDTO = powerJobUtil.saveJob(startTimeCron, pBasActivity.getId(), "com.un.framework.task.ActivityProcessorHandler", createActiveVo1.getStartJobId(),"活动定时任务","活动定时-开始任务");
if (resultDTO.isSuccess()) {
pBasActivity.setStartJobId(resultDTO.getData() + "");
} else {
return AjaxResult.error("创建任务失败,请联系管理员");
}
二. 消费端,负责处理任务具体的调用
package com.un.framework.task;
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor;
import com.un.common.utils.job.PowerJobUtil;
import com.un.framework.manager.AsyncManager;
import com.un.framework.manager.factory.AsyncFactory;
import com.un.project.system.domain.dto.TaskNoticeDto;
import com.un.project.system.domain.vo.UpdateNoticeVo;
import com.un.project.system.mapper.PBasNoticeMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
/**
* @author shiye
* @create 2020-09-30 9:42
*/
@Component
public class NoticeProcessorHandler implements BasicProcessor {
protected final Logger logger = LoggerFactory.getLogger(NoticeScheduTask.class);
@Autowired
private PowerJobUtil powerJobUtil;
@Autowired
private PBasNoticeMapper noticeMapper;
public static Lock lock = new ReentrantLock();
@Override
public ProcessResult process(TaskContext context) throws Exception {
ProcessResult result = new ProcessResult(true, "ok");
try {
lock.lock();
String params = context.getJobParams();
logger.info("Start a notice job task......" + params);
//具体得物业处理
logger.info("end a notice job task......{},{}", noticeIds1, noticeIds2);
} catch (Exception e) {
logger.error("error a notice job task......{},{}", e, context.toString());
} finally {
lock.unlock();
}
return result;
}
}