一个同事咨询了问题:
“任务中断”具体是指什么情况?
原来是指任务没有再继续执行了。
刷大量数据时使用多线程,的确是一个加速的办法。这个方案的方向没有问题。
线程池怎么工作的呢?
JDK的线程池是coreSize满了之后,后面的任务会放到等待队列。 如果等待队列已满,则增加worker线程数处理新任务; 如果有新任务进来,等待队列仍然满,则继续启动新的worker线程,直到等于maxPoolSize。 这个线程池中的worker线程数等于maxPoolSize后,新进来的任务会按线程池配置的RejectExecutionHandler[拒绝策略]处理。 唐成:的数字化之路容量评估实践:一个Tomcat最多能同时处理多少个HTTP请求?
具体到这个场景是怎么样?
会有10个任务同时执行,在这10个任务未结束前,新添加的任务会放到等待队列,当队列中存够500个任务时,则会报错,不会新启动线程来处理任务,因为maxPoolSize是10与corePoolSize相同。
如果addTask的方法没有try catch,则添加任务的工作就结束了,假设1个任务需要花费1分钟,添加50万个任务需要59s,那么当前的代码只会处理前510个任务,其它的任务都不会被执行。
问题找到了:
目前设计容量远远小于任务的个数,且如果线程池中无法存放的新任务都被abort了。
在当前的代码基础上优化。
方案1:如果想最小改动,则需要检查跑任务机器的配置。
如果跑任务的机器配置较高,则加大队列大小即可。
把线程池QueueCapacity的大小调整为任务的个数。
方案2:跑完一个批次再加载新的待处理任务。
2.1 使用一个计数器来协同任务的执行:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
public class BatchTaskProcessor{
// 任务总数
private static final int TOTAL_TASKS=;
// 每批任务数
private static final int BATCH_SIZE=;
// 线程池
private static final ExecutorService executorService =Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
public static void main(String[] args){
// 用于记录已处理任务数
AtomicInteger processedTasks =new AtomicInteger();
// 加载并执行第一批任务
loadAndExecuteTasks(processedTasks);
// 使用守护线程监听任务执行情况,当一批任务完成后加载下一批
Thread monitorThread =newThread(()->{
while(processedTasks.get()<TOTAL_TASKS){
if(processedTasks.get()%BATCH_SIZE==&& processedTasks.get()!=){
loadAndExecuteTasks(processedTasks);
}
}
// 所有任务处理完毕,关闭线程池
executorService.shutdown();
System.out.println("所有任务处理完毕");
});
monitorThread.setDaemon(true);// 设置为守护线程
monitorThread.start();
}
// 加载并执行任务的方法
private static void loadAndExecuteTasks(AtomicInteger processedTasks){
for(int i =; i <BATCH_SIZE; i++){
int taskId = processedTasks.incrementAndGet();
if(taskId >TOTAL_TASKS){
break;
}
executorService.submit(()->{
// 模拟任务执行
System.out.println("任务 "+ taskId +" 正在执行,当前线程:"+Thread.currentThread().getName());
try{
Thread.sleep();// 模拟任务耗时
}catch(InterruptedException e){
e.printStackTrace();
}
});
}
}
}
AtomicInteger
来记录已处理的任务数,确保线程安全。processedTasks.incrementAndGet()
来获取当前任务的ID,并提交给线程池执行。任务执行时模拟了一些操作,并打印了任务ID和当前执行的线程名称。2.2 也可以使用CountDownLatch协同线程之间的执行:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class BatchTaskProcessorWithLatch{
// 任务总数
private static final int TOTAL_TASKS=;
// 每批任务数
private static final int BATCH_SIZE=;
// 线程池
private static final ExecutorService executorService =Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
public static void main(String[] args){
int remainingTasks =TOTAL_TASKS;
while(remainingTasks >){
// 创建一个CountDownLatch,用于等待当前批次的任务完成
CountDownLatch latch =new CountDownLatch(Math.min(BATCH_SIZE, remainingTasks));
for(int i =; i <Math.min(BATCH_SIZE, remainingTasks); i++){
int taskId =TOTAL_TASKS- remainingTasks + i +;
executorService.submit(()->{
// 模拟任务执行
System.out.println("任务 "+ taskId +" 正在执行,当前线程:"+Thread.currentThread().getName());
try{
Thread.sleep();// 模拟任务耗时
}catch(InterruptedException e){
e.printStackTrace();
}finally{
latch.countDown();// 任务完成,计数器减1
}
});
}
try{
// 主线程等待当前批次的所有任务完成
latch.await();
}catch(InterruptedException e){
e.printStackTrace();
}
remainingTasks -=BATCH_SIZE;// 更新剩余任务数
}
// 所有任务处理完毕,关闭线程池
executorService.shutdown();
System.out.println("所有任务处理完毕");
}
}
CountDownLatch
,其计数器初始化为当前批次的任务数。每个任务完成时,计数器减1。latch.countDown()
减少计数器。latch.await()
等待当前批次的所有任务完成,确保一批任务全部处理完毕后再加载下一批任务。2.3 也可以使用CompletableFuture来实现:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class BatchTaskProcessorWithCompletableFuture{
// 任务总数
private static final intTOTAL_TASKS=;
// 每批任务数
private static final int BATCH_SIZE=;
// 线程池
private static final ExecutorService executorService =Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
public static void main(String[] args)throws Exception{
int remainingTasks =TOTAL_TASKS;
while(remainingTasks >){
List<CompletableFuture<Void>> futures =new ArrayList<>();
int batchSize =Math.min(BATCH_SIZE, remainingTasks);
for(int i =; i < batchSize; i++){
int taskId =TOTAL_TASKS- remainingTasks + i +;
CompletableFuture<Void> future =CompletableFuture.runAsync(()->{
// 模拟任务执行
System.out.println("任务 "+ taskId +" 正在执行,当前线程:"+Thread.currentThread().getName());
try{
Thread.sleep();// 模拟任务耗时
}catch(InterruptedException e){
e.printStackTrace();
}
}, executorService);
futures.add(future);
}
// 等待当前批次的所有任务完成
CompletableFuture<Void> allFutures =CompletableFuture.allOf(
futures.toArray(new CompletableFuture[])
);
allFutures.join();
remainingTasks -= batchSize;// 更新剩余任务数
}
// 所有任务处理完毕,关闭线程池
executorService.shutdown();
System.out.println("所有任务处理完毕");
}
}
CompletableFuture
对象。CompletableFuture
会自动完成。CompletableFuture.allOf
方法等待当前批次的所有任务完成。allOf
方法接受一个CompletableFuture
数组,当所有任务都完成时,返回的CompletableFuture
也会完成。2.1,2.2,2.3这三个方案都实现了这个功能,从结果上看没有区别。
各位粉丝朋友根据自己的喜好取用就好。
一台机器的处理能力是有限,如果任务量特别大,譬如2千万,跑起来就比较耗时。
如果想在更短的时间内,把这些任务处理完,可以采用数据分片的方式。
分片的办法很多,今天分享一个基于消息队列(MQ)的方案。
MQ 更适合应用之间的解耦、隔离、事件通知。例如订单支付、订单完成、订单履约完成等等事件需要广播出去,通知下游其他微服务。 唐成,公众号:的数字化之路看线上代码用了观察者模式,那叫一个优雅,但有隐患
首先,将大量任务按照一定规则划分为多个分片(segments)。例如,可以根据任务的ID、数据的哈希值或其他业务逻辑进行分片。每个分片包含一定数量的任务,确保分片后的任务能够均匀分布在不同的机器上。
使用消息队列(如RabbitMQ)来统一分发调度任务。每个任务分片作为一个消息发送到MQ中,不同的机器(消费者)从MQ中消费任务并执行。
任务生产者负责将任务分片发送到MQ中。可以使用Spring的RabbitTemplate
来发送消息。
// 任务生产者
@Autowired
privateRabbitTemplate rabbitTemplate;
publicvoidgenerateTasks(){
List<Task> tasks = taskService.getAllTasks();// 获取所有任务
int batchSize =;// 每批任务数
for(int i =; i < tasks.size(); i += batchSize){
int end =Math.min(i + batchSize, tasks.size());
List<Task> batchTasks = tasks.subList(i, end);
rabbitTemplate.convertAndSend("task_exchange","task_routing_key", batchTasks);
System.out.println("发送任务分片:"+ batchTasks.size());
}
}
任务消费者从MQ中消费任务分片并执行。每个消费者可以是一个独立的微服务或一个分布式节点。
@RabbitListener(queues ="task_queue")
public void processTasks(Message message,Channel channel){
try{
List<Task> batchTasks =(List<Task>) message.getMessageProperties().getBody();
System.out.println("处理任务分片:"+ batchTasks.size());
for(Task task : batchTasks){
// 执行任务
taskService.executeTask(task);
}
// 获取消息的deliveryTag
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 手动确认消息
channel.basicAck(deliveryTag,false);
System.out.println("消息确认成功,deliveryTag:"+ deliveryTag);
}catch(Exception e){
// 处理异常,可以选择重新入队或丢弃消息
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicNack(deliveryTag,false,true);// 重新入队
System.out.println("消息确认失败,重新入队,deliveryTag:"+ deliveryTag);
}
}
deliveryTag
标识要确认的消息,false
表示不批量确认。deliveryTag
标识要拒绝的消息,false
表示不批量拒绝,true
表示将消息重新入队。在application.properties
或application.yml
中添加以下配置,开启手动ACK模式:
properties复制
spring.rabbitmq.listener.simple.acknowledge-mode=manual
通过以上配置和代码实现,可以确保消费者在处理完消息后才向RabbitMQ发送确认消息,从而提高消息处理的可靠性和系统的稳定性。
使用MQ统一分发调度任务,每个服务根据自身的业务需求,从MQ中消费任务并执行。这样可以确保任务的均匀分布和高效处理。
为了应对任务量的变化和节点的动态增减,可以使用MQ的动态负载均衡功能。当某个节点负载过高或出现故障时,MQ可以将任务重新分配到其他健康的节点上。
监控任务的执行状态,确保任务能够顺利完成。可以使用心跳机制或健康检查来监控节点的健康状况,并及时调整任务分配。
文中的代码由AI生成