本文试图从三个角度阐述线程池:
(1)为什么要用线程池-why
(2)什么是线程池-what
(3)如何使用线程池-how
为什么要用线程池-why
进程
(1)进程实体:程序段、数据段、进程控制块(PCB)
(2)进程控制块:记录了进程的全部信息,是该进程存在的唯一标志,常驻内存。它包括进程状态,CPU寄存器值等。处理机在运行时,许多信息都放在寄存器中,当处理机被中断时,所有这些信息都必须保存在PCB中。
(3)进程的创建:申请空白PCB;为新进程分配资源;初始化PCB;将新进程插入就绪队列。
有了进程为什么有线程?
减少程序在并发执行时付出的时空开销,即上下文切换代价:对进程进行切换时,由于要保留当前进程的CPU环境、设置新选中进程的CPU环境(即切换上下文的PCB信息),因而必须耽误不少的处理机时间。
(1)线程自己几乎不拥有系统资源,可以访问隶属进程的资源;线程的切换仅需保存和设置少量寄存器;
(2)线程之间的通信更方便,同一进程下的线程共享全局变量、静态变量等数据,而进程之间的通信需要以进程间通信的方式(IPC)进行。
多线程的异步执行方式,虽然能够最大限度发挥多核计算机的计算能力,但是如果不加控制,反而会对系统造成负担。线程本身也要占用内存空间,大量的线程会占用内存资源并且可能会导致Outof Memory。即便没有这样的情况,大量的线程回收也会给GC带来很大的压力。
为了避免重复的创建线程,线程池的出现可以让线程进行复用。通俗点讲,当有工作来,就会向线程池拿一个线程,当工作完成后,并不是直接关闭线程,而是将这个线程归还给线程池供其他任务使用。
什么是线程池-what
抽象及框架
先从宏观把握线程池,来看一看线程池的面向对象的抽象方式:

1.Executor
它是"执行者"接口,它是来执行任务的。准确的说,Executor提供了execute()接口来执行已提交的 Runnable 任务的对象。Executor存在的目的是提供一种将任务的"提交"与"运行"分离开来的机制。
2.ExecutorService
ExecutorService继承于Executor。它是"执行者服务"接口,它是为"执行者接口Executor"服务而存在的;准确的说,ExecutorService提供了"将任务提交给执行者的接口(submit)","让执行者执行任务(invokeAll, invokeAny方法)"的接口等等。
3.AbstractExecutorService
AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。AbstractExecutorService存在的目的是为ExecutorService中的函数接口提供了默认实现。
4.ThreadPoolExecutor
ThreadPoolExecutor就是大名鼎鼎的"线程池"。它继承于AbstractExecutorService抽象类。
5.ScheduledExecutorService
ScheduledExecutorService是一个接口,它继承于ExecutorService。它相当于提供了"延时"和"周期执行"功能的ExecutorService。
6.ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor继承于ThreadPoolExecutor,并且实现了ScheduledExecutorService接口。它相当于提供了"延时"和"周期执行"功能的ScheduledExecutorService。
ScheduledThreadPoolExecutor类似于Timer,但是在高并发程序中,ScheduledThreadPoolExecutor的性能要优于Timer。
7.Executors
Executors是个静态工厂类。它通过静态工厂方法返回ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 等类的对象。
成员变量
// 阻塞队列。
private final BlockingQueue<Runnable> workQueue;
// 互斥锁
private final ReentrantLock mainLock = new ReentrantLock();
// 线程集合。一个Worker对应一个线程。
private final HashSet<Worker> workers = new HashSet<Worker>();
// “终止条件”,与“mainLock”绑定。
private final Condition termination = mainLock.newCondition();
// 线程池中线程数量曾经达到过的最大值。
private int largestPoolSize;
// 已完成任务数量
private long completedTaskCount;
// ThreadFactory对象,用于创建线程。
private volatile ThreadFactory threadFactory;
// 拒绝策略的处理句柄。
private volatile RejectedExecutionHandler handler;
// 保持线程存活时间。
private volatile long keepAliveTime;
// 核心线程是否允许关闭
private volatile boolean allowCoreThreadTimeOut;
// 核心池大小
private volatile int corePoolSize;
// 最大池大小
private volatile int maximumPoolSize;
1.workers
workers是HashSet<Work>类型,即它是一个Worker集合。而一个Worker对应一个线程,也就是说线程池通过workers包含了"一个线程集合"。当Worker对应的线程池启动时,它会执行线程池中的任务;当执行完一个任务后,它会从线程池的阻塞队列中取出一个阻塞的任务来继续运行。
2.workQueue
workQueue是BlockingQueue类型,即它是一个阻塞队列。当线程池中的线程数超过它的容量的时候,线程会进入阻塞队列进行阻塞等待。通过workQueue,线程池实现了阻塞功能。
3.mainLock
mainLock是互斥锁,通过mainLock实现了对线程池的互斥访问。
4.corePoolSize和maximumPoolSize
corePoolSize是"核心池大小",maximumPoolSize是"最大池大小"。它们的作用是调整"线程池中实际运行的线程的数量"。
例如,当新任务提交给线程池时(通过execute方法)。
(1)如果此时,线程池中运行的线程数量< corePoolSize,则创建新线程来处理请求。
(2)如果此时,线程池中运行的线程数量> corePoolSize,但是却< maximumPoolSize;则仅当阻塞队列满时才创建新线程。
(3)如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务。在大多数情况下,核心池大小和最大池大小的值是在创建线程池设置的;但是,也可以使用 setCorePoolSize(int)和setMaximumPoolSize(int) 进行动态更改。
5.poolSize
poolSize是当前线程池的实际大小,即线程池中任务的数量。
6.allowCoreThreadTimeOut和keepAliveTime
allowCoreThreadTimeOut表示是否允许"核心线程在空闲状态时,仍然能够存活";而keepAliveTime是当线程池处于空闲状态的时候,超过keepAliveTime时间之后,空闲的线程会被终止。如果allowCoreThreadTimeOut为true,那么核心线程也会超时关闭。
7.threadFactory
threadFactory是ThreadFactory对象。它是一个线程工厂类,"线程池通过ThreadFactory创建线程"。
8.handler
handler是RejectedExecutionHandler类型。它是"线程池拒绝策略"的句柄,也就是说"当某任务添加到线程池中,而线程池拒绝该任务时,线程池会通过handler进行相应的处理"。
逻辑图:

