前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java之线程池源码浅析

Java之线程池源码浅析

原创
作者头像
笔头
修改2022-03-04 10:26:47
2590
修改2022-03-04 10:26:47
举报
文章被收录于专栏:Android记忆

一、前言

线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,合理的使用线程池对线程进行统一分配、调优和监控,有以下好处:重用存在的线程、可有效控制最大并发线程数

二、怎么做

线程池简单用法如下

代码语言:javascript
复制
ExecutorService executorService = new ThreadPoolExecutor(3, 5,
        60L, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(10));
for (int i = 0; i < 10; i++) {
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            System.out.println("thread id is: " + Thread.currentThread().getId());
        }
    });
}
executorService.shutdown();

一,ThreadPoolExecutor

代码语言:javascript
复制
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

corePoolSize:线程池中的核心线程数,核心线程会一直存活,即使没有任务需要执行。

maximumPoolSize: 线程池中允许的最大线程数 核心线程数+临时线程数

keepAliveTime:临时线程空闲时的存活时间

workQueue:保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口。

我们直接看executorService.execute方法。

代码语言:javascript
复制
public void execute(Runnable command) {
    if (command == null) {
        throw new NullPointerException();
    } else {
        int c = this.ctl.get();//获取当前线程状态相关值
        if (workerCountOf(c) < this.corePoolSize) {  //1
            if (this.addWorker(command, true)) {
                return;
            }

            c = this.ctl.get();
        }

        if (isRunning(c) && this.workQueue.offer(command)) { //2
            int recheck = this.ctl.get();
            if (!isRunning(recheck) && this.remove(command)) {//3
                this.reject(command);
            } else if (workerCountOf(recheck) == 0) {//4
                this.addWorker((Runnable)null, false);
            }
        } else if (!this.addWorker(command, false)) {//5
            this.reject(command);
        }

    }
}

1.workerCountOf根据ctl的低29位,得到线程池的当前线程数,如果当前线程数小于核心线程数,那就直接添加任务addWork

2.如果上面addWork失败,则继续。先判断当前线程池处于RUNNING状态,同时把提交的任务成功放入阻塞队列workQueue

中,添加失败offer返回false,否则执行reject方法处理任务。

3.再次判断判断当前线程池状态,如果线程池没有RUNNING,且成功从阻塞队列中删除任务,则执行reject方法处理任务;

4.如果当前线程数等于0的极端情况下,可能出现一个任务刚被插入队列的同时,所有的线程都结束任务然后被销毁了,则添加一个非核心线程。

5.如果workQueue队列已满,尝试创建非核心线程处理任务.

这里面主要有两个行为,一个就是this.workQueue.offer(command) 添加任务队列,另一个就是this.addWorker(command,true)

来看下addWork方法源码

代码语言:javascript
复制
private boolean addWorker(Runnable firstTask, boolean core) {
    int c = this.ctl.get();

    label247:
    //当前线程池不是SHUTDOWN或者不是STOP状态时,且firstTask==null workQueue不是空,则继续进行
    while(!runStateAtLeast(c, SHUTDOWN) || !runStateAtLeast(c, STOP) && firstTask == null 
    && !this.workQueue.isEmpty()) { 
        //2检查线程总数是否超过容量。
        while(workerCountOf(c) <((core ? this.corePoolSize:this.maximumPoolSize)&COUNT_MASK)) {
            if (this.compareAndIncrementWorkerCount(c)) {// 线程数加1
                boolean workerStarted = false;
                boolean workerAdded = false;
                ThreadPoolExecutor.Worker w = null;

                try {
                    //开始创建新的线程,添加任务firstTask
                    w = new ThreadPoolExecutor.Worker(firstTask);
                    Thread t = w.thread;
                    if (t != null) {
                        //开始加锁 做到了在最小的范围内加锁,尽量减少锁竞争
                        ReentrantLock mainLock = this.mainLock;
                        mainLock.lock();

                        try {
                            int c = this.ctl.get();
                            ///检查线程状态,只有当线程池处于RUNNING,或者处于SHUTDOWN并且firstTask==null的时候,
                            
                            if (isRunning(c) || runStateLessThan(c, STOP) && firstTask == null) {
                                if (t.getState() != State.NEW) {
                                    throw new IllegalThreadStateException();
                                }
                                //workers是一个HashSet,添加我们新增的Worker
                                this.workers.add(w);
                                workerAdded = true;
                                //每次增加worker的时候,都会判断当前workers.size()是否大于largestPoolSize,
                                //如果大于,则将当前最大值赋予largestPoolSize.
                                int s = this.workers.size();
                                if (s > this.largestPoolSize) {
                                    //记录workers历史以来的最大值,
                                    this.largestPoolSize = s;
                                }
                            }
                        } finally {
                            mainLock.unlock();
                        }

                        if (workerAdded) {
                            t.start();//开始工作
                            workerStarted = true;
                        }
                    }
                } finally {
                    if (!workerStarted) {
                        this.addWorkerFailed(w);
                    }

                }

                return workerStarted;
            }

            c = this.ctl.get();
            if (runStateAtLeast(c, 0)) {
                continue label247;
            }
        }

        return false;
    }

    return false;
}

