前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java避坑指南:多线程批量调用下游接口,如何正确设置总超时时间

Java避坑指南:多线程批量调用下游接口,如何正确设置总超时时间

作者头像
崔认知
发布2024-09-13 13:54:51
1630
发布2024-09-13 13:54:51
举报
文章被收录于专栏:nobody

多线程批量调用下游接口,设置总超时时间是一种常见的需求,特别是在需要保证程序在预定时间内必须返回,否则超时设置不合理,导致接口变慢。

设置场景:多线程批量执行三个接口,耗时分别为10s、15s、20s(一般不会设置这么大的超时时间,此值为了模拟),总超时时间为15s。

错误做法

代码语言:javascript
复制
package com.renzhikeji.demo;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
 * @author 认知科技技术团队
 * 微信公众号:认知科技技术团队
 */
public class JdkDemo {
    private static final ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10, 100, 100, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(100), Thread::new,
            new ThreadPoolExecutor.AbortPolicy() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                    System.out.println("rejectedExecution");
                    super.rejectedExecution(r, e);
                }
            });

    public static void main(String[] args) {
        List<Future<Integer>> futures = new ArrayList<>(10);
        Future<Integer> future1 = poolExecutor.submit(() -> {
            System.out.println(Thread.currentThread());
            TimeUnit.SECONDS.sleep(10);
            return 1;
        });

        futures.add(future1);
        Future<Integer> future2 = poolExecutor.submit(() -> {
            System.out.println(Thread.currentThread());
            TimeUnit.SECONDS.sleep(15);
            return 1;
        });

        futures.add(future2);
        Future<Integer> future3 = poolExecutor.submit(() -> {
            System.out.println(Thread.currentThread());
            TimeUnit.SECONDS.sleep(20);
            return 1;
        });

        futures.add(future3);


        long start = System.currentTimeMillis();
        for (Future<Integer> integerFuture : futures) {
            try {
                integerFuture.get(15, TimeUnit.SECONDS);
            } catch (Throwable e) {
                e.printStackTrace();
            }

        }

        long d = System.currentTimeMillis() - start;
        System.out.println(d / 1000);

     }

}

执行结果:总超时时间为20s,大于预设置的15s。

上述错误做法:线程池提交任务后,每个任务的超时时间都设置为一个固定值,从而总任务超时超时延长。

java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)方法是对每个任务的超时时间设置,而不是对总任务设置超时时间。

注意:必须保证所有的任务同时执行,核心线程数必须大于等于3,否则会进入队列等待,超时时间会更长。

线程池实现原理解析 崔认知,公众号:认知科技技术团队【八股文Java】图解Java线程池实现原理(ThreadPoolExecutor)

正确做法:使用线程池invokeAll方法

代码语言:javascript
复制
package com.renzhikeji.demo;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

/**
 * @author 认知科技技术团队
 * 微信公众号:认知科技技术团队
 */
public class JdkDemo {
    private static final ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10, 100, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), Thread::new, new ThreadPoolExecutor.AbortPolicy() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            System.out.println("rejectedExecution");
            super.rejectedExecution(r, e);
        }
    });

    public static void main(String[] args) {
        List<Future<Integer>> futures = new ArrayList<>(10);
        Callable<Integer> callable1 = () -> {
            System.out.println(Thread.currentThread());
            TimeUnit.SECONDS.sleep(10);
            return 1;
        };


        Callable<Integer> callable2 = () -> {
            System.out.println(Thread.currentThread());
            TimeUnit.SECONDS.sleep(15);
            return 1;
        };


        Callable<Integer> callable3 = () -> {
            System.out.println(Thread.currentThread());
            TimeUnit.SECONDS.sleep(20);
            return 1;
        };

        long start = System.currentTimeMillis();

        try {
            List<Future<Integer>> invoked = poolExecutor.invokeAll(Arrays.asList(callable1, callable2, callable3),
                    15L, TimeUnit.SECONDS);
            for (Future<Integer> future : invoked) {
                try {
                    Integer a = future.get();
                } catch (Throwable e) {
                    e.printStackTrace();
                }

            }
        } catch (Throwable e) {
            System.out.println("12");
            e.printStackTrace();
        }

        long d = System.currentTimeMillis() - start;
        System.out.println(d / 1000);

    }

}

运行结果:总超时时间为预设值的15s。

线程池invokeAll的原理其实是动态改动了java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)设置的超时时间,每次都会设置为:任务截止时间减去当前时间,如下图源码所示:

正确做法:使用CompletableFuture

使用CompletableFuture.allOf(......).get(15L, TimeUnit.SECONDS),也能设置总任务超时时间。

代码语言:javascript
复制
package com.renzhikeji.demo;

import java.util.concurrent.*;
import java.util.function.Supplier;

/**
 * @author 认知科技技术团队
 * 微信公众号:认知科技技术团队
 */
public class JdkDemo {
    private static final ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10, 100, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), Thread::new, new ThreadPoolExecutor.AbortPolicy() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            System.out.println("rejectedExecution");
            super.rejectedExecution(r, e);
        }
    });

    public static void main(String[] args) {


        Supplier<Integer> callable1 = () -> {
            System.out.println(Thread.currentThread());
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 1;
        };
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(callable1, poolExecutor);

        Supplier<Integer> callable2 = () -> {
            System.out.println(Thread.currentThread());
            try {
                TimeUnit.SECONDS.sleep(15);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 1;
        };
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(callable2, poolExecutor);


        Supplier<Integer> callable3 = () -> {
            System.out.println(Thread.currentThread());
            try {
                TimeUnit.SECONDS.sleep(20);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 1;
        };

        CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(callable3, poolExecutor);

        long start = System.currentTimeMillis();


        try {
            CompletableFuture.allOf(future2, future2, future3).get(15L, TimeUnit.SECONDS);
        } catch (Throwable e) {
            e.printStackTrace();
        }


        long d = System.currentTimeMillis() - start;
        System.out.println(d / 1000);

        try {
            Integer integer = future1.get();
            System.out.println("future1");
        } catch (Throwable e) {
            e.printStackTrace();

        }
        try {
            Integer integer = future2.get();
            System.out.println("future2");
        } catch (Throwable e) {
            e.printStackTrace();

        }
        try {
            Integer integer = future3.get();
            System.out.println("future3");
        } catch (Throwable e) {
            e.printStackTrace();

        }

    }

}

执行结果:任务1、任务2执行完了,任务3超时了。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-06-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 认知科技技术团队 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档