线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,合理的使用线程池对线程进行统一分配、调优和监控,有以下好处:重用存在的线程、可有效控制最大并发线程数
线程池简单用法如下
ExecutorService executorService = new ThreadPoolExecutor(3, 5,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10));
for (int i = 0; i < 10; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("thread id is: " + Thread.currentThread().getId());
}
});
}
executorService.shutdown();
一,ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
corePoolSize:线程池中的核心线程数,核心线程会一直存活,即使没有任务需要执行。
maximumPoolSize: 线程池中允许的最大线程数 核心线程数+临时线程数
keepAliveTime:临时线程空闲时的存活时间
workQueue:保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口。
我们直接看executorService.execute方法。
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
} else {
int c = this.ctl.get();//获取当前线程状态相关值
if (workerCountOf(c) < this.corePoolSize) { //1
if (this.addWorker(command, true)) {
return;
}
c = this.ctl.get();
}
if (isRunning(c) && this.workQueue.offer(command)) { //2
int recheck = this.ctl.get();
if (!isRunning(recheck) && this.remove(command)) {//3
this.reject(command);
} else if (workerCountOf(recheck) == 0) {//4
this.addWorker((Runnable)null, false);
}
} else if (!this.addWorker(command, false)) {//5
this.reject(command);
}
}
}
1.workerCountOf根据ctl的低29位,得到线程池的当前线程数,如果当前线程数小于核心线程数,那就直接添加任务addWork
2.如果上面addWork失败,则继续。先判断当前线程池处于RUNNING状态,同时把提交的任务成功放入阻塞队列workQueue
中,添加失败offer返回false,否则执行reject方法处理任务。
3.再次判断判断当前线程池状态,如果线程池没有RUNNING,且成功从阻塞队列中删除任务,则执行reject方法处理任务;
4.如果当前线程数等于0的极端情况下,可能出现一个任务刚被插入队列的同时,所有的线程都结束任务然后被销毁了,则添加一个非核心线程。
5.如果workQueue队列已满,尝试创建非核心线程处理任务.
这里面主要有两个行为,一个就是this.workQueue.offer(command) 添加任务队列,另一个就是this.addWorker(command,true)
来看下addWork方法源码
private boolean addWorker(Runnable firstTask, boolean core) {
int c = this.ctl.get();
label247:
//当前线程池不是SHUTDOWN或者不是STOP状态时,且firstTask==null workQueue不是空,则继续进行
while(!runStateAtLeast(c, SHUTDOWN) || !runStateAtLeast(c, STOP) && firstTask == null
&& !this.workQueue.isEmpty()) {
//2检查线程总数是否超过容量。
while(workerCountOf(c) <((core ? this.corePoolSize:this.maximumPoolSize)&COUNT_MASK)) {
if (this.compareAndIncrementWorkerCount(c)) {// 线程数加1
boolean workerStarted = false;
boolean workerAdded = false;
ThreadPoolExecutor.Worker w = null;
try {
//开始创建新的线程,添加任务firstTask
w = new ThreadPoolExecutor.Worker(firstTask);
Thread t = w.thread;
if (t != null) {
//开始加锁 做到了在最小的范围内加锁,尽量减少锁竞争
ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int c = this.ctl.get();
///检查线程状态,只有当线程池处于RUNNING,或者处于SHUTDOWN并且firstTask==null的时候,
if (isRunning(c) || runStateLessThan(c, STOP) && firstTask == null) {
if (t.getState() != State.NEW) {
throw new IllegalThreadStateException();
}
//workers是一个HashSet,添加我们新增的Worker
this.workers.add(w);
workerAdded = true;
//每次增加worker的时候,都会判断当前workers.size()是否大于largestPoolSize,
//如果大于,则将当前最大值赋予largestPoolSize.
int s = this.workers.size();
if (s > this.largestPoolSize) {
//记录workers历史以来的最大值,
this.largestPoolSize = s;
}
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();//开始工作
workerStarted = true;
}
}
} finally {
if (!workerStarted) {
this.addWorkerFailed(w);
}
}
return workerStarted;
}
c = this.ctl.get();
if (runStateAtLeast(c, 0)) {
continue label247;
}
}
return false;
}
return false;
}
关于线程池的状态,有5种,
addwork过程是,检查线程池状态和线程总数是否符合条件,符合的话就创建新的Worker,添加任务firstTask,并把Worker
添加到workers hashset中,最后开始启动Worker中的线程t.start(),源码中 this.thread = getThreadFactory().newThread(this);this是当前Runnable,那this.thread里面Runnable就是当前Worker,执行t.start()
就是执行当前Worker中run方法
Worker 继承AbstractQueuedSynchronizer 实现Runnable
public void run() {
ThreadPoolExecutor.this.runWorker(this);
}
final void runWorker(ThreadPoolExecutor.Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;//获取当前线程任务Runnable
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
while(task != null || (task = this.getTask()) != null) {
//当前线程枷锁
w.lock();
//如果(线程池的状态>=STOP或者(线程已中断并且线程状态>=STOP))并且当前线程没有被中断。
// 两种情况:
//1)如果当前线程池的状态是>=Stop的,并且当前线程没有被中断,那么就要执行中断。
//2)或者当前线程目前是已中断的状态并且线程池的状态也是>=Stop的(注意Thread.interrupted是会擦除中断标识符的),
//那么因为中断标识符已经被擦除了,那么!wt.isInterrupted()一定返回true,这个时候还是要将当前线程中断。
//第二次执行runStateAtLeast(ctl.get(), STOP)相当于一个二次检查。
if ((runStateAtLeast(this.ctl.get(), STOP) || Thread.interrupted() &&
runStateAtLeast(this.ctl.get(), STOP)) && !wt.isInterrupted()) {
wt.interrupt();//中断当前线程
}
try {
this.beforeExecute(wt, task);//前置操作,空方法,可以业务自己实现
try {
task.run();//执行run,也就是下面方法
//new Runnable() {
// @Override
// public void run() {
// System.out.println("thread id is: " + Thread.currentThread().getId());
// }
//}
this.afterExecute(task, (Throwable)null);//后置操作,空方法,可以业务自己实现
} catch (Throwable var14) {
this.afterExecute(task, var14);
throw var14;
}
} finally {
task = null;//最后将task置为null
++w.completedTasks;//已完成的任务计数器+1
w.unlock();//释放当前线程的独占锁
}
}
completedAbruptly = false;
} finally {
this.processWorkerExit(w, completedAbruptly);
}
}
runWorker是开启了一个线程,就一直循环执行getTask,知道task==null才结束。
来看下getTask源码
private Runnable getTask() {
boolean timedOut = false;
while(true) {
int c = this.ctl.get();
// 如果当前状态是>=SHOTDOWN状态&&(运行状态是STOP或者队列是空的).
// 1)如果线程池的状态是>=STOP状态,这个时候不再处理队列中的任务,并且减少worker记录数量,
//返回的任务为null,这个时候在runRWorker方法中会执行processWorkerExit进行worker的退出操作.
// 2)如果线程池的状态是>=SHUTDOWN并且workQueue为空,就说明处于SHOTdown以上的状态下,
//且没有任务在等待,那么也属于获取不到任务,getTask返回null.
if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || this.workQueue.isEmpty())) {
this.decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
//如果允许超时切线程数大于核心线程容量,则开启超时机制timed=true
boolean timed = this.allowCoreThreadTimeOut || wc > this.corePoolSize;
//如果当前线程数小于等于最大线程容量 且 不允许超时或者当前没有超时 或者当前线程数小于等于1 且当前任务队列不是空
if (wc <= this.maximumPoolSize && (!timed || !timedOut) || wc <= 1 && !this.workQueue.isEmpty()) {
try {
//判断是否允许超时,允许超时用poll设置超时时间,不允许就使用take依赖超时机制
Runnable r = timed ? (Runnable)this.workQueue.poll(this.keepAliveTime, TimeUnit.NANOSECONDS)
: (Runnable)this.workQueue.take();
if (r != null) {
return r;
}
timedOut = true;
} catch (InterruptedException var6) {
timedOut = false;
}
} else if (this.compareAndDecrementWorkerCount(c)) {
return null;
}
}
}
getTask主要就是从workQueue队列中不断的取Runnable任务。
继续看processWorkerExit方法源码
private void processWorkerExit(ThreadPoolExecutor.Worker w, boolean completedAbruptly) {
if (completedAbruptly) {
//任务完成,将工作线程数量-1
this.decrementWorkerCount();
}
ReentrantLock mainLock = this.mainLock;
mainLock.lock();
//
try {
this.completedTaskCount += w.completedTasks;
//从worker中移除任务
this.workers.remove(w);
} finally {
mainLock.unlock();
}
//尝试关闭线程池,但如果是正常运行状态,就不会关闭
this.tryTerminate();
int c = this.ctl.get();
if (runStateLessThan(c, 536870912)) {
if (!completedAbruptly) {
//如果允许核心线程超时并且当前队列里面还有任务,那就必须留一个线程.
int min = this.allowCoreThreadTimeOut ? 0 : this.corePoolSize;
if (min == 0 && !this.workQueue.isEmpty()) {
min = 1;
}
if (workerCountOf(c) >= min) {
return;
}
}
this.addWorker((Runnable)null, false);
}
}
1.如何复用线程?
回顾代码,好像也没有直接体现复用线程池的,Worker每次都是new。
我们先看下executorService.execute代码,里面有这样的逻辑,
如果当前线程数小于核心线程数,则执行this.addWorker(command,true)操作,这个方法主要就是创建Worker对象,并且运行Worker里面thread直接start,开启线程,执行firstTask任务。假如核心线程数是4,那就开启4个线程。
如果当前线程大于核心线程数,就走this.workQueue.offer(command)这一步,把任务添加到workQueue队列中。
我们看下runWorker方法,这个就是执行线程任务的主要方法,看下while(task !=null||(task =this.getTask())!=null)这个条件,当我们调用this.addWorker(command,true)方法是,task就是command 不等于null,那就执行内容,执行完成后finally{ task =null;++w.completedTasks;w.unlock();//}task=null ,那任务就应该完成,线程就要销毁。
但是我们还要注意while条件中有个getTask方法,里面就是从阻塞队列中获取Runnable任务,也就是workQueue取Runnable。workQueue有内容,则在当前Worker线程中执行Runnable,没有内容的话,就阻塞。此时线程就不会销毁。
2.如何并发?
线程池是如何做到高效并发的。
看整个线程池的工作流程,有以下几个需要特别关注的并发点.
线程池状态和工作线程数量的变更。这个由一个AtomicInteger变量 ctl来解决原子性问题。
向工作Worker容器workers中添加新的Worker的时候线程池加锁。
执行具体任务的时候,线程枷锁。
工作线程Worker从等待队列中取任务的时候。这个由工作队列本身来保证线程安全,比如LinkedBlockingQueue等。
3.非核心线程什么时候销毁?
boolean timed = this.allowCoreThreadTimeOut || wc > this.corePoolSize;
if (wc <= this.maximumPoolSize && (!timed || !timedOut) || wc <= 1 && !this.workQueue.isEmpty()) {
try {
Runnable r = timed ? (Runnable)this.workQueue.poll(this.keepAliveTime, TimeUnit.NANOSECONDS) : (Runnable)this.workQueue.take();
if (r != null) {
return r;
}
timedOut = true;
} catch (InterruptedException var6) {
timedOut = false;
}
}
看下这段代码,当wc >this.corePoolSize时,应该有产生非核心线程了,timed=true,Runnable r=(Runnable)this.workQueue.poll(this.keepAliveTime, TimeUnit.NANOSECONDS),在队列超过keepAliveTime时长后,返回null,直接跳出 while(task !=null||(task =this.getTask())!=null)代码,走processWorkerExit函数。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。