前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >“多线程去刷一张大表数据,发现任务老是中断,怎么解决?”

“多线程去刷一张大表数据,发现任务老是中断,怎么解决?”

作者头像
烟雨平生
发布2025-01-19 20:57:22
发布2025-01-19 20:57:22
7200
代码可运行
举报
文章被收录于专栏:数字化之路数字化之路
运行总次数:0
代码可运行

一个同事咨询了问题:

“任务中断”具体是指什么情况?

原来是指任务没有再继续执行了。

复盘

刷大量数据时使用多线程,的确是一个加速的办法。这个方案的方向没有问题。

线程池怎么工作的呢?

JDK的线程池是coreSize满了之后,后面的任务会放到等待队列。 如果等待队列已满,则增加worker线程数处理新任务; 如果有新任务进来,等待队列仍然满,则继续启动新的worker线程,直到等于maxPoolSize。 这个线程池中的worker线程数等于maxPoolSize后,新进来的任务会按线程池配置的RejectExecutionHandler[拒绝策略]处理。 唐成:的数字化之路容量评估实践:一个Tomcat最多能同时处理多少个HTTP请求?

具体到这个场景是怎么样?

会有10个任务同时执行,在这10个任务未结束前,新添加的任务会放到等待队列,当队列中存够500个任务时,则会报错,不会新启动线程来处理任务,因为maxPoolSize是10与corePoolSize相同。

如果addTask的方法没有try catch,则添加任务的工作就结束了,假设1个任务需要花费1分钟,添加50万个任务需要59s,那么当前的代码只会处理前510个任务,其它的任务都不会被执行。

问题找到了:

目前设计容量远远小于任务的个数,且如果线程池中无法存放的新任务都被abort了。

如何解决?

在当前的代码基础上优化。

方案1:如果想最小改动,则需要检查跑任务机器的配置。

如果跑任务的机器配置较高,则加大队列大小即可。

把线程池QueueCapacity的大小调整为任务的个数。

方案2:跑完一个批次再加载新的待处理任务。

2.1 使用一个计数器来协同任务的执行:

代码语言:javascript
代码运行次数:0
复制
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class BatchTaskProcessor{
    // 任务总数
    private static final int TOTAL_TASKS=;
    // 每批任务数
    private static final int BATCH_SIZE=;
    // 线程池
    private static final ExecutorService executorService =Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    public static void main(String[] args){
        // 用于记录已处理任务数
        AtomicInteger processedTasks =new AtomicInteger();

        // 加载并执行第一批任务
        loadAndExecuteTasks(processedTasks);

        // 使用守护线程监听任务执行情况,当一批任务完成后加载下一批
        Thread monitorThread =newThread(()->{
            while(processedTasks.get()<TOTAL_TASKS){
                if(processedTasks.get()%BATCH_SIZE==&& processedTasks.get()!=){
                    loadAndExecuteTasks(processedTasks);
                }
            }
            // 所有任务处理完毕,关闭线程池
            executorService.shutdown();
            System.out.println("所有任务处理完毕");
        });
        monitorThread.setDaemon(true);// 设置为守护线程
        monitorThread.start();
    }

    // 加载并执行任务的方法
    private static void loadAndExecuteTasks(AtomicInteger processedTasks){
        for(int i =; i <BATCH_SIZE; i++){
            int taskId = processedTasks.incrementAndGet();
            if(taskId >TOTAL_TASKS){
                break;
            }
            executorService.submit(()->{
                // 模拟任务执行
                System.out.println("任务 "+ taskId +" 正在执行,当前线程:"+Thread.currentThread().getName());
                try{
                    Thread.sleep();// 模拟任务耗时
                }catch(InterruptedException e){
                    e.printStackTrace();
                }
            });
        }
    }
}

代码说明

  • TOTAL_TASKS:定义了任务的总数。
  • BATCH_SIZE:定义了每批加载的任务数,这里为1000。
  • executorService:创建了一个固定大小的线程池,其大小为当前机器可用处理器的数量。
  • processedTasks:使用AtomicInteger来记录已处理的任务数,确保线程安全。
  • loadAndExecuteTasks:该方法用于加载并执行一批任务。通过processedTasks.incrementAndGet()来获取当前任务的ID,并提交给线程池执行。任务执行时模拟了一些操作,并打印了任务ID和当前执行的线程名称。
  • monitorThread:这是一个守护线程,用于监听任务的执行情况。当已处理任务数达到一批任务数的整数倍且不为0时,说明上一批任务已提交完成,此时加载下一批任务。当所有任务处理完毕时,关闭线程池并打印提示信息。

2.2 也可以使用CountDownLatch协同线程之间的执行:

