前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >这些并发编程知识,一定要知道

这些并发编程知识,一定要知道

作者头像
程序大视界
发布2022-12-19 21:18:35
2420
发布2022-12-19 21:18:35
举报
文章被收录于专栏:程序大视界

01

前言

在Java里面,多线程、并发编程一直是程序员必须要掌握和重中之重,尤其在高并发、大数据量的互联网场景下。很多人说多线程和并发编程的内容很复杂、难学,这是因为多线程及并发编程的内容多,知识点很广泛且深入。想要短时间内掌握多线程和并发编程等知识,确实不是那么简单的。不过我们可以分节点和内容块一个一个击破,做到“分而治之”、“化繁为简”这样再庞大复杂的知识内容也可以简单的掌握,下面跟着节奏一起来学习交流一下吧。

02

线程定义

2.1 什么是线程?

操作系统在运行一个程序时会为其创建一个进程,操作系统分配资源的最小单元是进程,CPU运算调度的最小单位是线程,在一个进程里可以创建多个线程,这些线程都拥有各自的计数器、堆栈和局部变量等属性,并且能够访问共享的内存变量。

2.2 线程与进程的关系

一个进程里可以创建多个线程,线程各自运行。

03

Java内存模型

并发编程多个线程间的通信机制有:共享内存和消息传递。

共享内存的并发模型里,线程之间共享程序的公共状态,通过写-读内存中的公共状态进行隐式通信。在消息传递的并发模型里,线程之间没有公共状态,线程之间必须通过发送消息来显示进行通信。

同步是指程序中用于控制不同线程间操作发生相对顺序的机制,在共享内存并发模型里,同步是显示进行的。程序员必须显示指定某个方法或某段代码需要在线程之间互斥执行。在消息传递的并发模型里,由于消息的发送必须在消息的接收之前,因此同步是隐式进行的。

在Java中,所有实例域、静态域和数组元素都存储在堆内存中,堆内存在线程之间共享。局部变量(Local Variables),方法定义参数(Java语言规范称之为Formal Method Parameters)和异常处理器参数(Exception Handler Parameters)不会在线程之间共享。

Java线程之间的通信由Java内存模型(本文简称为JMM)控制,JMM决定一个线程对共享变量的写入何时对另一个线程可见。从抽象的角度来看,JMM定义了线程和主内存之间的抽象关系:线程之间的共享变量存储在主内存(Main Memory)中,每个线程都有一个私有的本地内存(Local Memory),本地内存中存储了该线程以读/写共享变量的副本。本地内存是JMM的一个抽象概念,并不真实存在。它涵盖了缓存、写缓冲区、寄存器以及其他的硬件和编译器优化。Java内存模型的抽象示意如图3-1所示。

从图3-1来看,如果线程A与线程B之间要通信的话,必须要经历下面2个步骤。

1)线程A把本地内存A中更新过的共享变量刷新到主内存中去。

2)线程B到主内存中去读取线程A之前已更新过的共享变量。

下面通过示意图(见图3-2)来说明这两个步骤。

如图3-2所示,本地内存A和本地内存B由主内存中共享变量x的副本。假设初始时,这3个内存中的x值都为0。线程A在执行时,把更新后的x值(假设值为1)临时存放在自己的本地内存A中。当线程A和线程B需要通信时,线程A首先会把自己本地内存中修改后的x值刷新到主内存中,此时主内存中的x值变为了1。随后,线程B到主内存中去读取线程A更新后的x值,此时线程B的本地内存的x值也变为了1。

04

线程池

4.1 线程池的好处

Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。

第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。

第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。但是,要做到合理利用线程池,必须对其实现原理了如指掌。

4.2 线程池的实现原理

当向线程池提交一个任务之后,线程池的处理流程如下:

1)线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程。

2)线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程。

3)线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务。

4.3 线程池的执行流程

ThreadPoolExecutor执行execute方法分下面4种情况。

1)如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。

2)如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。

3)如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。

4)如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。

线程池中的线程执行任务分两种情况,如下。

1)在execute()方法中创建一个线程时,会让这个线程执行当前任务。

2)这个线程执行完上图中1的任务后,会反复从BlockingQueue获取任务来执行。

4.4 创建线程池

通过ThreadPoolExecutor来创建一个线程池:

代码语言:javascript
复制
new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,milliseconds,runnableTaskQueue, handler);

创建一个线程池时需要输入几个参数,如下。

1)corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线

程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任

务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads()方法,

