为了更好的控制多线程,JDK提供了一套线程框架Executor来帮助程序员有效的进行线程控制。Java.util.concurrent 包是专为 Java并发编程而设计的包,它下有很多编写好的工具:
脑图地址,感谢深入浅出 Java Concurrency ,此脑图在这篇基础上修改而来。其中有一个比较重要的线程工厂类:Executors。 Executors工厂会提供常用四类线程池的创建。
以前当我们每次执行一个任务时用new Thread,频繁创建对象会导致系统性能差,线程缺乏统一管理,可能无限制新建线程,相互之间竞争导致系统耗尽,并且缺乏定时任务,中断等功能。线程池可以有效的提高系统资源的使用率,同时避免过多资源竞争,重用存在的线程,减少对象创建。Java通过Executors创建不同功能的线程池,若Executors无法满足需求,我们也可以创建自定义的线程池。文章分为以下部分讲解:
1.newFixedThreadPool()方法
2. newSingleThreadExecutor()方法
3.newCachedThreadPool()方法
4.newScheduledThreadPool()方法
5.自定义线程池
在讲述之前,因为上面5条均会用到ThreadPoolExecutor这个类,所以我们先来看看ThreadPoolExecutor中线程执行任务的示意图,它的执行任务分两种情况:
1).Execute()方法会创建一个线程然后执行一个任务。
2).这个线程在执行完1之后,会反复从BlockingQueue队列中获取任务来执行。如果图中所示三个线程同时间在执行任务,还有任务进来则会放入BlockingQueue队列中暂缓起来等待线程空闲去执行。再者,这3个线程正在使用,队列也满了的话(有界队列的情况),还有任务进来,则会实行拒绝策略。(take()和poll()都是取头元素节点,区别在于前者会删除元素,后者不会)
创建一个固定数量的线程池,里面的线程数始终不变,当有一个线程提交时,若线程池中有空闲的线程,则立即执行。若没有,则会暂缓在一个阻塞队列LinkedBlockingQueue中等待有空闲的线程去执行。newFixedThreadPool()方法的源码如下(LinkedBlockingQueue的详解可以看博主的上一篇文章:【JDK并发包基础】并发容器详解):
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, //核心线程数
nThreads,//最大线程数
0L, //空闲时保持线程活着的时间
TimeUnit.MILLISECONDS,//上述时间的单位
new LinkedBlockingQueue<Runnable>());//线程池没空闲,则新任务放在这个队列里
}
现在我们思考一下:假如有Thread1、Thread2、Thread3、Thread4四条线程分别统计C、D、E、F四个盘的大小,所有线程都统计完毕交给Thread5线程去做汇总,应当如何实现?
第一种方式是用join()来做,不推荐:
推荐使用线程池的方式:
public static void main(String[] args) throws InterruptedException {
//用CountDownLatch实现,CountDownLatch传入4相当于一个计时器,一个await需要4次countDown才能唤醒
final CountDownLatch countDownLatch= new CountDownLatch(4);
Runnable run1= new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
System.out.println("统计C盘");
countDownLatch.countDown();//单任务,把计数器减1
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
Runnable run2= new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
System.out.println("统计D盘");
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
Runnable run3= new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
System.out.println("统计E盘");
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
Runnable run4= new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
System.out.println("统计F盘");
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//创建固定线程的线程池
ExecutorService service= Executors.newFixedThreadPool(4);
service.submit(run1);
service.submit(run2);
service.submit(run3);
service.submit(run4);
// new Thread(run1).start();
// new Thread(run2).start();
// new Thread(run3).start();
// new Thread(run4).start();
countDownLatch.await();//主线程,即第5线程等待
System.out.println("合计C,D,E,F");
service.shutdown();
}
运行结果如下,统计前四个盘大小可以没有顺序,但合计始终在最后:
创建只有一个线程的线程池,若线程池中有空闲的线程,则立即执行。若没有,则会暂缓在一个阻塞队列LinkedBlockingQueue中等待有空闲的线程去执行,它保证所有任务按照提交顺序执行。我们来看看newSingleThreadExecutor方法的源码:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService//先不用关注这个
(new ThreadPoolExecutor(1, //核心线程数
1,//最大线程数
0L,//空闲时保持线程活着的时间
TimeUnit.MILLISECONDS,//上面时间的单位
new LinkedBlockingQueue<Runnable>()));//当线程池没有空闲线程,就放在这个队列里
}
应用场景:这个线程池会在仅有的一个线程发生异常时,重新启动一个线程来替代原来的线程执行下去。
创建一个可根据实际情况调整线程个数的线程池,不限制线程数量。若有任务,则创建线程。若无任务,则不创建线程,并且每一个空闲的线程会在60秒后自动回收。我们来看看源码:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0,//核心线程数,0表示初始化不创建线程
Integer.MAX_VALUE,//int的最大值,表示不限制线程池容量
60L,//缓存线程60秒
TimeUnit.SECONDS,//单位
new SynchronousQueue<Runnable>());
}
源码中的SynchronousQueue这个没有容量的队列一创建,内部就使用take()方法阻塞着,当有一个线程来了直接就执行。
创建一个大小无限的线程池,此线程池支持定时以及周期性执行任务的需求。它的源码如下:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor//注意这里继承了ThreadPoolExecutor
implements ScheduledExecutorService {
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize,//核心线程数,传入
Integer.MAX_VALUE,//int的最大值,表示不限制线程池容量
0, //表示没有延迟
TimeUnit.NANOSECONDS,//单位是纳秒
new DelayedWorkQueue());
}
}
源码中的DelayedWorkQueue是带有延迟时间的一个队列,其中元素只有当指定时间到了,才能够从队列中获取元素,可以做定时的功能。
创建一个任务,等3秒初始化后每隔1秒打印一句话:
public class ScheduledThread {
public static void main(String args[]) throws Exception {
Temp command = new Temp();
//创建一个实现定时器的线程池
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
//command表示具体的任务对象,第一个数字表示初始化的时间,第二个数字表示轮询的时间
ScheduledFuture<?> scheduleTask = scheduler.scheduleWithFixedDelay(command, 3, 1, TimeUnit.SECONDS);
}
}
class Temp extends Thread {
public void run() {
System.out.println("run");
}
}
这个类似于Java的Timer定时器,但项目中用Quartz,跟Spring整合的话,最好用@Scheduled注解。ref:Spring Schedule 任务调度实现
在上述Executors工厂类创建线程池时,它的创建线程方法内部实现均用了ThreadPoolExecutor这个类,ThreadPoolExecutor可以实现自定义线程池,它的构造方法如下:
public ThreadPoolExecutor(int corePoolSize,//核心线程数
int maximumPoolSize,//最大线程数
long keepAliveTime,//线程保持多久
TimeUnit unit,//单位
BlockingQueue<Runnable> workQueue,//线程池功能
ThreadFactory threadFactory,//先不关注这个
RejectedExecutionHandler handler)//拒绝策略,比如超过最大线程数了,可以告诉客户服务器繁忙
{...}
这个构造方法对于BlockingQueue队列是什么类型比较关键,它关乎这个自定义线程池的功能。
1.使用有界队列ArrayBlockingQueue时,实际线程数小于corePoolSize时,则创建线程。若大于corePoolSize时,则任务会加入BlockingQueue队列中,若队列已满,则在实际线程总数不大于maximumPoolSize时,创建新线程。若还大于maximumPoolSize,则执行拒绝策略,或者自定义的其他方式。
2.使用无界队列LinkedBlockingQueue时,缓冲队列,当实际线程超过corePoolSize核心线程数后放置等待的线程,最后等系统空闲了在这个队列里取,maximumPoolSize参数在这里就没有作用了。因为它是无界队列,所以除非系统资源耗尽,否则不会出现任务入队失败的情况。比如创建任务的速度和处理速度差异很大,无界队列会保持快速增长,直到系统内存耗尽。
有界队列和无界队列实例如下:
public class ThreadPoolExecutorDemo implements Runnable{
private static AtomicInteger count = new AtomicInteger(0);
@Override
public void run() {
try {
int temp = count.incrementAndGet();
System.out.println("任务" + temp);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception{
BlockingQueue<Runnable> queue =
new LinkedBlockingQueue<Runnable>();
//new ArrayBlockingQueue<Runnable>(10);
ExecutorService executor = new ThreadPoolExecutor(
5, //corePoolSize
10, //使用无界队列LinkedBlockingQueue时,maximumPoolSize这个参数值不起作用
120L, //2分钟
TimeUnit.SECONDS,
queue);
for(int i = 0 ; i < 15; i++){//提交15个任务
executor.execute(new ThreadPoolExecutorDemo());
}
Thread.sleep(1000);
System.out.println("queue size:" + queue.size());
executor.shutdown();
}
}
用LinkedBlockingQueue无界队列执行后结果是每过一段时间5个任务一执行:
对于拒绝策略,即当任务数量超过了系统实际承载能力时该如何处理呢?JDK提供了几种实现策略:
AbortPolicy:直接抛出异常来阻止系统正常工作。
CallerRunsPolicy:只要线程池未关闭,会把丢弃的任务先执行。
DiscardOledestPolicy:丢弃最老的一个请求,尝试再次提交当前任务
DiscardPolicy:丢弃无法处理的任务,不给于任何处理。
这四种策略个人觉得都不太好,我们可以实现一个自定义策略,在这里实现RejectedExecutionHandler接口就好了:
public class MyThreadPoolExecutor {
public static void main(String[] args) {
ThreadPoolExecutor pool = new ThreadPoolExecutor(
1, //coreSize
2, //MaxSize
60, //60
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(3) //指定一种队列 (有界队列)
//new LinkedBlockingQueue<Runnable>()
, new MyRejected()
//, new DiscardOldestPolicy()//直接抛出异常
);
MyTask mt1 = new MyTask(1, "任务1");//第一个任务会直接执行
MyTask mt2 = new MyTask(2, "任务2");//第二个任务会放入队列里,等第一个任务执行完以后才执行
MyTask mt3 = new MyTask(3, "任务3");//因为队列里有三个容量,所以任务3也会放入队列里
MyTask mt4 = new MyTask(4, "任务4");//因为队列里有三个容量,所以任务4也会放入队列里
MyTask mt5 = new MyTask(5, "任务5");//假如有5个任务,任务1和5同时执行,任务234放在队列里
MyTask mt6 = new MyTask(6, "任务6");//队列满了,线程池的最大线程数也超过了,则会实行拒绝策略
pool.execute(mt1);
pool.execute(mt2);
pool.execute(mt3);
pool.execute(mt4);
pool.execute(mt5);
pool.execute(mt6);
pool.shutdown();
}
}
class MyRejected implements RejectedExecutionHandler{
@Override
//传入当前任务对象和当前线程池对象
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//1.可以做一些处理,比如用http再创建请求给传数据的客户端,让它重新发送任务。高峰期的时候,系统已经超负荷了,不建议再发送请求
//2.只是记录日志:id及相关重要的信息,暂缓到磁盘上,在不是高峰期的时候跑一些定时的job解析日志,把没处理的任务再处理一遍或者批处理下,一般用这个
System.out.println("自定义处理..");
System.out.println("当前被拒绝任务为:" + r.toString());
}
}
class MyTask implements Runnable {
private int taskId;
private String taskName;
public MyTask(int taskId, String taskName){this.taskId = taskId;this.taskName = taskName;}
public int getTaskId() {return taskId;}
public void setTaskId(int taskId) {this.taskId = taskId;}
public String getTaskName() {return taskName;}
public void setTaskName(String taskName) {this.taskName = taskName;}
@Override
public void run() {
try {
System.out.println("run taskId =" + this.taskId);
Thread.sleep(3000);
//System.out.println("end taskId =" + this.taskId);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public String toString(){
return Integer.toString(this.taskId);
}
}
运行结果如下:
到这里已经介绍完了Java并发包下的线程池,博主是个普通的程序猿,水平有限,文章难免有错误,欢迎牺牲自己宝贵时间的读者,就本文内容直抒己见。
系列:
【JDK并发包基础】线程池详解
【JDK并发包基础】并发容器详解
【JDK并发包基础】工具类详解
【JDK并发基础】Java内存模型详解