代码语言:javascript
代码运行次数:0
复制
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BatchTaskProcessorWithLatch{
    // 任务总数
    private static final int TOTAL_TASKS=;
    // 每批任务数
    private static final int BATCH_SIZE=;
    // 线程池
    private static final ExecutorService executorService =Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    public static void main(String[] args){
        int remainingTasks =TOTAL_TASKS;
        while(remainingTasks >){
            // 创建一个CountDownLatch,用于等待当前批次的任务完成
            CountDownLatch latch =new CountDownLatch(Math.min(BATCH_SIZE, remainingTasks));

            for(int i =; i <Math.min(BATCH_SIZE, remainingTasks); i++){
                int taskId =TOTAL_TASKS- remainingTasks + i +;
                executorService.submit(()->{
                    // 模拟任务执行
                    System.out.println("任务 "+ taskId +" 正在执行,当前线程:"+Thread.currentThread().getName());
                    try{
                        Thread.sleep();// 模拟任务耗时
                    }catch(InterruptedException e){
                        e.printStackTrace();
                    }finally{
                        latch.countDown();// 任务完成,计数器减1
                    }
                });
            }

            try{
                // 主线程等待当前批次的所有任务完成
                latch.await();
            }catch(InterruptedException e){
                e.printStackTrace();
            }
            remainingTasks -=BATCH_SIZE;// 更新剩余任务数
        }

        // 所有任务处理完毕,关闭线程池
        executorService.shutdown();
        System.out.println("所有任务处理完毕");
    }
}

代码说明

  • TOTAL_TASKS:定义了任务的总数。
  • BATCH_SIZE:定义了每批加载的任务数,这里为1000。
  • executorService:创建了一个固定大小的线程池,其大小为当前机器可用处理器的数量。
  • remainingTasks:用于记录剩余未处理的任务数。
  • CountDownLatch:在每个批次开始时创建一个CountDownLatch,其计数器初始化为当前批次的任务数。每个任务完成时,计数器减1。
  • 任务提交:在每个批次中,提交任务到线程池执行。任务执行完毕后,调用latch.countDown()减少计数器。
  • 主线程等待:主线程通过latch.await()等待当前批次的所有任务完成,确保一批任务全部处理完毕后再加载下一批任务。
  • 更新剩余任务数:每处理完一批任务,更新剩余任务数。
  • 关闭线程池:所有任务处理完毕后,关闭线程池并打印提示信息。

2.3 也可以使用CompletableFuture来实现:

代码语言:javascript
代码运行次数:0
复制
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BatchTaskProcessorWithCompletableFuture{
    // 任务总数
    private static final intTOTAL_TASKS=;
    // 每批任务数
    private static final int BATCH_SIZE=;
    // 线程池
    private static final ExecutorService executorService =Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    public static void main(String[] args)throws Exception{
        int remainingTasks =TOTAL_TASKS;
        while(remainingTasks >){
            List<CompletableFuture<Void>> futures =new ArrayList<>();
            int batchSize =Math.min(BATCH_SIZE, remainingTasks);

            for(int i =; i < batchSize; i++){
                int taskId =TOTAL_TASKS- remainingTasks + i +;
                CompletableFuture<Void> future =CompletableFuture.runAsync(()->{
                    // 模拟任务执行
                    System.out.println("任务 "+ taskId +" 正在执行,当前线程:"+Thread.currentThread().getName());
                    try{
                        Thread.sleep();// 模拟任务耗时
                    }catch(InterruptedException e){
                        e.printStackTrace();
                    }
                }, executorService);
                futures.add(future);
            }

            // 等待当前批次的所有任务完成
            CompletableFuture<Void> allFutures =CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[])
            );
            allFutures.join();

            remainingTasks -= batchSize;// 更新剩余任务数
        }

        // 所有任务处理完毕,关闭线程池
        executorService.shutdown();
        System.out.println("所有任务处理完毕");
    }
}

代码说明

  • TOTAL_TASKS:定义了任务的总数。
  • BATCH_SIZE:定义了每批加载的任务数,这里为1000。
  • executorService:创建了一个固定大小的线程池,其大小为当前机器可用处理器的数量。
  • remainingTasks:用于记录剩余未处理的任务数。
  • futures:用于存储当前批次的所有CompletableFuture对象。
  • 任务提交:在每个批次中,提交任务到线程池执行。每个任务执行完毕后,CompletableFuture会自动完成。
  • 等待当前批次任务完成:使用CompletableFuture.allOf方法等待当前批次的所有任务完成。allOf方法接受一个CompletableFuture数组,当所有任务都完成时,返回的CompletableFuture也会完成。
  • 更新剩余任务数:每处理完一批任务,更新剩余任务数。
  • 关闭线程池:所有任务处理完毕后,关闭线程池并打印提示信息。

2.1,2.2,2.3这三个方案都实现了这个功能,从结果上看没有区别。

各位粉丝朋友根据自己的喜好取用就好。

一台机器的处理能力是有限,如果任务量特别大,譬如2千万,跑起来就比较耗时。