创建线程池
下面以newFixedThreadPool()介绍线程池的创建过程。
1.newFixedThreadPool()
newFixedThreadPool()在Executors.java中定义,源码如下:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}newFixedThreadPool(int nThreads)的作用是创建一个线程池,线程池的容量是nThreads。
newFixedThreadPool()在调用ThreadPoolExecutor()时,会传递一个LinkedBlockingQueue()对象,而LinkedBlockingQueue是单向链表实现的阻塞队列。在线程池中,就是通过该阻塞队列来实现"当线程池中任务数量超过允许的任务数量时,部分任务会阻塞等待"。
2.ThreadPoolExecutor()
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}该函数实际上是调用ThreadPoolExecutor的另外一个构造函数。
3.ThreadFactory
线程池中的ThreadFactory是一个线程工厂,线程池创建线程都是通过线程工厂对象(threadFactory)来完成的。threadFactory对象,是通过Executors.defaultThreadFactory()返回的。
4.RejectedExecutionHandler
handler是ThreadPoolExecutor中拒绝策略的处理句柄。所谓拒绝策略,是指将任务添加到线程池中时,线程池拒绝该任务所采取的相应策略。defaultHandler的定义如下:
private static finalRejectedExecutionHandler defaultHandler = new AbortPolicy()添加任务
1.execute()
execute()定义在ThreadPoolExecutor.java。execute()的作用是将任务添加到线程池中执行。它会分为3种情况进行处理:
(1)如果"线程池中任务数量" < "核心池大小"时,即线程池中少于corePoolSize个任务;此时就新建一个线程,并将该任务添加到线程中进行执行。
(2)如果"线程池中任务数量" >= "核心池大小",并且"线程池是允许状态";此时,则将任务添加到阻塞队列中阻塞等待。在该情况下,会再次确认"线程池的状态",如果"第2次读到的线程池状态"和"第1次读到的线程池状态"不同,则从阻塞队列中删除该任务。
(3)非以上两种情况。在这种情况下,尝试新建一个线程,并将该任务添加到线程中进行执行。如果执行失败,则通过reject()拒绝该任务。
2.addWorker()
addWorker(Runnable firstTask, boolean core) 的作用是将任务(firstTask)添加到线程池中,并启动该任务。
core为true的话,则以corePoolSize为界限,若"线程池中已有任务数量>=corePoolSize",则返回false;core为false的话,则以maximumPoolSize为界限,若"线程池中已有任务数量>=maximumPoolSize",则返回false。
addWorker()会先通过for循环不断尝试更新ctl状态,ctl记录了"线程池中任务数量和线程池状态"。
更新成功之后,再通过try模块来将任务添加到线程池中,并启动任务所在的线程。
3.submit()
submit()实际上也是通过调用execute()实现的
关闭线程池
ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:
shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务。
shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务。
状态:线程池状态
线程池的5种状态是:Running, SHUTDOWN,STOP, TIDYING, TERMINATED

