Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >ThreadPoolExecutor 源码分析

ThreadPoolExecutor 源码分析

作者头像
itliusir
发布于 2020-01-15 02:14:15
发布于 2020-01-15 02:14:15
35100
代码可运行
举报
文章被收录于专栏:刘君君刘君君
运行总次数:0
代码可运行

摘要:

  1. ThreadPoolExecutor 线程池是如何实现的

TOP 带着问题看源码

  1. ThreadPoolExecutor 线程池是如何实现的

1. 基本介绍

前面文章的 Thread 我们也分析了,因为 Java 中的Thread 和 内核线程是 1 : 1 的,所以线程是一个重量级的对象,应该避免频繁创建和销毁,我们可以使用线程池来避免。

ThreadPoolExecutor 是 Java 实现的线程池,它并没有采取常见的池化资源的设计方法,而是采用的 生产者-消费者 模式。

上图的左边是线程池的核心体系,右边是 JDK 提供创建线程池的工具类。

Executor 接口

提供最基础的执行方法 execute(Runnable command)

ExecutorService 接口

基于 Executor 接口,新增了线程池的一些操作能力

AbstractExecutorService 抽象类

使用模板模式,丰富了一部分操作的细节流程

ForkJoinPool 实现类

jdk1.7 中新增的线程池类,适用于分治的场景

2. 成员变量 & 核心类分析

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 控制变量 前 3 位标示运行状态,后 29 位标识工作线程的数量
// 初始化为 RUNNING 状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 移位的段
private static final int COUNT_BITS = Integer.SIZE - 3;
// 后29位,标识容量
// ‭0001 1111 1111 1111 1111 1111 1111 1111‬
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// 下面是线程池状态
// 表示可以接受新任务,且可执行队列的任务
// 111 0 0000 ... 0000
private static final int RUNNING    = -1 << COUNT_BITS;
// 不接收新任务,但可以执行队列的任务
// 000 0 0000 ... 0000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 中断正在执行的,不再接收和执行队列的任务
// 001 0 0000 ... 0000
private static final int STOP       =  1 << COUNT_BITS;
// 半中止状态,所有任务都已中止且无工作线程,修改为这个状态,然后执行 terminated() 方法
// 010 0 0000 ... 0000
private static final int TIDYING    =  2 << COUNT_BITS;
// 中止状态,已经执行过 terminated() 方法
// 011 0 0000 ... 0000
private static final int TERMINATED =  3 << COUNT_BITS;
// 获取状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 获取工作线程的数量
private static int workerCountOf(int c)  { return c & CAPACITY; }
// ctl 的值
private static int ctlOf(int rs, int wc) { return rs | wc; }

3. 核心方法分析

3.1 普通任务提交

3.1.1 execute(Runnable command)