关于线程池的状态,有5种,

  • RUNNING: 接收新的任务,并能继续处理 workQueue 中的任务
  • SHUTDOWN: 不再接收新的任务,不过能继续处理 workQueue 中的任务
  • STOP: 不再接收新的任务,也不再处理 workQueue 中的任务,并且会中断正在处理任务的线程
  • TIDYING: 所有的任务都完结了,并且线程数量(workCount)为 0 时即为此状态,进入此状态后会调用 terminated() 这个钩子方法进入 TERMINATED 状态
  • TERMINATED: 调用 terminated() 方法后即为此状态

addwork过程是,检查线程池状态和线程总数是否符合条件,符合的话就创建新的Worker,添加任务firstTask,并把Worker

添加到workers hashset中,最后开始启动Worker中的线程t.start(),源码中 this.thread = getThreadFactory().newThread(this);this是当前Runnable,那this.thread里面Runnable就是当前Worker,执行t.start()

就是执行当前Worker中run方法

Worker 继承AbstractQueuedSynchronizer 实现Runnable

代码语言:javascript
复制
public void run() {
    ThreadPoolExecutor.this.runWorker(this);
}

代码语言:javascript
复制
final void runWorker(ThreadPoolExecutor.Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;//获取当前线程任务Runnable
    w.firstTask = null;
    w.unlock();
    boolean completedAbruptly = true;

    try {
        while(task != null || (task = this.getTask()) != null) {
            //当前线程枷锁
            w.lock();
          //如果(线程池的状态>=STOP或者(线程已中断并且线程状态>=STOP))并且当前线程没有被中断。
          // 两种情况:
          //1)如果当前线程池的状态是>=Stop的,并且当前线程没有被中断,那么就要执行中断。
          //2)或者当前线程目前是已中断的状态并且线程池的状态也是>=Stop的(注意Thread.interrupted是会擦除中断标识符的),
          //那么因为中断标识符已经被擦除了,那么!wt.isInterrupted()一定返回true,这个时候还是要将当前线程中断。
          //第二次执行runStateAtLeast(ctl.get(), STOP)相当于一个二次检查。
            if ((runStateAtLeast(this.ctl.get(), STOP) || Thread.interrupted() && 
            runStateAtLeast(this.ctl.get(), STOP)) && !wt.isInterrupted()) {
                wt.interrupt();//中断当前线程
            }

            try {
                this.beforeExecute(wt, task);//前置操作,空方法,可以业务自己实现

                try {
                    task.run();//执行run,也就是下面方法
                    //new Runnable() {
                    //   @Override
                    //  public void run() {
                    //      System.out.println("thread id is: " + Thread.currentThread().getId());
                    //   }
                    //}
                    this.afterExecute(task, (Throwable)null);//后置操作,空方法,可以业务自己实现
                } catch (Throwable var14) {
                    this.afterExecute(task, var14);
                    throw var14;
                }
            } finally {
                task = null;//最后将task置为null
                ++w.completedTasks;//已完成的任务计数器+1
                w.unlock();//释放当前线程的独占锁
            }
        }

        completedAbruptly = false;
    } finally {
        this.processWorkerExit(w, completedAbruptly);
    }

}

runWorker是开启了一个线程,就一直循环执行getTask,知道task==null才结束。

来看下getTask源码

代码语言:javascript
复制
private Runnable getTask() {
    boolean timedOut = false;

    while(true) {
        int c = this.ctl.get();
        // 如果当前状态是>=SHOTDOWN状态&&(运行状态是STOP或者队列是空的).
        // 1)如果线程池的状态是>=STOP状态,这个时候不再处理队列中的任务,并且减少worker记录数量,
        //返回的任务为null,这个时候在runRWorker方法中会执行processWorkerExit进行worker的退出操作.
        // 2)如果线程池的状态是>=SHUTDOWN并且workQueue为空,就说明处于SHOTdown以上的状态下,
        //且没有任务在等待,那么也属于获取不到任务,getTask返回null.
        if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || this.workQueue.isEmpty())) {
            this.decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);
        //如果允许超时切线程数大于核心线程容量,则开启超时机制timed=true
        boolean timed = this.allowCoreThreadTimeOut || wc > this.corePoolSize;
        //如果当前线程数小于等于最大线程容量 且 不允许超时或者当前没有超时 或者当前线程数小于等于1 且当前任务队列不是空
        if (wc <= this.maximumPoolSize && (!timed || !timedOut) || wc <= 1 && !this.workQueue.isEmpty()) {
            try {
                //判断是否允许超时,允许超时用poll设置超时时间,不允许就使用take依赖超时机制
                Runnable r = timed ? (Runnable)this.workQueue.poll(this.keepAliveTime, TimeUnit.NANOSECONDS) 
                : (Runnable)this.workQueue.take();
                if (r != null) {
                    return r;
                }

                timedOut = true;
            } catch (InterruptedException var6) {
                timedOut = false;
            }
        } else if (this.compareAndDecrementWorkerCount(c)) {
            return null;
        }
    }
}