如果想在更短的时间内,把这些任务处理完,可以采用数据分片的方式。

分片的办法很多,今天分享一个基于消息队列(MQ)的方案。

MQ&nbsp;更适合应用之间的解耦、隔离、事件通知。例如订单支付、订单完成、订单履约完成等等事件需要广播出去,通知下游其他微服务。 唐成,公众号:的数字化之路看线上代码用了观察者模式,那叫一个优雅,但有隐患

基于MQ的数据分片与分布式任务执行方案

1. 任务分片

首先,将大量任务按照一定规则划分为多个分片(segments)。例如,可以根据任务的ID、数据的哈希值或其他业务逻辑进行分片。每个分片包含一定数量的任务,确保分片后的任务能够均匀分布在不同的机器上。

2. 消息队列(MQ)配置

使用消息队列(如RabbitMQ)来统一分发调度任务。每个任务分片作为一个消息发送到MQ中,不同的机器(消费者)从MQ中消费任务并执行。

3. 任务生产者

任务生产者负责将任务分片发送到MQ中。可以使用Spring的RabbitTemplate来发送消息。

代码语言:javascript
代码运行次数:0
复制
// 任务生产者
@Autowired
privateRabbitTemplate rabbitTemplate;

publicvoidgenerateTasks(){
    List<Task> tasks = taskService.getAllTasks();// 获取所有任务
    int batchSize =;// 每批任务数
    for(int i =; i < tasks.size(); i += batchSize){
        int end =Math.min(i + batchSize, tasks.size());
        List<Task> batchTasks = tasks.subList(i, end);
        rabbitTemplate.convertAndSend("task_exchange","task_routing_key", batchTasks);
        System.out.println("发送任务分片:"+ batchTasks.size());
    }
}
4. 任务消费者

任务消费者从MQ中消费任务分片并执行。每个消费者可以是一个独立的微服务或一个分布式节点。

代码语言:javascript
代码运行次数:0
复制
@RabbitListener(queues ="task_queue")
public void processTasks(Message message,Channel channel){
    try{
        List<Task> batchTasks =(List<Task>) message.getMessageProperties().getBody();
        System.out.println("处理任务分片:"+ batchTasks.size());
        for(Task task : batchTasks){
            // 执行任务
            taskService.executeTask(task);
        }
        // 获取消息的deliveryTag
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        // 手动确认消息
        channel.basicAck(deliveryTag,false);
        System.out.println("消息确认成功,deliveryTag:"+ deliveryTag);
    }catch(Exception e){
        // 处理异常,可以选择重新入队或丢弃消息
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        channel.basicNack(deliveryTag,false,true);// 重新入队
        System.out.println("消息确认失败,重新入队,deliveryTag:"+ deliveryTag);
    }
}

代码说明

  • Message:RabbitMQ传递的消息对象,包含了消息体和消息属性。
  • Channel:RabbitMQ的通道对象,用于执行消息确认操作。
  • batchTasks:从消息体中获取的任务分片。
  • deliveryTag:消息的唯一标识,由RabbitMQ服务器分配,用于确认消息。
  • basicAck:手动确认消息,参数deliveryTag标识要确认的消息,false表示不批量确认。
  • basicNack:拒绝确认消息,参数deliveryTag标识要拒绝的消息,false表示不批量拒绝,true表示将消息重新入队。

配置手动ACK模式

application.propertiesapplication.yml中添加以下配置,开启手动ACK模式:

properties复制

代码语言:javascript
代码运行次数:0
复制
spring.rabbitmq.listener.simple.acknowledge-mode=manual

通过以上配置和代码实现,可以确保消费者在处理完消息后才向RabbitMQ发送确认消息,从而提高消息处理的可靠性和系统的稳定性。

5. 分布式任务调度

使用MQ统一分发调度任务,每个服务根据自身的业务需求,从MQ中消费任务并执行。这样可以确保任务的均匀分布和高效处理。

6. 动态负载均衡

为了应对任务量的变化和节点的动态增减,可以使用MQ的动态负载均衡功能。当某个节点负载过高或出现故障时,MQ可以将任务重新分配到其他健康的节点上。

7. 任务状态监控

监控任务的执行状态,确保任务能够顺利完成。可以使用心跳机制或健康检查来监控节点的健康状况,并及时调整任务分配。

文中的代码由AI生成

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-01-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 的数字化之路 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 复盘
  • 如何解决?
    • 代码说明
    • 代码说明
    • 代码说明
    • 基于MQ的数据分片与分布式任务执行方案
      • 1. 任务分片
      • 2. 消息队列(MQ)配置
      • 3. 任务生产者
      • 4. 任务消费者
    • 代码说明
    • 配置手动ACK模式
      • 5. 分布式任务调度
      • 6. 动态负载均衡
      • 7. 任务状态监控
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档