上篇文章讲了下线程的创建及一些常用的方法,但是在使用的时候,大多数是采用了线程池来管理线程的创建,运行,销毁等过程。本篇将着重讲线程池的基础内容,包括通过线程池创建线程,线程池的基本信息等。
❝本小节所有代码都是在
CreateThreadByPool
类上,该类还有一个内部类MyThread
实现了Runnable
接口。 ❞
首先先把基本的代码给写出来
public class CreateThreadByPool {
public static void main(String[] args) {
}
}
class MyThread implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " processing");
process();
System.out.println(Thread.currentThread().getName() + " end");
}
private void process() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return String.format("MyThread{%s}", Thread.currentThread().getName());
}
}
先来大概回顾一下,当我们想创建10个线程的时候的代码「普通方式」是怎样的
private static void createThreadByNormalWay() {
for (int i = 0; i < 10; i++) {
MyThread myThread = new MyThread();
Thread thread = new Thread(myThread);
thread.start();
}
}
在「能看到的代码」中,是使用了start()
自己直接开启了线程,但是如果用线程池方式来呢
第一种创建线程池的方法是通过Executors
类的静态方法来构建,通过这种方式总共可以创建「4种线程池」。
并且可以发现返回是ExecutorService
,所以还要接受返回值,最后通过execute
来启动线程
private static void createThreadByPool() {
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
MyThread myThread = new MyThread();
executorService.execute(myThread);
}
}
先不管底层是如何实现的,至少代码上是「把线程交给了线程池」来执行,这样能够保证线程能够统一管理。
简单的比喻就是前者是要你自己去找班长签到,后者是班长统一管理这整个班的签到。在main函数中调用看看普通方法和通过线程池创建的线程有什么区别
threadPool1
可以很明显的看到有以下几点区别
除了使用Executors.newFixedThreadPool()
创建线程池,还可以通过new ThreadPoolExecutor()
,这里可能有的小伙伴会迷糊了,怎么上面返回的类是ExecutorService
,现在返回的又是ThreadPoolExecutor
,其实两者是同一个东西。
可以看到ThreadExecutorPool
是继承了 AbstractExecutorService
,而后者是实现了ExecutorService
。通过该方法创建的线程池的代码如下
❝可以先这样运行体验下,至于说构造函数里面不同参数的含义,在后面的篇幅中会说到,到时候再返回来看即可。 ❞
private static void createThreadByThreadPoolExecutor() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5,5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
for (int i = 0; i < 10; i++) {
MyThread myThread = new MyThread();
executor.execute(myThread);
}
}
看下运行结果
输出结果没啥好讲的,但是如果细心的小伙伴在上一个gif就会发现,通过线程池来启动线程的方式,程序并没有退出,会一直运行。这是因为我们没有shutdown
线程池。
回过头来看看Executors.静态方法
这种方法来创建线程池的源码
可以看到其实更深一层还是使用了new ThreadPoolExecutor()
,只不过我们自己能定制的构造函数的参数变得极其少,这时候肯定有小伙伴疑问了,「那为什么不直接都用」new ThreadPoolExecutor()
「呢?」
❝「《阿里java开发手册》 嵩山版明确规定了两点,一是线程资源必须通过线程池提供,不允许自行显式创建线程;二是线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式去创建。」 ❞
着重看第二点强制通过ThreadPoolExecutor的方式来创建线程,原因在下面也有,「来看看FixedThreadPool和SingleThreadPool的源码」
image-20210629161632466
其它的不管,可以看到两者调用构造函数中的队列都是LinkedBlockingQueue
,这个队列是无边界的,所以有了允许请求长度为Integer.MAX_VALUE
,会堆积大量的「请求」 ,从而导致OOM。
「再来看看CachedThreadPool的源码」
注意这里构造函数的第二个参数是线程池最大线程数,它设置成了Integer.MAX_VALUE
,这就可能会创建大量的线程,从而导致OOM。
上面也可以看到,创建线程池最重要也是最应该使用的方法的是new ThreadPoolExecutor()
,接下来把重点放在ThreadPoolExecutor
这个类上面
这个是类中的所有的属性,接下来再看看构造函数
有4种,但是归根结底只有以下这一种构造函数,讲下这些参数的意义,然后大家就可以回头看下上一小节的例子。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//省略实现
}
corePoolSize
:核心线程数,大白话就是能够工作的线程数量maximumPoolSize
:最大线程数,就是这个线程池能容纳线程的数量keepAliveTime
:存活时间,当线程池中的线程数量大于核心线程数的时候,如果时候没有任务提交,核心线程池外的线程不会立即被销毁,而是会等待,直到等待的时间超过了这个字段才会被回收销毁unit
:存活时间的单位workQueue
:工作队列,就是在线程开始被调用前,就是存在这个队列中threadFactory
:线程工厂,执行程序创建新线程时使用的工厂handler
:拒绝策略,当达到线程边界和队列容量而采取的拒绝策略对于这个拒绝策略,简单说下,有四种实现。
❝实现
RejectedExecutionHandler
接口就能实现自己的拒绝策略 ❞
下面就来简单实现一个自己的拒绝策略,并且来看下上述类中属性的信息
首先需要一个「监控线程类」
class MonitorThread implements Runnable {
//注入一个线程池
private ThreadPoolExecutor executor;
public MonitorThread(ThreadPoolExecutor executor) {
this.executor = executor;
}
private boolean monitor = true;
public void stopMonitor() {
monitor = false;
}
@Override
public void run() {
//监控一直运行,每3s输出一次状态
while (monitor) {
//主要逻辑是监控线程池的状态
System.out.println(
String.format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s, rejectedExecutionHandler: %s",
this.executor.getPoolSize(),
this.executor.getCorePoolSize(),
this.executor.getActiveCount(),
this.executor.getCompletedTaskCount(),
this.executor.getTaskCount(),
this.executor.isShutdown(),
this.executor.isTerminated(),
this.executor.getRejectedExecutionHandler()));
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
同时实现「自定义的拒绝策略」
❝其实这还是没有对r处理,拒绝了就拒绝了,只是打印出来,但是并没有实质性地处理 ❞
class MyRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("task is rejected");
}
}
接下来就是「public类」TheradPoolInfo
,「注意工作线程采用的是上一小节的MyThread
类」
public class ThreadPoolInfo {
public static void main(String[] args) throws InterruptedException {
//新建了一个线程池,核心线程数是3,最大线程数是5,30s
//队列是ArrayBlockingQueue,并且大小边界是3,拒绝策略自定义输出一句话
ThreadPoolExecutor executor = new ThreadPoolExecutor(3,5, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3), new MyRejectedExecutionHandler());
//开启监控线程
MonitorThread monitorThread = new MonitorThread(executor);
new Thread(monitorThread).start();
//开启工作线程
for (int i = 0; i < 10; i++) {
executor.execute(new MyThread());
}
//关闭线程池和监控线程
Thread.sleep(12000);
executor.shutdown();
Thread.sleep(3000);
monitorThread.stopMonitor();
}
}
「预期结果:」 通过构造函数可以知道,预期是有3个核心线程执行任务,会拒绝2个线程,完成8个任务(最大线程数是5,队列长度是3,具体会在下一篇文章中讲)。
可以看到结果和预期的一样
「创作不易,如果对你有帮助,欢迎点赞,收藏和分享啦!」