主要过程就是:

  1. 如果当前工作线程没有达到核心线程数量阈值,就直接添加一个核心工作线程
  2. 如果达到了核心线程数量阈值,就入任务队列,如果状态不正常,执行拒绝策略
  3. 如果队列满了,就创建非核心线程
  4. 如果创建非核心线程失败(达到了最大数量阈值、线程池状态不正常),执行拒绝策略
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public void execute(Runnable command) {
    // 校验是否为空
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 如果工作线程数小于核心数
    if (workerCountOf(c) < corePoolSize) {
        // 添加一个核心工作线程
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 如果线程池状态正常,并且达到了核心数量,就入队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 再次检查状态,如果不是运行状态就移除任务并执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 再次检查,如果工作线程数量是0,就创建一个
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 如果入队列失败,就尝试创建非核心工作线程
    else if (!addWorker(command, false))
        // 创建非核心线程失败,执行拒绝策略
        reject(command);
}
3.1.2 addWorker(Runnable firstTask, boolean core)

addWorker 方法主要作用就是创建一个工作线程,并加入到工作线程的集合中,然后启动。在此期间会进行状态和数量的校验。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 校验状态
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // 校验工作线程数量
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 数量+1 跳出循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 创建工作线程,把 firstTask 封装到 Worker 对象,然后把 Worker 对象传给 thread
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
				// 再次检查状态
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 加入到工作线程集合
                    workers.add(w);
                    // 目前集合的数量
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // 标记添加成功
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 添加成功就启动线程
                // 通过上面 new Worker 的分析,我们知道这里会调用 Worker对象的 run方法
                // run 方法里接着调用 runWorker(this)
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 没有启动成功,执行降级方法(从集合中清除掉、数量减少、)
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
3.1.3 runWorker(Worker w)

如果有第一个任务就先执行,之后从任务队列取任务执行。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
final void runWorker(Worker w) {
    // 获取当前工作线程
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // ???
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // task如果为空就取任务,如果任务也取不到就结束循环
        // getTask() 方法主要就是从任务队列中取任务
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // 检查状态
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 任务执行前扩展方法,例如:可以在执行前等待实现暂停的效果
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 任务执行后扩展方法,例如:可以在执行后释放一些资源
                    afterExecute(task, thrown);
                }
            } finally {
                // 置 null,重新从队列取
                task = null;
                // 增加完成数
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

3.2 异步任务提交

3.2.1 submit(Callable task)

submit 方法定义在模板类 AbstractExecutorService 中,然后把 task 封装为 FutureTask , 最后调用 execute 方法来提交任务

AbstractExecutorService#submit

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

我们上面分析 execute 方法知道其最终执行的地方还是调用的 task 的 run 方法,所以我们来分析 FutureTask 的 run 方法。

3.2.2 run()

主要是多了一个执行结果的记录

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public void run() {
    // 线程状态不为 NEW 或者 修改当前线程来运行当前任务失败,直接返回
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        // 再次校验线程状态
        if (c != null && state == NEW) {
            // 注意盯着这个运行结果变量
            V result;
            boolean ran;
            try {
                // 任务执行
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                // 执行异常就修改线程状态为 EXCEPTIONAL
                setException(ex);
            }
            if (ran)
                // 执行正常就修改线程的状态为 NORMAL
                set(result);
        }
    } finally {
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
3.2.3 get()

主要思路就是自旋等待线程执行完

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // 如果线程状态没完成,就进入等待队列
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    // 自旋
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        // 已完成就返回
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        // 快完成(异常),就等一会
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        // 第一次进来一般会走到这里,把当前线程构建一个等待节点
        else if (q == null)
            q = new WaitNode();
        // 第二次循环尝试把节点入队
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        // 如果有超时时间
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        // 如果发现入队失败(已经入队过了),就挂起当前线程
        else
            LockSupport.park(this);
    }
}

4. 总结

可以看到,线程池实际上是一个生产-消费模型的实现,其支持普通任务提交和异步任务提交(ps.. 其实叫异步并不是很合适,对于用户来说线程池本来就是异步的)。

知道了核心数量以及等待队列还有最大数量这些功能的实现,相信对如何更好的使用线程池会更有帮助。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020-01-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
ThreadPoolExecutor源码分析
ThreadPoolExecutor继承AbstractExecutorService,层级实现了ExecutorService,ExecutorService继承了Executor
冰枫
2018/04/16
1.1K6
JDK源码分析-ThreadPoolExecutor
ThreadPoolExecutor 是 JDK 中线程池的实现类,它的继承结构如下:
WriteOnRead
2019/08/30
4200
ThreadPoolExecutor源码学习
但点进去看newSingleThreadExecutor可以看到其会调用ThreadPoolExecutor里面的线程。因此有必要研究ThreadPoolExecutor。
路行的亚洲
2020/07/21
4240
细说线程池---高级篇
上一篇中已经讲了线程池的原理。这一次来说说源码执行过程。建议先看看细说线程池---入门篇 细说线程池---中级篇
田维常
2020/02/13
4940
细说线程池---高级篇
【每周三面】源码角度说说Java线程池
本文来源:http://yeming.me/2016/05/07/threadPool1/
趣学程序-shaofeer
2020/05/18
4520
【每周三面】源码角度说说Java线程池
线程池技术之:ThreadPoolExecutor 源码解析
java中的所说的线程池,一般都是围绕着 ThreadPoolExecutor 来展开的。其他的实现基本都是基于它,或者模仿它的。所以只要理解 ThreadPoolExecutor, 就相当于完全理解了线程池的精髓。
烂猪皮
2021/03/16
3550
线程池技术之:ThreadPoolExecutor 源码解析
从源码看JDK提供的线程池(ThreadPoolExecutor) 一丶什么是线程池二丶ThreadPoolExecutor的使用三丶从源码来看ThreadPoolExecutor
一丶什么是线程池 (1)博主在听到线程池三个字的时候第一个想法就是数据库连接池,回忆一下,我们在学JavaWeb的时候怎么理解数据库连接池的,数据库创建连接和关闭连接是一个比较耗费资源的事情,对于那些数量多且时间短暂的任务,会导致频繁获取和释放连接,这样使得处理事务的效率大大降低,多以我们创建一个连接池,里面放了指定数目的连接,当应用需要数据库连接的时候去里面获取,使用完毕后再放到连接池里,这样就避免了重复的获取连接和释放连接,至于要获取什么样的连接池我们可以根据应用的特征,设置参数来决定。 (2)线程池
MindMrWang
2018/04/19
9950
从源码看JDK提供的线程池(ThreadPoolExecutor)
		一丶什么是线程池二丶ThreadPoolExecutor的使用三丶从源码来看ThreadPoolExecutor