getTask主要就是从workQueue队列中不断的取Runnable任务。

继续看processWorkerExit方法源码

代码语言:javascript
复制
private void processWorkerExit(ThreadPoolExecutor.Worker w, boolean completedAbruptly) {
    if (completedAbruptly) {
        //任务完成,将工作线程数量-1
        this.decrementWorkerCount();
    }

    ReentrantLock mainLock = this.mainLock;
    mainLock.lock();

    //
    try {
        this.completedTaskCount += w.completedTasks;
        //从worker中移除任务
        this.workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    //尝试关闭线程池,但如果是正常运行状态,就不会关闭
    this.tryTerminate();
    int c = this.ctl.get();
    if (runStateLessThan(c, 536870912)) {
        if (!completedAbruptly) {
           //如果允许核心线程超时并且当前队列里面还有任务,那就必须留一个线程.
            int min = this.allowCoreThreadTimeOut ? 0 : this.corePoolSize;
            if (min == 0 && !this.workQueue.isEmpty()) {
                min = 1;
            }

            if (workerCountOf(c) >= min) {
                return;
            }
        }
        
        this.addWorker((Runnable)null, false);
    }

}

三、问题

1.如何复用线程?

回顾代码,好像也没有直接体现复用线程池的,Worker每次都是new。

我们先看下executorService.execute代码,里面有这样的逻辑,

如果当前线程数小于核心线程数,则执行this.addWorker(command,true)操作,这个方法主要就是创建Worker对象,并且运行Worker里面thread直接start,开启线程,执行firstTask任务。假如核心线程数是4,那就开启4个线程。

如果当前线程大于核心线程数,就走this.workQueue.offer(command)这一步,把任务添加到workQueue队列中。

我们看下runWorker方法,这个就是执行线程任务的主要方法,看下while(task !=null||(task =this.getTask())!=null)这个条件,当我们调用this.addWorker(command,true)方法是,task就是command 不等于null,那就执行内容,执行完成后finally{ task =null;++w.completedTasks;w.unlock();//}task=null ,那任务就应该完成,线程就要销毁。

但是我们还要注意while条件中有个getTask方法,里面就是从阻塞队列中获取Runnable任务,也就是workQueue取Runnable。workQueue有内容,则在当前Worker线程中执行Runnable,没有内容的话,就阻塞。此时线程就不会销毁。

2.如何并发?

线程池是如何做到高效并发的。

看整个线程池的工作流程,有以下几个需要特别关注的并发点.

线程池状态和工作线程数量的变更。这个由一个AtomicInteger变量 ctl来解决原子性问题。

向工作Worker容器workers中添加新的Worker的时候线程池加锁。

执行具体任务的时候,线程枷锁。

工作线程Worker从等待队列中取任务的时候。这个由工作队列本身来保证线程安全,比如LinkedBlockingQueue等。

3.非核心线程什么时候销毁?

代码语言:javascript
复制
boolean timed = this.allowCoreThreadTimeOut || wc > this.corePoolSize;
if (wc <= this.maximumPoolSize && (!timed || !timedOut) || wc <= 1 && !this.workQueue.isEmpty()) {
    try {
        Runnable r = timed ? (Runnable)this.workQueue.poll(this.keepAliveTime, TimeUnit.NANOSECONDS) : (Runnable)this.workQueue.take();
        if (r != null) {
            return r;
        }

        timedOut = true;
    } catch (InterruptedException var6) {
        timedOut = false;
    }
}

看下这段代码,当wc >this.corePoolSize时,应该有产生非核心线程了,timed=true,Runnable r=(Runnable)this.workQueue.poll(this.keepAliveTime, TimeUnit.NANOSECONDS),在队列超过keepAliveTime时长后,返回null,直接跳出 while(task !=null||(task =this.getTask())!=null)代码,走processWorkerExit函数。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、前言
  • 二、怎么做
  • 三、问题
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档