线程池会提前创建并启动所有基本线程。

2)runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列。

·ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序。

·LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。

·SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用

移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于Linked-BlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。

·PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

3)maximumPoolSize(线程池最大数量):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如果使用了无界的任务队列这个参数就没什么效果。

4)ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设

置更有意义的名字。使用开源框架guava提供的ThreadFactoryBuilder可以快速给线程池里的线程设置有意义的名字,代码如下。

new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();

5)RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。在JDK 1.5中Java线程池框架提供了以下4种策略。

·AbortPolicy:直接抛出异常。

·CallerRunsPolicy:只用调用者所在线程来运行任务。

·DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。

·DiscardPolicy:不处理,丢弃掉。

当然,也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化存储不能处理的任务。

·keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。

·TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微秒)。

4.5 线程池提交任务

有两个方法向线程池提交任务,分别为execute()和submit()方法。

execute()方法:用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。

submit()方法:用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

4.6 关闭线程池

可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别:

shutdownNow:首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。

shutdown:只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。

至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。

05

Executor框架

Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。

5.1 Executor框架的结构

Executor框架主要由3大部分组成如下。

·任务:包括被执行任务需要实现的接口:Runnable接口或Callable接口。

·任务的执行:包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。Executor框架有两个关键类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。

·异步计算的结果:包括接口Future和实现Future接口的FutureTask类。

5.2 Executor框架包含的主要的类与接口的简介

·Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。

·ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。

·ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大。

·Future接口和实现Future接口的FutureTask类,代表异步计算的结果。

·Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或Scheduled-ThreadPoolExecutor执行。

5.3 Executor框架成员介绍

主要成员:ThreadPoolExecutor、ScheduledThreadPoolExecutor、Future接口、Runnable接口、Callable接口和Executors。

1)ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor通常使用工厂类Executors来创建。Executors可以创建2种类型的ScheduledThreadPoolExecutor,如下。

·ScheduledThreadPoolExecutor。包含若干个线程的ScheduledThreadPoolExecutor。

·SingleThreadScheduledExecutor。只包含一个线程的ScheduledThreadPoolExecutor。

下面分别介绍这两种ScheduledThreadPoolExecutor。

下面是工厂类Executors提供的,创建固定个数线程的ScheduledThreadPoolExecutor的API。

代码语言:javascript
复制
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize,ThreadFactory

ScheduledThreadPoolExecutor适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求而需要限制后台线程的数量的应用场景。下面是Executors提供的,创建单个线程的SingleThreadScheduledExecutor的API。

代码语言:javascript
复制
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
public static ScheduledExecutorService newSingleThreadScheduledExecutor
(ThreadFactory threadFactory)

SingleThreadScheduledExecutor适用于需要单个后台线程执行周期任务,同时需要保证顺序地执行各个任务的应用场景。

(2)ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor通常使用工厂类Executors来创建。Executors可以创建2种类型的ScheduledThreadPoolExecutor,如下。

·ScheduledThreadPoolExecutor。包含若干个线程的ScheduledThreadPoolExecutor。

·SingleThreadScheduledExecutor。只包含一个线程的ScheduledThreadPoolExecutor。

下面分别介绍这两种ScheduledThreadPoolExecutor。

下面是工厂类Executors提供的,创建固定个数线程的ScheduledThreadPoolExecutor的API。

代码语言:javascript
复制
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize,ThreadFactory

ScheduledThreadPoolExecutor适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求而需要限制后台线程的数量的应用场景。下面是Executors提供的,创建单个线程的SingleThreadScheduledExecutor的API。

代码语言:javascript
复制
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)

SingleThreadScheduledExecutor适用于需要单个后台线程执行周期任务,同时需要保证顺序地执行各个任务的应用场景。

(3)Future接口

Future接口和实现Future接口的FutureTask类用来表示异步计算的结果。当我们把Runnable接口或Callable接口的实现类提交(submit)给ThreadPoolExecutor或ScheduledThreadPoolExecutor时,ThreadPoolExecutor或ScheduledThreadPoolExecutor会向我们返回一个FutureTask对象。下面是对应的API。

代码语言:javascript
复制
<T> Future<T> submit(Callable<T> task)
<T> Future<T> submit(Runnable task, T result)
Future<> submit(Runnable task)

有一点需要读者注意,到目前最新的JDK 8为止,Java通过上述API返回的是一个FutureTask对象。但从API可以看到,Java仅仅保证返回的是一个实现了Future接口的对象。在将来的JDK实现中,返回的可能不一定是FutureTask。