ThreadPoolExecutor运行原理
Java并发工具集(J.U.C)是开发中使用使用最多的功能之一,其主要的目的是简化Java并发程序的开发过程。其中使用最频繁的则要数线程池技术。还记得刚从事工作的时候,就参考《Thinking In Java》中的例子实现了在ExecutorService基础之上的文件并发处理程序,现在还在生产环境上稳定运行。本文主要分析J.U.C中线程池的执行过程和工作原理,作为自己学习的一点总结,以下的版本基于JDK8进行分析。
BUG弄潮儿
2021/02/03
1.2K0
ThreadPoolExecutor运行原理
【死磕JDK源码】ThreadPoolExecutor源码保姆级详解
线程池的状态用高3位表示,其中包括了符号位。五种状态的十进制值按从小到大依次排序为:
JavaEdge
2021/10/18
3320
Java中线程池ThreadPoolExecutor原理探究
线程池主要解决两个问题:一方面当执行大量异步任务时候线程池能够提供较好的性能,,这是因为使用线程池可以使每个任务的调用开销减少(因为线程池线程是可以复用的)。另一方面线程池提供了一种资源限制和管理的手段,比如当执行一系列任务时候对线程的管理,每个ThreadPoolExecutor也保留了一些基本的统计数据,比如当前线程池完成的任务数目。
加多
2018/09/06
7110
Java中线程池ThreadPoolExecutor原理探究
手写线程池,对照学习ThreadPoolExecutor线程池实现原理!
作者:小傅哥 博客:https://bugstack.cn Github:https://github.com/fuzhengwei/CodeGuide/wiki
huofo
2022/03/18
4010
手写线程池,对照学习ThreadPoolExecutor线程池实现原理!
把线程池的底ku扒个底朝天-源码解析
上一篇我们介绍了线程池的使用,这一篇我们接着分析下线程池的实现原理。首先从创建线程池的核心类ThreadPoolExecutor类说起。
码农飞哥
2021/08/18
3020
从源码的角度解析线程池运行原理
在讲解完线程池的构造参数和一些不常用的设置之后,有些同学还是想继续深入地了解线程池的原理,所以这篇文章科代表会带大家深入源码,从底层吃透线程池的运行原理。
纯洁的微笑
2019/06/14
5920
从源码的角度解析线程池运行原理
深入浅出线程池原理
由此可见BlockingQueue是一个继承与Queue的接口,而他的具体实现最常用的无非一下几种
闫同学
2023/11/16
1950
ThreadPoolExcutor源码分析
RUNNING -> SHUTDOWN:手动调用shutdown方法,或者ThreadPoolExecutor要被GC回收的时候调用finalize方法,finalize方法内部也会调用shutdown方法
袁新栋-jeff.yuan
2020/08/26
4030
ThreadPoolExcutor源码分析
【初识】-JUC·ThreadPoolExecutor 线程池
ThreadPoolExecutor算是JUC中最常用的类之一了。ThreadPoolExecutor,顾名思义,thread-pool-executor,硬翻译就是“线程-池-执行者”;java中,通过ThreadPoolExecutor可以很容易的创建一个线程池。但是我们为什么要使用线程池?呢?它能够带来什么样的优势呢?它又是怎么实现的呢?OK,带着这几个问题,我们来学习一下JAVA中的线程池技术。
磊叔的技术博客
2025/06/07
1170
【初识】-JUC·ThreadPoolExecutor 线程池
相关推荐
ThreadPoolExecutor源码分析
更多 >
交个朋友
加入腾讯云技术交流站
洞悉AI新动向 Get大咖技术交流群
加入HAI高性能应用服务器交流群
探索HAI应用新境界 共享实践心得
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验