1.RUNNING
(1)状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。
(2)状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态!
2.SHUTDOWN
(1)状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
(2)状态切换:调用线程池的shutdown()接口时,线程池由RUNNING ->SHUTDOWN。
3.STOP
(1)状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
(2)状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN)
4.TIDYING
(1)状态说明:当所有的任务已终止,ctl记录的"任务数量"为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
(2)状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN ->TIDYING。
当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。
5.TERMINATED
(1)状态说明:线程池彻底终止,就变成TERMINATED状态。
(2)状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING ->TERMINATED。
类型:任务队列
workQueue的类型为BlockingQueue<Runnable>,通常可以取下面三种类型:
(1)有界任务队列ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;
(2)无界任务队列LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
(3)直接提交队列SynchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
类型:线程池
由Executors线程池工具类提供。
(1)newFixedThreadPool
固定大小的线程池,可以指定线程池的大小,该线程池corePoolSize和maximumPoolSize相等,阻塞队列使用的是LinkedBlockingQueue,大小为整数最大值。该线程池中的线程数量始终不变,当有新任务提交时,线程池中有空闲线程则会立即执行,如果没有,则会暂存到阻塞队列。对于固定大小的线程池,不存在线程数量的变化。同时使用无界的LinkedBlockingQueue来存放执行的任务。当任务提交十分频繁的时候,LinkedBlockingQueue迅速增大,存在着耗尽系统资源的问题。而且在线程池空闲时,即线程池中没有可运行任务时,它也不会释放工作线程,还会占用一定的系统资源,需要shutdown。
public static ExecutorService newFixedThreadPool(int var0) {
return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}
public static ExecutorService newFixedThreadPool(int var0, ThreadFactory var1) {
return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var1);
}(2)newSingleThreadExecutor
单个线程线程池,只有一个线程的线程池,阻塞队列使用的是LinkedBlockingQueue,若有多余的任务提交到线程池中,则会被暂存到阻塞队列,待空闲时再去执行。按照先入先出的顺序执行任务。
public static ExecutorService newSingleThreadExecutor() {
return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory var0) {
return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var0));
}(3)newCachedThreadPool
缓存线程池,缓存的线程默认存活60秒。线程的核心池corePoolSize大小为0,核心池最大为Integer.MAX_VALUE,阻塞队列使用的是SynchronousQueue。是一个直接提交的阻塞队列,他总会迫使线程池增加新的线程去执行新的任务。在没有任务执行时,当线程的空闲时间超过keepAliveTime(60秒),则工作线程将会终止被回收,当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销。如果同时又大量任务被提交,而且任务执行的时间不是特别快,那么线程池便会新增出等量的线程池处理任务,这很可能会很快耗尽系统的资源。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());
}
public static ExecutorService newCachedThreadPool(ThreadFactory var0) {
return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue(), var0);
}(4)newScheduledThreadPool
定时线程池,该线程池可用于周期性地去执行任务,通常用于周期性的同步数据。scheduleAtFixedRate:是以固定的频率去执行任务,周期是指每次执行任务成功执行之间的间隔。schedultWithFixedDelay:是以固定的延时去执行任务,延时是指上一次执行成功之后和下一次开始执行的之前的时间。
通过构造方法直接初始化线程池实例:
[1] ThreadPoolExecutor:
(int, int,long, TimeUnit, BlockingQueue<Runnable>)[2] ThreadPoolExecutor:
(int, int,long, TimeUnit, BlockingQueue<Runnable>, ThreadFactory)[3] ThreadPoolExecutor:
(int, int, long, TimeUnit, BlockingQueue<Runnable>, RejectedExecutionHandler)[4] ThreadPoolExecutor:
(int, int, long, TimeUnit, BlockingQueue<Runnable>, ThreadFactory,RejectedExecutionHandler)参数说明:
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable>workQueue,
ThreadFactory,
RejectedExecutionHandler
类型:拒绝策略
线程池的拒绝策略,是指当任务添加到线程池中被拒绝,而采取的处理措施。
当任务添加到线程池中之所以被拒绝,可能是由于:第一,线程池异常关闭。第二,任务数量超过线程池的最大限制。
线程池共包括4种拒绝策略,它们分别是:AbortPolicy,CallerRunsPolicy, DiscardOldestPolicy和DiscardPolicy。
(1)AbortPolicy:丢弃任务并抛出RejectedExecutionException
(2)CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。
(3)DiscardOldestPolicy:丢弃队列中最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。
(4)DiscardPolicy:丢弃任务,不做任何处理。
线程池默认的处理策略是AbortPolicy!
如何使用线程池-how
线程池数量
线程池的大小决定着系统的性能,过大或者过小的线程池数量都无法发挥最优的系统性能。当然线程池的大小也不需要做的太过于精确,只需要避免过大和过小的情况。一般来说,确定线程池的大小需要考虑CPU的数量,内存大小,任务是计算密集型还是IO密集型等因素。
NCPU = CPU的数量
UCPU = 期望对CPU的使用率 0 ≤ UCPU ≤ 1
W/C = 等待时间与计算时间的比率
如果希望处理器达到理想的使用率,那么线程池的最优大小为:
线程池大小=NCPU *UCPU(1+W/C)
在Java中使用int ncpus = Runtime.getRuntime().availableProcessors();获取CPU的数量。
扩展线程池
ThreadPoolExecutor是可以拓展的,它提供了几个可以在子类中改写的方法:beforeExecute,afterExecute和terimated。在执行任务的线程中将调用beforeExecute和afterExecute,这些方法中还可以添加日志,计时,监视或统计收集的功能,还可以用来输出有用的调试信息,帮助系统诊断故障。
正确使用线程池
以下阿里编码规范里面说的一段话:
线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。说明:Executors各个方法的弊端:
(1)newFixedThreadPool和newSingleThreadExecutor:主要问题是堆积请求处理队列可能会耗费非常大的内存,甚至OOM。
(2)newCachedThreadPool和newScheduledThreadPool:主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。
通过ThreadPoolExecutor创建线程池需要注意:
(1)任务独立。如何任务依赖于其他任务,那么可能产生死锁。例如某个任务等待另一个任务的返回值或执行结果,那么除非线程池足够大,否则将发生线程饥饿死锁。
(2)合理配置阻塞时间过长的任务。如果任务阻塞时间过长,那么即使不出现死锁,线程池的性能也会变得很糟糕。在Java并发包里可阻塞方法都同时定义了限时方式和不限时方式。例如Thread.join, BlockingQueue.put,CountDownLatch.await等,如果任务超时,则标识任务失败,然后中止任务或者将任务放回队列以便随后执行,这样,无论任务的最终结果是否成功,这种办法都能够保证任务总能继续执行下去。
(3)设置合理的线程池大小。只需要避免过大或者过小的情况即可,上文的公式线程池大小=NCPU *UCPU(1+W/C)。
(4)选择合适的阻塞队列。newFixedThreadPool和newSingleThreadExecutor都使用了无界的阻塞队列,无界阻塞队列会有消耗很大的内存,如果使用了有界阻塞队列,它会规避内存占用过大的问题,但是当任务填满有界阻塞队列,新的任务该怎么办?在使用有界队列是,需要选择合适的拒绝策略,队列的大小和线程池的大小必须一起调节。对于非常大的或者无界的线程池,可以使用SynchronousQueue来避免任务排队,以直接将任务从生产者提交到工作者线程。
参考:
1.https://www.cnblogs.com/superfj/p/7544971.html