(4)Runnable接口和Callable接口

Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或Scheduled-ThreadPoolExecutor执行。它们之间的区别是Runnable不会返回结果,而Callable可以返回结果。除了可以自己创建实现Callable接口的对象外,还可以使用工厂类Executors来把一个Runnable包装成一个Callable。下面是Executors提供的,把一个Runnable包装成一个Callable的API。

public static Callable<Object> callable(Runnable task) // 假设返回对象Callable1下面是Executors提供的,把一个Runnable和一个待返回的结果包装成一个Callable的API。public static <T> Callable<T> callable(Runnable task, T result) // 假设返回对象Callable2前面讲过,当我们把一个Callable对象(比如上面的Callable1或Callable2)提交给ThreadPoolExecutor或ScheduledThreadPoolExecutor执行时,submit(…)会向我们返回一个FutureTask对象。我们可以执行FutureTask.get()方法来等待任务执行完成。当任务成功完成后FutureTask.get()将返回该任务的结果。例如,如果提交的是对象Callable1,FutureTask.get()方法将返回null;如果提交的是对象Callable2,FutureTask.get()方法将返回result对象。

5.4 ThreadPoolExecutor

Executor框架最核心的类是ThreadPoolExecutor,它是线程池的实现类,主要由下列4个组件构成。

·corePool:核心线程池的大小。

·maximumPool:最大线程池的大小。

·BlockingQueue:用来暂时保存任务的工作队列。

·RejectedExecutionHandler:当ThreadPoolExecutor已经关闭或ThreadPoolExecutor已经饱和时(达到了最大线程池大小且工作队列已满),execute()方法将要调用的Handler。

·通过Executor框架的工具类Executors,可以创建3种类型的ThreadPoolExecutor

·FixedThreadPool

·SingleThreadExecutor

·CachedThreadPool

5.4.1 Executors.newCachedThreadPool

newCachedThreadPool是一个根据需要创建新线程的线程池,当一个任务提交时,corePoolSize为0不创建核心线程,SynchronousQueue是一个不存储元素的队列,可以理解为队里永远是满的,因此最终会创建非核心线程来执行任务。对于非核心线程空闲60s时将被回收。因为Integer.MAX_VALUE非常大,可以认为是可以无限创建线程的,在资源有限的情况下容易引起OOM异常。

代码语言:javascript
复制
//创建newCachedThreadPool线程池源码
public static ExecutorService newCachedThreadPool() {
    /**
        *corePoolSize: 0,核心线程池的数量为0
    *maximumPoolSize:  Integer.MAX_VALUE,可以认为最大线程数是无限的
    *keepAliveTime: 60L
    *unit: 秒
    *workQueue: SynchronousQueue
        **/
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

测试案例:

代码语言:javascript
复制
public static void main(String[] args) {
    ExecutorService executor = Executors.newCachedThreadPool();
    for (int i = 0; i < 10; i++) {
        final int index = i;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                log.info("CachedTask:{}",index);
            }
        });
    }
}

newCachedThreadPool的返回值是ExecutorService类型,该类型只包含基础的线程池方法,但却不包含线程监控相关方法,因此在使用返回值为ExecutorService的线程池类型创建新线程时要考虑到具体情况。

5.4.2 Executors.newSingleThreadExecutor

newSingleThreadExecutor是单线程线程池,只有一个核心线程,用唯一的一个共用线程执行任务,保证所有任务按指定顺序执行(FIFO、优先级…)

