点击上方“晏霖”,选择“置顶或者星标”
曾经有人关注了我
后来他有了女朋友
2.9.1 Java中的并发容器
在我们开发中,经常会使用到容器来存储对象或数据,容器的作用非常大,合理使用各个容器的特性和方法可以简化开发,提高系统性能和安全性。
我们所说的容器,在Java中可分为两大块,首先就是Collection接口的分支,有Set、List、Queue,另一块就是Map接口的分支,代表集合HashMap,他们派生出来的子类也很多,每个子类都有自己的特性。这部分属于Java基础,在面试中几乎常常被问到,熟练掌握Java集合特性,是面试官在试探一个Java程序员基础知识扎实程度重要手段之一。
早期,我们在Java多线程下操作一个容器除了借助synchronized关键字,就是使用同步容器。典型的同步容器有Vector和Hashtable,而且当时在设计Hashtable的时候还没有考虑到驼峰命名。我们列举出一些常见的线程安全的和非线程安全的容器,如下表2-14所示。
表2-14 早期Java中线程安全与非安全容器
线程安全 | 非线程安全 |
---|---|
Vector | ArrayList、LinkedList |
Properties | HashSet、TreeSet |
HashTable | HashMap |
…… | …… |
在多线程中,无论是使用同步方法还是同步容器,使其对容器操作具有线程安全都是允许的,但是最大的问题就是效率,我们为了提高效率才使用的多线程,或者某些并发场景下,都是对效率问题不可忽视的,或者有些复杂的场景还会多线程交替对容器进行存取,可能会发生ArrayIndexOutOfBoundsException异常。又例如在多线程处理HashMap使,put操作会引起死循环,这种死循环会导致cpu接近100%。总结一下就是,同步容器对所有容器操作串行化,来实现他们的线程安全性,代价就是效率,因为串行化严重降低并发性和吞吐量。所以,在Java5.0提供来多种并发容器,不仅可以保证线程安全同时又能保证高效操作。增加来ConcurrentHashMap来代替基于散列的Map,以及CopyOnWriteArrayList在多线程操作下代替List,还有为提高阻塞队列性能的ConcurrentLinkedQueue。在我所接触的项目中,开发人员对ConcurrentHashMap使用率远远高于CopyOnWriteArrayList和ConcurrentLinkedQueue,原因可能第一是因为并发场景使用少,第二对于数据容器结构,Map使用起来较为广泛和方便。本小结有必要解释一下CopyOnWriteArrayList,其实对于我来说,更比较喜欢用List,只是个人习惯而已,在多线程和并发情况下,CopyOnWriteArrayList可以代替List,类似这样的还有CopyOnWriteArraySet代替同步Set。使用方法和普通的List和Set没有区别。可以看到他们的命名规则是在ArrayList和ArraySet加前缀CopyOnWrite,CopyOnWrite理解成“写时复制”,这也是他保证线程安全的特征,指的是,在每次修改或写入时,都会创建并重新生成一个新的容器副本。我们可以把这个过程形象的理解成项目的版本迭代开发,每个人每次迭代都会对前一个版本进行完整更新,在此基础上进行开发。每次要变更这个容器都会复制一份,在此基础上进行修改,这样保证了多个线程同时对这个容器进行迭代时,不会被彼此干扰。CopyOnWriteArrayList更像是一种读写分离的实现,他不能保证修改过程中对于读取数据的实时准确性,但是可以保证最终一致性。可见COW这一过程需要复制底层数组,这是有一定性能开销对,特别对于容器数据量较大时,影响比较大。CopyOnWriteArrayList还有一个应用场景,在循环中对普通的List进行remove操作时会出现ConcurrentModificationException异常,还可能会出现数组下标越界异常,对于这样的操作我们可以使用CopyOnWriteArrayList代替List,但是我推荐把集合转成Iterator进行循环对其remove操作。
2.9.2 ConcurrentHashMap原理
大家对ConcurrentHashMap应该是非常熟悉的,在并发和多线程中代替HashMap,保证线程安全,并且提高并发效率。对于使用者来说也是很简单的,其内部提供操作容器的方法与HashMap基本一致,而且在设计上也沿用了一些HashMap的概念。ConcurrentHashMap是在JDK5时引入的线程安全的哈希式集合,在JDK8之前一直采用分段锁的设计思想,分段锁是由Segment内部类实现的,他继承ReentrantLock,用来管理每个HashEntry,相当于把容器分成众多的Segment,对于修改数据时只需对他所在对Segment进行同步达到线程安全对。在JDK8之后,ConcurrentHashMap抛弃了分段锁的概念,直接使用Node内部类作为存储具体的键值对,把put流程的控制粒度更加细化,引入了CAS无锁操作和synchronized来保证并发安全。并且数据结构跟HashMap1.8的结构一样,数组+链表/红黑树。以上我只是简单说一些变更的内容和一些概念,这些概念可以引申出许多问题,很多同学可能在面试过程中已经领教过了。而且在JDK8之后的ConcurrentHashMap源码已经达到了6300多行,引入了很多内部方法,涉及到的知识点是非常多的。要了解每一行代码的含义是一件非常难的事,所以我们要归纳其重点,了解其精华即可。下面我们要结合源码来聊一聊ConcurrentHashMap原理的重要内容。还是老办法,深入一个容器或者对象的源码,首先要看内部定义的属性和类,请看示例代码2-37。
代码清单2-37 ConcurrentHashMap.java
//最大容量
private static final int MAXIMUM_CAPACITY = 1 << 30;
//默认容量
private static final int DEFAULT_CAPACITY = 16;
//array的大小,只用于toarray方法
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//根据这个数来计算segment的个数,segment的个数是仅小于这个数且是2的几次方的一个数(ssize)
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//加载因子
private static final float LOAD_FACTOR = 0.75f;
//变成红黑树的链表节点数,超过8就由链表转换成红黑树
static final int TREEIFY_THRESHOLD = 8;
//当树节点小于6自动转换成链表
static final int UNTREEIFY_THRESHOLD = 6;
// 在转变成树之前,还会有一次判断,只有键值对数量大于 64 才会发生转换。这是为了避免在哈希表建立初期,多个键值对恰好被放入了同一个链表中而导致不必要的转化。
static final int MIN_TREEIFY_CAPACITY = 64;
//每次进行转移的最小值这个值作为一个下限来避免Rsisize遇到过多的内存争用
private static final int MIN_TRANSFER_STRIDE = 16;
// 生成sizeCtl所使用的bit位数
private static final int RESIZE_STAMP_BITS = 16;
// 进行扩容所允许的最大线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 记录sizeCtl中的大小所需要进行的偏移位数
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// 一系列的标识
//forwarding nodes 主要作用是表征一个节点已经被处理干净(resize的时候被转移到新表了)
static final int MOVED = -1; // hash for forwarding nodes
//表示树的根节点
static final int TREEBIN = -2; // hash for roots of trees
//表示transient
static final int RESERVED = -3; // hash for transient reservations
//31个1用来计算普通的node的哈希码
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
// 获取可用的CPU个数
static final int NCPU = Runtime.getRuntime().availableProcessors();
// 进行序列化的属性
private static final ObjectStreamField[] serialPersistentFields = {
new ObjectStreamField("segments", Segment[].class),
new ObjectStreamField("segmentMask", Integer.TYPE),
new ObjectStreamField("segmentShift", Integer.TYPE),
};
//省略部分代码
//也是用节点数组来构成哈希表,在第一次插入的时候会懒初始化,size是2的整数次幂,直接通过iterators来访问
transient volatile Node<K,V>[] table;
//默认null,扩容时新生成的数组,是原数组的2倍
private transient volatile Node<K,V>[] nextTable;
//用来计算整个表的size,和longadder里面的base一致
private transient volatile long baseCount;
//用来控制表初始化,当为负数值。各种状态下这个int的值如下:
//初始化的:-1
//resize:-(1 + 被激活的参与resize的数量)
//表为空:表的initial size,如果没有这个值则为0
//初始化后:该表下次应该resize的值,等于当前表的size的0.75倍
private transient volatile int sizeCtl;
//当resizing的时候下一个tab下标索引值(当前值+1)
private transient volatile int transferIndex;
//当resize和创建counterCells的时候的自选锁,和longadder一致
private transient volatile int cellsBusy;
//和longadder的cells一致
private transient volatile CounterCell[] counterCells;
了解来其内部重要的属性和一些字段的含义,接下来我们要看看,这个容器最重要的组成结构Node,它是一个最核心的内部类,它包装了key-value键值对,所有插入ConcurrentHashMap的数据都包装在这里面,类似于一个HashMap,请看示例代码2-38。
代码清单2-38 ConcurrentHashMap.java
//Node节点定义,可以看出Node是一个键值对
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
//ConcurrentHashMap中的HashEntry相对于HashMap中的Entry有一定的差异性:HashEntry中的value以及next都被volatile修饰,这样在多线程读写过程中能够保持它们的可见性。
volatile V val;
volatile Node<K,V> next;
Node(int hash, K key, V val) {
this.hash = hash;
this.key = key;
this.val = val;
}
Node(int hash, K key, V val, Node<K,V> next) {
this(hash, key, val);
this.next = next;
}
public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString() {
return Helpers.mapEntryToString(key, val);
}
//不允许直接setValue
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}
// 它增加了find方法辅助map.get()方法。
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}
这个Node内部类与HashMap中定义的Node类很相似,但是有一些差别:
1. 它对value和next属性设置了volatile同步锁。
2. 它不允许调用setValue方法直接改变Node的value域。
3. 它增加了find方法辅助map.get()方法。
我们除了Node,还需要了解其他重要的内部类,由于篇幅原因,内部类的代码省略,大家在源码中自行查看。
代码清单2-39 ConcurrentHashMap.java
//当链表长度过长的时候,会转换为TreeNode,是把这些结点包装成TreeNode放在TreeBin对象中,由TreeBin完成对红黑树的包装。在红黑树结构中实际存放数据的节点
static final class TreeNode<K,V> extends Node<K,V> {……}
//存储对红黑树节点的引用。他是一个对象,包装的很多TreeNode节点,它代替了TreeNode的根节点,也就是说在实际的ConcurrentHashMap“数组”中,存放的是TreeBin对象,而不是TreeNode对象。
static final class TreeBin<K,V> extends Node<K,V> {……}
//一个用于连接两个table的节点类。它包含一个nextTable指针,用于指向下一张表。而且这个节点的key value next指针全部为null,它的hash值为-1. 这里面定义的find的方法是从nextTable里进行查询下一个节点,而不是以自身为头节点进行查找下一个节点,可以比喻成两个车厢之间的链子。
static final class ForwardingNode<K,V> extends Node<K,V> {……}
//占位加锁节点,执行某个方法时,对其加锁。
static final class ReservationNode<K,V> extends Node<K,V> {……}
根据以上我们对ConcurrentHashMap内部重要对属性和内部类的初步认识,可以对ConcurrentHashMap整体的结构分析,整理出图片,如图2-23所示。
图2-23 ConcurrentHashMap数据结构
图2-23示例中,左侧一列数据我们叫他table,这也是源码中定义的,这个table的数据存储分2中情况,这和JDK8中HashMap存储结构是一样的,分别是链表和红黑树。当且仅当槽内元素个数增加到8个,并且table的容量已经扩容到大于等于64时,节点链表转为红黑树,当某个槽内元素个数减少到6个时,由红黑树转回链表。链表转为红黑树需要2个条件都满足才可以,如果容量没有达到64时,而链表大于8个元素,只会扩容不会转为红黑树,扩容后元素将会重新排列。
下面介绍ConcurrentHashMap核心函数分析。和HashMap一样,我们最常用的就是put()和get(),首先我们从put()讲起。向ConcurrentHashMap存入元素使用put()方法,put()内部调用putVal()函数,这里才是存储一个元素核心的内容,对于putVal函数的流程大体如下。
① 判断存储的key、value是否为空,若为空,则抛出异常,否则,进入步骤②
② 计算key的hash值,随后进入无限循环,该无限循环可以确保成功插入数据,若table表为空或者长度为0,则初始化table表,否则,进入步骤③。
③ 根据key的hash值取出table表中的结点元素,若取出的结点为空(该桶为空),则使用CAS将key、value、hash值生成的结点放入桶中。否则,进入步骤④。
④ 若该结点的的hash值为MOVED,则对该桶中的结点进行转移,否则,进入步骤⑤
⑤ 对桶中的第一个结点(即table表中的结点)进行加锁,对该桶进行遍历,桶中的结点的hash值与key值与给定的hash值和key值相等,则根据标识选择是否进行更新操作(用给定的value值替换该结点的value值),若遍历完桶仍没有找到hash值与key值和指定的hash值与key值相等的结点,则直接新生一个结点并赋值为之前最后一个结点的下一个结点。进入步骤⑥
⑥ 若binCount值达到红黑树转化的阈值,则将桶中的结构转化为红黑树存储,最后,增加binCount的值。
我们把上述步骤转换称流程图如2-24所示。
图 2-24 ConcurrentHashMap存储元素流程图
接下来我们来看一下get()函数,无论在HashMap还是ConcurrentHashMap中取元素的操作是非常简单的,下面我们结合源码分析get()函数,如示例代码2-40所示。
代码清单2-40 ConcurrentHashMap.java
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 计算key的hash值
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 表不为空并且表的长度大于0并且key所在的桶不为空
if ((eh = e.hash) == h) {
// 表中的元素的hash值与key的hash值相等
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
// 键相等,返回值
return e.val;
}
else if (eh < 0)// 结点hash值小于0
// 在桶(链表/红黑树)中查找
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {// 对于结点hash值大于0的情况
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
get函数根据key的hash值来计算在哪个桶中,再遍历桶,查找元素,若找到则返回该结点,否则,返回null。
最后我们来聊聊ConcurrentHashMap的size()、mappingCount()函数,这两个函数都是用来统计ConcurrentHashMap元素的数量,唯一的区别就是,mappingCount()返回值支持long类型,如果当前容器元素大于Integer的最大值,使用size()函数只会返回Integer.MAX_VALUE,如果使用mappingCount()就会返回真实的数量,mappingCount()函数也是JDK8之后推荐使用的。无论是size()函数还是mappingCount()函数,核心统计的方法都是sumCount()。ConcurrentHashMap 提供了 baseCount、counterCells 两个辅助变量和一个CounterCell 辅助内部类。sumCount()就是迭代 counterCells来统计sum的过程。put 操作时,肯定会影响size(),在put()方法最后会调用addCount()方法。在addCount()方法中,如果counterCells == null, 则对 baseCount 做 CAS 自增操作。如果并发导致 baseCount CAS 失败了使用 counterCells。如果counterCells CAS 失败了,在 fullAddCount 方法中,会继续死循环操作,直到成功。ConcurrentHashMap在统计元素的数量大费周折,然而最后的结果还是一个估计值,多线程同时插入和移除时,统计数量并不是一定准确的,如果在统计容器元素数量的时候阻塞一切的插入和移除方法,返回的结果应该是准确的,当然ConcurrentHashMap就是一个支持并发高性能容器,如果出现这样的操作,牺牲的就是性能,所以在设计的时候也是权衡了利弊,不能保证鱼和熊掌兼得,那就牺牲微小的准确性来提高其性能。
2.9.3 Java中的队列
队列是一种数据结构,他有先进先出的性质,这点他和栈的性质正好相反。一般使用都是在队列尾部加入元素和从队列头部移除元素,我们经常把他使用在并发环境下。队列也可以理解成生产者-消费者模式,生产者将资源缓存进队列,消费者从缓存队列中取出资源。Java中提供2种队列,一种是BlockingQueue接口为代表的阻塞队列,应用较为广泛,一种是以ConcurrentLinkedQueue为代表的非阻塞队列,性能高于阻塞队列。
我们对两种队列进行应用层面上的分析,首先我们要了解什么是阻塞队列。
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
我们看一下BlockingQueue提供的几种方法,如下表2-15所示。
表2-15 阻塞队列提供的几种方法
方法\处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e)队列未满时,返回true;队列满则抛出。 | offer(e)队列未满时,返回true;队列满时返回false。非阻塞立即返回。 | put(e)队列未满时,直接插入没有返回值;队列满时会阻塞等待,一直等到队列未满时再插入。 | offer(e,time,unit)设定等待的时间,如果在指定时间内还不能往队列中插入数据则返回false,插入成功返回true。 |
移除方法 | remove()队列不为空时,返回队首值并移除;队列为空时抛出。 | poll()队列不为空时返回队首值并移除;队列为空时返回null。非阻塞立即返回。 | take()队列不为空返回队首值并移除;当队列为空时会阻塞等待,一直等到队列不为空时再返回队首值。 | poll(time,unit)设定等待的时间,如果在指定时间内队列还未孔则返回null,不为空则返回队首值。 |
检查方法 | element() | peek() | 不可用 | 不可用 |
注:异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException("Queue full")异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。
下面我们来了解在JDK7中一共提供了7个阻塞队列,分别是。
1. ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列,在创建对象时必须制定容量大小,支持公平锁和非公平锁。
2. LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列,此队列的默认长度为Integer.MAX_VALUE,按照先进先出的顺序进行排序。
3. PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列,默认自然序进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证同优先级元素的顺序。
4. DelayQueue:一个使用优先级队列实现的无界阻塞队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素DelayQueue可以运用在以下应用场景:1.缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。2.定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。)。
5. SynchronousQueue:一个不存储元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。支持公平锁和非公平锁(Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收)。
6. LinkedTransferQueue:一个由链表结构组成的无界阻塞队列,相当于其它队列,LinkedTransferQueue队列多了transfer和tryTransfer方法。
7. LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列,队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。
下面来探讨阻塞队列的实现原理,本文以ArrayBlockingQueue为例,其他阻塞队列实现原理可能和ArrayBlockingQueue有一些差别,但是大体思路应该类似,有兴趣的朋友可自行查看其他阻塞队列的实现源码。
ArrayBlockingQueue提供了3中构造函数,如下表2-16所示。
表2-15 ArrayBlockingQueue的3种构造方法
构造方法 | 描述 |
---|---|
public ArrayBlockingQueue(int capacity) | 构造指定大小的有界队列 |
public ArrayBlockingQueue(int capacity, boolean fair) | 构造指定大小的有界队列,指定为公平或非公平锁 |
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) | 构造指定大小的有界队列,指定为公平或非公平锁,指定在初始化时加入一个集合 |
代码清单2-41 ArrayBlockingQueue.java
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/**
* 序列化ID。这个类依赖于默认的序列化
*甚至对于默认序列化的项数组,即使
*它是空的。否则,它不能被声明为.,即
*这里需要。
*/
private static final long serialVersionUID = -817911632652898426L;
/** 排队的物品 */
final Object[] items;
/** 用于下一次获取、轮询、查看或删除的项目索引 */
int takeIndex;
/** 下一个put、.或add的项目索引 */
int putIndex;
/** 队列中的元素数量 */
int count;
/*
* 并发控制使用经典的两条件算法
*发现在任何教科书。
*/
/** 主锁保护所有通道 */
final ReentrantLock lock;
/** 等待获取的条件 */
private final Condition notEmpty;
/** 等待放入的条件 */
private final Condition notFull;
可以看出,ArrayBlockingQueue中用来存储元素的实际上是一个数组,takeIndex和putIndex分别表示队首元素和队尾元素的下标,count表示队列中元素的个数。lock是一个可重入锁,notEmpty和notFull是等待条件。
下面我简单的使用ArrayBlockingQueue写一个案例,使用单线程进行存入,多线程取出,如下代码2-42所示
代码清单2-42 BlockingQueueTest.java
public class BlockingQueueTest {
public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue abq = new ArrayBlockingQueue(10, true);
for (int i = 1; i <= 10; i++) {
put(abq, i);
get(abq);
}
}
private static void get(ArrayBlockingQueue abq) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(new Runnable() {
@Override
public void run() {
Integer integer = null;
try {
integer = (Integer) abq.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
if (null == integer) {
System.out.println("队列是空,没有取到元素");
} else {
System.out.println("取到了一个元素是:" + integer + "---目前队列中元素个数是 :" + abq.size());
}
}
});
executorService.shutdown();
}
/**
* 添加元素
*
* @param abq
* @param i
*/
private static void put(ArrayBlockingQueue abq, int i) throws InterruptedException {
abq.put(i);
System.out.println("存入了一个元素是 " + i);
}
}
插入和取出的方法最常用的是put和take,因为只有这两个是阻塞的,使用时都需要先获取锁,没有获取到锁的线程会被挡在第⼀道大门之外 自旋拿锁,直到获取到锁。就算拿到锁了之后,也不⼀定会顺利进⾏put/take操作,需要判断队列是否可用(是否满/空),如果不可用,则会被阻塞,并释放锁。其他的方法根据实际情况而定。
运行结果如下,可以看出多线程并发也没有发生线程安全问题,而且体现出队列是先入先出。
存入了一个元素是 1
存入了一个元素是 2
取到了一个元素是:1---目前队列中元素个数是 :1
存入了一个元素是 3
取到了一个元素是:2---目前队列中元素个数是 :1
存入了一个元素是 4
取到了一个元素是:3---目前队列中元素个数是 :1
存入了一个元素是 5
取到了一个元素是:4---目前队列中元素个数是 :1
存入了一个元素是 6
取到了一个元素是:5---目前队列中元素个数是 :1
存入了一个元素是 7
取到了一个元素是:6---目前队列中元素个数是 :1
存入了一个元素是 8
取到了一个元素是:7---目前队列中元素个数是 :1
存入了一个元素是 9
取到了一个元素是:8---目前队列中元素个数是 :0
存入了一个元素是 10
取到了一个元素是:9---目前队列中元素个数是 :1
取到了一个元素是:10---目前队列中元素个数是 :0
阻塞队列介绍完毕,阻塞队列使用put/take方法可以实现在队列已满或空的时候达到线程阻塞状态,阻塞这种方式在线程并发时固然安全,但是也会造成效率上的问题,所以说今天我们来讲一个非阻塞队列——ConcurrentLinkedQueue,他能保证并发安全,而且还可以提高效率。通常 ConcurrentLinkedQueue 的性能好于 BlockingQueue。它是一个基于链接节点的无界线程安全队列。该队列的元素遵循先进先出的原则。头是最先加入的,尾是最近加入的,也就是插入的时候往尾部插,取出元素从头部取。该队列不允许null元素。
ConcurrentLinkedQueue内部是遵循CAS(比较并交换)的方式来实现。
下面介绍ConcurrentLinkedQueue几个重要方法。
首先重点说明add()和offer() 都是加入元素的方法(在ConcurrentLinkedQueue中这俩个方法没有任何区别,个别资料说不推荐使用add方法,说队列满了插入会抛异常,下面贴出源码,add方法调用的就是offer,所以这两个没区别,并且官方文档也有如下说明)
public boolean add(E e) {
return offer(e);
}
其次是poll()和peek()都是取头元素节点,区别在于前者会删除元素,后者不会。
下面我对队列的入队和出队的方法进行讲解。
1、入队
元素插入需要做两件事,第一是将入队节点设置成当前队列的最后一个节点。第二是更新tail节点,如果原来的tail节点的next节点不为空,则将tail更新为刚入队的节点(即队尾结点),如果原来的tail节点为空,则tail节点不动,把元素插入到tail的next节点处。也就是说每次tail移动都要隔着一个节点。请看如下2-43源码。
代码清单2-43 ConcurrentLinkedQueue.java
public boolean offer(E e) {
//首先入队的对象不允许为null
checkNotNull(e);
//入队前,创建一个入队节点,构造一个内部函数
final Node<E> newNode = new Node<E>(e);
//死循环,入队不成功反复入队。
for (Node<E> t = tail, p = t;;) {
//创建一个指向tail节点的引用
Node<E> q = p.next;
//如果q=null说明p是尾节点则插入
if (q == null) {
// cas插入
if (p.casNext(null, newNode)) {
//cas成功说明新增节点已经被放入链表,然后设置当前尾节点
if (p != t) // 一次跳两个节点
casTail(t, newNode); // 失败是可以的.
return true;
}
// 丢失的CAS与另一线程竞争;重新读取下一个
}
else if (p == q)
//多线程操作时候,由于poll时候会把老的head变为自引用,然后head的next变为新head,所以这里需要
//重新找新的head,因为新的head后面的节点才是激活的节点
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
private static void checkNotNull(Object v) {
if (v == null)
throw new NullPointerException();
}
2、出队
不是每次出队时都更新head节点,当head节点里有元素时,直接弹出head节点里的元素,而不会更新head节点。只有当head节点里没有元素时,则弹出head的next结点并更新head结点为原来head的next结点的next结点。
poll操作是在链表头部获取并且移除一个元素,下面看看实现原理,如示例代码2-44所示。
代码清单2-44 ConcurrentLinkedQueue.java
public E poll() {
restartFromHead:
//死循环
for (;;) {
//死循环
for (Node<E> h = head, p = h, q;;) {
//保存当前节点值
E item = p.item;
//当前节点有值则cas变为null
if (item != null && p.casItem(item, null)) {
//cas成功标志当前节点以及从链表中移除
if (p != h) // 类似tail间隔2设置一次头节点
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
//当前队列为空则返回null
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
//自引用了,则重新找新的队列头节点
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
观察入队和出队的源码可以发现,无论入队还是出队,都是在死循环中进行的,也就是说,当一个线程调用了入队、出队操作时,会尝试获取链表的tail、head结点进行插入和删除操作,而插入和删除是通过CAS操作实现的,而CAS具有原子性。
故此,如果有其他任何一个线程成功执行了插入、删除都会改变tail/head结点,那么当前线程的插入和删除操作就会失败,则通过循环再次定位tail、head结点位置进行插入、删除,直到成功为止。
也就是说,ConcurrentLinkedQueue的线程安全是通过其插入、删除时采取CAS操作来保证的。不会出现同一个tail结点的next指针被多个同时插入的结点所抢夺的情况出现。
对于该队列来说size()函数是通过遍历整个集合,因此在队列容量较大时会有一定效率问题,如果只是查看队列是否为空可以使用isEmpty()函数来代替size()。
此外,如果在执行期间添加或删除元素。对于size()方法,返回的结果可能不准确。因此,此方法在并发时通常不太有用。
胖虎
热 爱 生 活 的 人
终 将 被 生 活 热 爱
我在这里等你哟!