前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >JDK中线程池满后再放入队列

JDK中线程池满后再放入队列

作者头像
克虏伯
发布2020-07-21 10:56:31
1K0
发布2020-07-21 10:56:31
举报
文章被收录于专栏:软件开发-青出于蓝

    JDK中ThreadPoolExecutor有coreSize、maxSize,只有当线程数到coreSize且队列满后才会增加线程数到maxSize.

    想要达到的效果是线程数到maxSize后再放入队列。

方案一

    覆写ThreadPoolExecutor的execute()

List-1

代码语言:javascript
复制
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    //workerCountOf()获取线程数,线程worker数只是int类型二进制位的前几位
    int c = ctl.get();
    //1
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //2
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //3
    else if (!addWorker(command, false))
        reject(command);
}

线程数的表示,是用int类型的二进制的左边几位标示的,所以才需要调用workCountOf()方法来获取

  1. 如果线程数小于coreSize,那么增加worker线程
  2. 将任务放入到队列中,如果成功表示队列没有满
  3. 队列满了,此时尝试去增加worker线程数,让worker线程数达到maxSize

    如果我们自己定义一个CustomThreadPoolExecutor,然后覆写execute(),那么你会发现ctl、workerCountOf、addWorker都是private的,子类上根本访问不了,所以这个方案是不行的

方案二

    自己定义一个CustomThreadPoolExecutor,之后将JDK中ThreadPoolExecutor中的内容全部拷贝过来,之后再改写execute()的实现,但是这个成本很大。

    最重要的,连拒绝策略也不能使用JDK里面的了,因为如下List-2所示第二个参数是ThreadPoolExecutor

List-2

代码语言:javascript
复制
public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable var1, ThreadPoolExecutor var2);
}

    此方案不行

方案三

  1. 自定义queue,改写offer逻辑
  2. 覆写ThreadPoolExecutor的execute()

List-3

代码语言:javascript
复制
public class TaskQueue<R extends Runnable> extends ArrayBlockingQueue<Runnable> {
    private EagerThreadPoolExecutor executor;

    public TaskQueue(int size, boolean fair) {
        super(size, fair);
    }

    @Override
    public boolean offer(Runnable runnable) {
        if (executor == null) {
            throw new RejectedExecutionException("The task queue does not have executor!");
        }
        int currentPoolThreadSize = executor.getPoolSize();
        // have free worker. put task into queue to let the worker deal with task.
        //1
        if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
            return super.offer(runnable);
        }
        // return false to let executor create new worker.
        //2
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            return false;
        }
        // currentPoolThreadSize >= max
        //3
        return super.offer(runnable);
    }

    /**
     * retry offer task
     *
     * @param o task
     * @return offer success or not
     * @throws RejectedExecutionException if executor is terminated.
     */
    public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor is shutdown!");
        }
        return super.offer(o, timeout, unit);
    }

    public void setExecutor(EagerThreadPoolExecutor executor){
        this.executor = executor;
    }
}

TaskQueue的offer方法是,放入队列时被调用

  1. 如果当前提交的task数少于线程池中线程是数量,那么直接调用父类的offer,将task放入队列,不新建线程,因此此时肯定有空闲的线程
  2. 此时线程池中没有空闲的线程,而且线程数量少于设置的maxSize,此时返回false,让线程池去创建新的线程
  3. 此时线程数量大于等于maxSize,将task放入任务队列中

EagerThreadPoolExecutor继承ThreadPoolExecutor,改写execute()的实现:

List-4

代码语言:javascript
复制
public class EagerThreadPoolExecutor extends ThreadPoolExecutor {
    private final AtomicInteger submittedTaskCount = new AtomicInteger(0);

    public EagerThreadPoolExecutor(int coreSize, int maxSize, long l, TimeUnit timeUnit, BlockingQueue<Runnable> blockingQueue, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
        super(coreSize, maxSize, l, timeUnit, blockingQueue, threadFactory, rejectedExecutionHandler);
    }

    @Override
    protected void beforeExecute(Thread thread, Runnable runnable) {
        //可以结合afterExecute统计执行耗时
    }

    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        // do not increment in method beforeExecute!
        //1
        submittedTaskCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // retry to offer the task into queue.
            final TaskQueue queue = (TaskQueue) super.getQueue();
            try {
                //2
                if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.", rx);
                }
            } catch (InterruptedException x) {
                submittedTaskCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } catch (Throwable t) {
            // decrease any way
            //3
            submittedTaskCount.decrementAndGet();
            throw t;
        }
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable throwable) {
        //ThreadPoolExecutor的勾子方法,在task执行完后需要将池中已提交的任务数 - 1
        //afterExecute和beforeExecute是在runWorker中调用,即使有异常,也不会抛出RejectedExecutionException异常
        submittedTaskCount.decrementAndGet();
        if (throwable!=null) {
            LOG.error("线程池中线程执行出错", throwable);
        }
    }

    public int getSubmittedTaskCount() {
        return submittedTaskCount.get();
    }
}
  1. execute()开始时,将提交的任务数加1,之后调用父类的execute()方法,如List-1中,当线程数达到coreSize后,就会调用queue.offer(),即List-3中的offer(),我们会判断线程数是否少于maxSize,如果少于那么返回false,之后ThreadPoolExecutor.execute()方法会去新增线程
  2. 2处如果被拒绝了,说明队列满了而且线程数达到了maxSize,此时我们再重试一次,将task放入队列中,为什么还要重试一次?List-3中offer中做了一些操作,有可能这期间队列就有空了,所以要重试下。
  3. 3处捕获到任何的异常,之后将task数减去1,3处的Throwable是捕获不到2处抛出的RejectedExecutionException的

    为什么afterExecute()方法中还要将task数减去1呢?

ThreadPoolExecutor中,beforeExecute()和afterExecute()是在runWorker的run()中被调用的,分别在Runnable.run()的前后被调用,而且线程池中抛出异常,在线程池外面是捕获不到的,所以外面需要的afterExecute()中将task数减去1

    改进:我们可以将List-4中使用的AtomicInteger改为JDK8的LongAddr以提高性能

Reference

  1. https://github.com/apache/dubbo/blob/fb5761683729f63c715e39c78a8b7b75278050c9/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/EagerThreadPoolExecutor.java
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 方案一
  • 方案二
  • 方案三
  • Reference
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档