代码语言:javascript
复制
//newSingleThreadExecutor创建线程池源码
public static ExecutorService newSingleThreadExecutor() {
    /**
      *  corePoolSize : 1,核心线程池的数量为1
      *  maximumPoolSize : 1,只可以创建一个非核心线程
      *  keepAliveTime : 0L
      *  unit => 秒
      *  workQueue => LinkedBlockingQueue
      **/
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

当一个任务提交时,首先会创建一个核心线程来执行任务,如果超过核心线程的数量,将会放入队列中,因为LinkedBlockingQueue是长度为Integer.MAX_VALUE的队列,可以认为是无界队列,因此往队列中可以插入无限多的任务,在资源有限的时候容易引起OOM异常,同时因为无界队列,maximumPoolSize和keepAliveTime参数将无效,压根就不会创建非核心线程。

5.4.3 Executors.newFixedThreadPool

固定长度线程池,核心线程数和最大线程数由用户传入,可以设置线程的最大并发数,超出在队列等待。

代码语言:javascript
复制
//newFixedThreadPool创建线程池源码
public static ExecutorService newFixedThreadPool(int nThreads) {
      /**
          *  corePoolSize : 核心线程的数量为自定义输入nThreads
          *  maximumPoolSize : 最大线程的数量为自定义输入nThreads
          *  keepAliveTime : 0L
          *  unit : 秒
          *  workQueue : LinkedBlockingQueue
          **/
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

newFixedThreadPool和SingleThreadExecutor类似,唯一的区别就是核心线程数不同,并且由于使用的是LinkedBlockingQueue,在资源有限的时候容易引起OOM异常。

5.4.4 Executors.newScheduledThreadPool

定长线程池,核心线程数由用户传入,支持定时和周期任务执行

代码语言:javascript
复制
//newScheduledThreadPool创建线程池源码
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    /**
      *  corePoolSize : 核心线程的数量为自定义输入corePoolSize
      *  maximumPoolSize : 最大线程的数量为Integer.MAX_VALUE
      *  keepAliveTime : 0L
      *  unit : 纳秒
      *  workQueue : DelayedWorkQueue
      **/
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

当一个任务提交时,corePoolSize为自定义输入,首先创建核心线程,核心线程满了之后,因此最终会创建非核心线程来执行任务。非核心线程使用后将被回收。因为Integer.MAX_VALUE非常大,可以认为是可以无限创建线程的,在资源有限的情况下容易引起OOM异常。因为使用的DelayedWorkQueue可以实现定时和周期任务。

ScheduledExecutorService提供了三种方法可以使用:

schedule:延迟后执行任务

scheduleAtFixedRate:以指定的速率执行任务

scheduleWithFixedDelay:以指定的延迟执行任务

总结

FixedThreadPool和SingleThreadExecutor 允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而引起OOM异常。

CachedThreadPool 和newScheduledThreadPool允许创建的线程数为Integer.MAX_VALUE,可能会创建大量的线程,从而引起OOM异常。

基于以上原因一般禁止使用Executors去创建线程池,而是推荐自己去创建ThreadPoolExecutor。

06

并发工具类

CountDownLatch、CyclicBarrier、Semaphore:工具类提供了一种并发流程控制的手段,Exchanger工具类则提供了在线程间交换数据的一种手段。

6.1 CountDownLatch 计数器

CountDownLatch允许一个或多个线程等待其他线程完成操作。假如有这样一个需求:我们需要解析一个Excel里多个sheet的数据,此时可以考虑使用多线程,每个线程解析一个sheet里的数据,等到所有的sheet都解析完之后,程序需要提示解析完成。在这个需求中,要实现主线程等待所有线程完成sheet的解析操作,可以使用CountDownLatch来实现:

代码语言:javascript
复制
public class CountDownLatchTest {
    static CountDownLatch c = new CountDownLatch(2);
    public static void main(String[] args) throws InterruptedException {
    new Thread(new Runnable() {
        @Override
        public void run() {
            System.out.println(1);
            c.countDown();
            System.out.println(2);
            c.countDown();
        }
    }).start();
        c.await();
        System.out.println("3");
    }
}

CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N。当我们调用CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await方法会阻塞当前线程,直到N变成零。由于countDown方法可以用在任何地方,所以这里说的N个点,可以是N个线程,也可以是1个线程里的N个执行步骤。用在多个线程时,只需要把这个CountDownLatch的引用传递到线程里即可。

注意:计数器必须大于等于0,只是等于0时候,计数器就是零,调用await方法时不会阻塞当前线程。CountDownLatch不可能重新初始化或者修改CountDownLatch对象的内部计数器的值。一个线程调用countDown方法happen-before,另外一个线程调用await方法。

6.2 同步屏障CyclicBarrier

CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

CyclicBarrier简介:CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

代码语言:javascript
复制
public class CyclicBarrierTest {
    static CyclicBarrier c = new CyclicBarrier(2);
    public static void main(String[] args) {
    new Thread(new Runnable() {
    @Override
    public void run() {
        try {
            c.await();
        } catch (Exception e) {
    }
    System.out.println(1);
    }
    }).start();
    try {
        c.await();
    } catch (Exception e) {
}
System.out.println(2);
}
}

6.3 CyclicBarrier和CountDownLatch的区别

CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置。所以CyclicBarrier能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数器,并让线程重新执行一次。CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得Cyclic-Barrier阻塞的线程数量。isBroken()方法用来了解阻塞的线程是否被中断。

6.4 控制并发线程数Semaphore

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。Semaphore可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接;假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,就可以使用Semaphore来做流量控制,如代码清单:

代码语言:javascript
复制
public class SemaphoreTest {
    private static final int THREAD_COUNT = 30;
    private static ExecutorServicethreadPool = Executors.newFixedThreadPool(THREAD_COUNT);
    private static Semaphore s = new Semaphore(10);
    public static void main(String[] args) {
        for (inti = 0; i< THREAD_COUNT; i++) {
        threadPool.execute(new Runnable() {
        @Override
        public void run() {
            try {
                s.acquire();
                System.out.println("save data");
                s.release();
            } catch (InterruptedException e) {
    }
    }
});
}
    threadPool.shutdown();
}
}

在代码中,虽然有30个线程在执行,但是只允许10个并发执行。Semaphore的构造方法

Semaphore(int permits)接受一个整型的数字,表示可用的许可证数量。Semaphore(10)表示允许10个线程获取许可证,也就是最大并发数是10。Semaphore的用法也很简单,首先线程使用Semaphore的acquire()方法获取一个许可证,使用完之后调用release()方法归还许可证。还可以用tryAcquire()方法尝试获取许可证。

·intavailablePermits():返回此信号量中当前可用的许可证数。

·intgetQueueLength():返回正在等待获取许可证的线程数。

·booleanhasQueuedThreads():是否有线程正在等待获取许可证。

·void reducePermits(int reduction):减少reduction个许可证,是个protected方法。

·Collection getQueuedThreads():返回所有等待获取许可证的线程集合,是个protected方法。

07

java的原子操作类

7.1 原子操作类型

通常在做一些线程不安全的更新操作时,会用synchronized保证多线程的同步性问题。JDK 1.5开始提供了java.util.concurrent.atomic包(以下简称Atomic包),这个包中的原子操作类提供了一种用法简单、性能高效、线程安全地更新一个变量的方式。一共提供了13个类,属于4种类型的原子更新方式,分别是原子更新基本类型、原子更新数组、原子更新引用和原子更新属性(字段)。

Atomic包提供了以下3个类。

·AtomicBoolean:原子更新布尔类型。

·AtomicInteger:原子更新整型。

·AtomicLong:原子更新长整型。

7.2 AtomicInteger

3个类提供的方法几乎一样,以AtomicInteger为例进行讲解,AtomicInteger的常用方法如下。

·int addAndGet(int delta):以原子方式将输入的数值与实例中的值(AtomicInteger里的value)相加,并返回结果。

·boolean compareAndSet(int expect,int update):如果输入的数值等于预期值,则以原子方式将该值设置为输入的值。

·int getAndIncrement():以原子方式将当前值加1,注意,这里返回的是自增前的值。

·void lazySet(int newValue):最终会设置成newValue,使用lazySet设置值后,可能导致其他线程在之后的一小段时间内还是可以读到旧的值。

int getAndSet(int newValue):以原子方式设置为newValue的值,并返回旧值。

代码语言:javascript
复制
AtomicInteger示例代码如代码清单7-1所示:
import java.util.concurrent.atomic.AtomicInteger;
    public class AtomicIntegerTest {
    static AtomicInteger ai = new AtomicInteger(1);
    public static void main(String[] args) {
    System.out.println(ai.getAndIncrement());
    System.out.println(ai.get());
    }
}

getAndIncremen方法得到源码分析:

代码语言:javascript
复制
public final int getAndIncrement() {
    for (;;) {
    int current = get();
    int next = current + 1;
    if (compareAndSet(current, next))
    return current;
    }
}
public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

for循环体的第一步先取得AtomicInteger里存储的数值,第二步对AtomicInteger的当前数值进行加1操作,关键的第三步调用compareAndSet方法来进行原子更新操作,该方法先检查当前数值是否等于current,等于意味着AtomicInteger的值没有被其他线程修改过,则将AtomicInteger的当前数值更新成next的值,如果不等compareAndSet方法会返回false,程序会进入for循环重新进行compareAndSet操作。

08

HashMap、HashTable、ConcurrentHashMap

HashMap:是线程不安全的,put操作时容易引起死循环,在多线程并发时导致HashMap的Entry链表形成环形数据,一旦形成环形数据结构,Entry的next节点永不为空形成死循环。

HashTable:线程安全,但是效率低下,每一个线程都必须竞争同一把锁。当一个线程访问HashTable的同步方法,其他线程也访问HashTable的同步方法时,会进入阻塞或轮询状态。

ConcurrentHashMap:在HashTable的基础上,对不同数据段的数据进行锁住,即使用锁分段技术。将数据分成一段一段地存储起来,给每一段数据配一把锁。由Segment数组结构和HashEntry数组结构组成,Segment是一种可重入锁,HashEntry则用于存储键值对数据。

09

锁用来控制多个线程访问共享资源的方式,一般来说,一个锁能够防止多个线程同时访问共享资源(但是有些锁可以允许多个线程并发的访问共享资源,比如读写锁)。Java里面提供了几个不同的锁:Lock接口、队列同步器AbstractQueuedSynchronizer(AQS)、重入锁(ReentrantLock)、读写锁(ReentrantReadWriteLock)

9.1 Lock接口

与syncronized不同,Lock接口是显示的获取和释放锁,Lock接口提供了如下所示的接口API:

9.2 队列同步器AQS

队列同步器AQS,是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作,并发包的作者(Doug Lea)期望它能够成为实现大部分同步需求的基础。

同步器的设计是基于模板方法模式的,使用者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,模板方法将会调用使用者重写的方法。重写同步器指定的方法时,需要使用同步器提供的如下3个方法来访问或修改同步状态。

·getState():获取当前同步状态。

·setState(int newState):设置当前同步状态。

·compareAndSetState(int expect,int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性。同步器可重写的方法:

同步器提供的模板方法:

AQS用来构建锁或其他同步组件的基础框架,使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。

9.3 重入锁(ReentrantLock)

重入锁ReentrantLock,就是支持重进入的锁,它表示该锁能够支持一个线程对资源的重复加锁。除此之外,该锁的还支持获取锁时的公平和非公平性选择。

ReentrantLock在调用lock()方法时,已经获取到锁的线程,能够再次调用lock()方法获取锁而不被阻塞。

9.3.1 实现重进入

1)线程再次获取锁。锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取。

锁的最终释放。线程重复n次获取了锁,随后在第n次释放该锁后,其他线程能够获取到该锁。锁的最终释放要求锁对于获取进行计数自增,计数表示当前锁被重复获取的次数,而锁被释放时,计数自减,当计数等于0时表示锁已经成功释放。ReentrantLock通过组合自定义同步器来实现锁的获取与释放,以非公平性(默认的)实现为例:

代码语言:javascript
复制
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
    if (compareAndSetState(0, acquires)) {
    setExclusiveOwnerThread(current);
        return true;
    }
    } else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
    if (nextc < 0)
        throw new Error("Maximum lock count exceeded");
    setState(nextc);
        return true;
    }
    return false;
}

