上篇文章 13分钟聊聊并发包中常用同步组件并手写一个自定义同步组件 聊到并发包中常用的同步组件,并且还手把手实现了自定义的同步组件
本篇文章来聊聊并发包下的另一个核心-线程池
阅读本文大概12分钟
通读本篇文章前先来看看几个问题,看看你是否以及理解线程池
线程的创建、销毁都会带来一定的开销
如果当我们需要使用到多线程时再去创建,使用完又去销毁,这样去使用不仅会拉长业务流程,还会增加创建、销毁线程的开销
于是有了池化技术的思想,将线程提前创建出来,放在一个池子(容器)中进行管理
当需要使用时,从池子里拿取一个线程来执行任务,执行完毕后再放回池子
不仅是线程有池化的思想,连接也有池化的思想,也就是连接池
池化技术不仅能复用资源、提高响应,还方便管理
Executor框架是什么?
可以暂时把Executor看成线程池的抽象,它定义如何去执行任务
public interface Executor {
void execute(Runnable command);
}
Executor
将工作任务与线程池进行分离解耦
工作任务被分为两种:无返回结果的**Runnable
**和有返回结果的**Callable
**
在线程池中允许执行这两种任务,其中它们都是函数式接口,可以使用lambda表达式来实现
有的同学可能会有疑问,上文Executor
框架定义的执行方法不是只允许传入Runnable
任务吗?
那Callable
任务调用哪个方法来执行呢?
Future
接口用来定义获取异步任务的结果,它的实现类常是FutureTask
FutureTask
实现Runnable
的同时,还用字段存储Callable
,在其实现Runnable
时实际上会去执行Callable
任务
线程池在执行**Callable
**任务时,会将使用**FutureTask
**将其封装成**Runnable
**执行(具体源码我们后面再聊),因此Executor
的执行方法入参只有Runnable
FutureTask
****相当于适配器,将****Callable
****转换为****Runnable
******再进行执行**
Executor 定义线程池,而它的重要实现是ThreadPoolExecutor
在ThreadPoolExecutor
的基础上,还有个做定时的线程池ScheduledThreadPoolExecutor
ThreadPoolExecutor
主要有七个重要的参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
拒绝策略 | 作用 |
---|---|
AbortPolicy 默认 | 抛出异常 |
CallerRunsPolicy | 调用线程来执行任务 |
DiscardPolicy | 不处理,丢弃 |
DiscardOldestPolicy | 丢弃队列中最近一个任务,并立即执行当前任务 |
线程池中除了构造时的核心参数外,还使用内部类**Worker
**来封装线程和任务,并使用HashSet容器**workes
**工作队列存储工作线程worker
由于篇幅原因,阻塞队列本篇文章不过多叙述,具体内容在另一篇文章:10分钟从实现和使用场景聊聊并发包下的阻塞队列
为了清晰的理解线程池实现原理,我们先用流程图和总结概述原理,最后来看源码实现
注意: 核心线程完全启动后,即使核心线程空闲,也会先将任务加入队列,核心线程再从队列取任务进行消费
如果生产任务速度大于消费任务速度,那么就会触发后续的使用最大线程/拒绝策略
线程池有两种提交方式execute和submit,其中submit会封装成RunnableFuture最终都来执行execute
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
execute
中实现线程池的整个运行流程
public void execute(Runnable command) {
//任务为空直接抛出空指针异常
if (command == null)
throw new NullPointerException();
//ctl是一个整型原子状态,包含workerCount工作线程数量 和 runState是否运行两个状态
int c = ctl.get();
//1.如果工作线程数 小于 核心线程数 addWorker创建工作线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.工作线程数 大于等于 核心线程数时
// 如果 正在运行 尝试将 任务加入队列
if (isRunning(c) && workQueue.offer(command)) {
//任务加入队列成功 检查是否运行
int recheck = ctl.get();
//不在运行 并且 删除任务成功 执行拒绝策略 否则查看工作线程为0就创建线程
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3.任务加入队列失败,尝试去创建非核心线程,成功则结束
else if (!addWorker(command, false))
// 4.失败则执行拒绝策略
reject(command);
}
addWorker
用于创建线程加入工作队列并执行任务
第二个参数用来判断是不是创建核心线程,当创建核心线程时为true,创建非核心线程时为false
private boolean addWorker(Runnable firstTask, boolean core) {
//方便跳出双层循环
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 检查状态
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//工作线程数已满 返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS自增工作线程数量 成功跳出双重循环
if (compareAndIncrementWorkerCount(c))
break retry;
//CAS失败 重新读取状态 内循环
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//来到这里说明已经自增工作线程数量 准备创建线程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//创建worker 通过线程工厂创建线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//添加线程
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
//标记线程添加完
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//执行线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
addWorker
******中会CAS自增工作线程数量,创建线程再加锁,将线程加入工作队列workes(hashset),解锁后开启该线程去执行任务**
worker中实现Runnable
的是runWorker
方法,在启动线程后会不停的执行任务,任务执行完就去获取任务执行
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//循环执行任务 getTask获取任务
while (task != null || (task = getTask获取任务()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//执行前 钩子方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//执行后钩子方法
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
在执行前后预留两个钩子空方法,留给子类来扩展,后文处理线程池异常也会用到
线程池中是不是越多线程就越好呢?
首先,我们要明白创建线程是有开销的,程序计数器、虚拟机栈、本地方法栈都是线程私有的空间
并且线程在申请空间时,是通过CAS申请年轻代的Eden区中一块内存(因为可能存在多线程同时申请所以要CAS)
线程太多可能导致Eden空间被使用太多导致young gc,并且线程上下文切换也需要开销
因此,线程池中线程不是越多越好,行业内分为两种大概方案
针对CPU密集型,线程池设置最大线程数量为CPU核心数量+1,避免上下文切换,提高吞吐量,多留一个线程兜底
针对IO密集型,线程池设置最大线程数量为2倍CPU核心数量,由于IO需要等待,为了避免CPU空闲就多一些线程
具体业务场景需要具体分析,然后加上大量测试才能得到最合理的配置
Executor框架通过静态工厂方法提供几种线程池,比如:Executors.newSingleThreadExecutor()
、Executors.newFixedThreadPool()
、Executors.newCachedThreadPool()
但由于业务场景的不同,最好还是自定义线程池;当理解线程池参数和实现原理后,查看它们的源码并不难,我们不过多叙述
线程池中如果出现异常会怎么样?
当我们使用Runnable
任务时,出现异常会直接抛出
threadPool.execute(() -> {
int i = 1;
int j = 0;
System.out.println(i / j);
});
面对这种情况,我们可以在Runnable任务中使用try-catch进行捕获
threadPool.execute(() -> {
try {
int i = 1;
int j = 0;
System.out.println(i / j);
} catch (Exception e) {
System.out.println(e);
}
});
实际操作的话用日志记录哈,不要打印到控制台
当我们使用Callable
任务时,使用submit方法会获取Future
Future<Integer> future = threadPool.submit(() -> {
int i = 1;
int j = 0;
return i / j;
});
如果不使用Future.get()
去获取返回值,那么异常就不会抛出,这是比较危险的
为什么会出现这样的情况呢?
前文说过执行submit时会将Callable
封装成FutureTask
执行
在其实现Runnable中,在执行Callable任务时,如果出现异常会封装在FutureTask中
public void run() {
//...其他略
try {
//执行call任务
result = c.call();
ran = true;
} catch (Throwable ex) {
//出现异常 封装到FutureTask
result = null;
ran = false;
setException(ex);
}
//..
}
等到执行get时,先阻塞、直到完成任务再来判断状态,如果状态不正常则抛出封装的异常
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
因此在处理Callable
任务时,可以对任务进行捕获也可以对get进行捕获
//捕获任务
Future<?> f = threadPool.submit(() -> {
try {
int i = 1;
int j = 0;
return i / j;
} catch (Exception e) {
System.out.println(e);
} finally {
return null;
}
});
//捕获get
Future<Integer> future = threadPool.submit(() -> {
int i = 1;
int j = 0;
return i / j;
});
try {
Integer integer = future.get();
} catch (Exception e) {
System.out.println(e);
}
还记得线程池的runWorker
吗?
它在循环中不停的获取阻塞队列中的任务执行,在执行前后预留钩子方法
继承**ThreadPoolExecutor
**来重写执行后的钩子方法,记录执行完是否发生异常,如果有异常则进行日志记录,作一层兜底方案
public class MyThreadPool extends ThreadPoolExecutor {
//...
@Override
protected void afterExecute(Runnable r, Throwable t) {
//Throwable为空 可能是submit提交 如果runnable为future 则捕获get
if (Objects.isNull(t) && r instanceof Future<?>) {
try {
Object res = ((Future<?>) r).get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
t = e;
}
}
if (Objects.nonNull(t)) {
System.out.println(Thread.currentThread().getName() + ": " + t.toString());
}
}
}
这样即使使用submit,忘记使用get时,异常也不会“消失”
创建线程时,可以设置未捕获异常**uncaughtException
**方法,当线程出现异常未捕获时调用,也可以打印日志作兜底
我们定义我们自己的线程工厂,以业务组group为单位,创建线程(方便出错排查)并设置uncaughtException
方法
public class MyThreadPoolFactory implements ThreadFactory {
private AtomicInteger threadNumber = new AtomicInteger(1);
private ThreadGroup group;
private String namePrefix = "";
public MyThreadPoolFactory(String group) {
this.group = new ThreadGroup(group);
namePrefix = group + "-thread-pool-";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println(t.getName() + ":" + e);
}
});
if (t.isDaemon()) {
t.setDaemon(false);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
关闭线程池的2种方法: shutdown(),shutdownNow()
它们的原理都是: 遍历工作队列wokers中的线程,逐个中断(调用线程的**interrupt
**方法) 无法响应中断的任务可能永远无法终止
shutdown 任务会被执行完
shutdownNow 任务不一定会执行完
通常使用shutdown,如果任务不一定要执行完可以使用shutdownNow
ScheduledThreadPoolExecutor
在ThreadPoolExecutor
的基础上提供定时执行的功能
它有两个定时的方法
scheduleAtFixedRate
以任务开始为周期起点,比如说一个任务执行要0.5s,每隔1s执行,相当于执行完任务过0.5s又开始执行任务
scheduledWithFixedDelay
以任务结束为周期起点,比如说一个任务执行要0.5s,每隔1s执行,相当于执行完任务过1s才开始执行任务
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(2);
//scheduleAtFixedRate 固定频率执行任务 周期起点为任务开始
scheduledThreadPoolExecutor.scheduleAtFixedRate(()->{
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("scheduleAtFixedRate 周期起点为任务开始");
//初始延迟:1s 周期:1s
},1,1, TimeUnit.SECONDS);
//scheduledWithFixedDelay 固定延迟执行任务,周期起点为任务结束
scheduledThreadPoolExecutor.scheduleWithFixedDelay(()->{
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("scheduledWithFixedDelay 周期起点为任务结束 ");
//初始延迟:1s 周期:1s
},1,1, TimeUnit.SECONDS);
定时线程池使用延迟队列充当阻塞队列实现的
延迟队列是一个延迟队列,它排序存储定时任务,时间越小越先执行
线程获取任务时,会从延迟队列中获取定时任务,如果时间已到就执行
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
//没有定时任务 等待
if (first == null)
available.await();
else {
//获取延迟时间
long delay = first.getDelay(NANOSECONDS);
//小于等于0 说明超时,拿出来执行
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
//当前线程是leader则等待对应的延迟时间,再进入循环取出任务执行
//不是leader则一直等待,直到被唤醒
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
这两个定时方法一个以任务开始为周期起点、另一个以任务结束为周期起点
获取定时任务的流程是相同的,只是它们构建的定时任务中延迟的时间不同
定时任务使用period
区别,为正数周期起点为任务开始,为负数时周期起点为任务结束
本篇文章围绕线程池,深入浅出的讲解池化技术,Executor,线程池的参数、配置、实现原理、处理异常、关闭等
使用池化技术能够节省频繁创建、关闭的开销,提升响应速度,方便管理,常应用于线程池、连接池等
Executor框架将工作任务与执行(线程池)解耦分离,工作任务分为无返回值的**Runnable
**和有返回值的**Callable
**
Executor实际只处理**Runnable
**任务,会将**Callable
**任务封装成**FutureTask
**适配**Runnable
**执行
线程池使用工作队列来管理线程,线程执行完任务会从阻塞队列取任务执行,当非核心线程空闲一定时间后会被关闭
线程池执行时,如果工作队列线程数量小于核心线程数,则创建线程来执行(相当预热)
如果工作队列线程数量大于核心线程数量,并且阻塞队列未满则放入阻塞队列
如果阻塞队列已满,还未达到最大线程数量则创建非核心线程执行任务
如果已达到最大线程数量则使用拒绝策略
配置参数CPU密集型为CPU核数+1;IO密集型为2倍CPU核数;具体配置需要测试
处理异常可以直接捕获任务,**Callable
**可以捕获get,也可以继承线程池实现**afterExecutor
**记录异常,还可以在创建线程时就设置处理未捕获异常方法
处理定时任务的线程池由延迟队列实现,时间越短的定时任务越先执行,线程会从延迟队列中获取定时任务(时间已到的情况),时间未到就等待
本篇文章被收入专栏 由点到线,由线到面,深入浅出构建Java并发编程知识体系,感兴趣的同学可以持续关注喔
本篇文章笔记以及案例被收入 gitee-StudyJava、 github-StudyJava 感兴趣的同学可以stat下持续关注喔~
案例地址:
Gitee-JavaConcurrentProgramming/src/main/java/D_ThreadPool
Github-JavaConcurrentProgramming/src/main/java/D_ThreadPool
有什么问题可以在评论区交流,如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~
关注菜菜,分享更多干货,公众号:菜菜的后端私房菜
我正在参与2024腾讯技术创作特训营最新征文,快来和我瓜分大奖!
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。