9.4 读写锁

读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升。除了保证写操作对读操作的可见性以及并发性的提升之外,读写锁能够简化读写交互场景的编程方式。

ReadWriteLock仅定义了获取读锁和写锁的两个方法,即readLock()方法和writeLock()方法:

9.5 Condition接口

Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对象关联的锁。Condition对象是由Lock对象(调用Lock对象的newCondition()方法)创建出来的,换句话说,Condition是依赖Lock对象的。ConditionObject是同步器AbstractQueuedSynchronizer的内部类,因为Condition的操作需要获取相关联的锁,所以作为同步器的内部类也较为合理。每个Condition对象都包含着一个队列(以下称为等待队列),该队列是Condition对象实现等待/通知功能的关键。

10

volatile和synchronize

Java支持多个线程同时访问一个对象或者对象的成员变量,由于每个线程可以拥有这个变量的拷贝,程序在执行过程中,一个线程看到的变量并不一定是最新的。

Volatile:关键字volatile可以用来修饰字符(成员变量),就是告知程序任何对该变量的访问均需要从共享内存中获取,而对它的改变必须同步刷新回共享内存,它能保证所有线程对变量的访问可见性。

Synchronize:关键字synchronize可以修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性。

11

ThreadLocal

ThreadLocal即线程变量,是一个以ThreadLocal对象为键、任意对象为值的存储结构,这个结构被附带在线程上,也就是说一个线程可以根据一个ThreadLocal对象查询到绑定在这个线程上的一个值。可以通过set(T)方法来设置一个值,在当前线程下再通过get()方法获取到原先设置的值。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-11-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序大视界 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 9.1 Lock接口
  • volatile和synchronize
  • ThreadLocal
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档