AQS:AbstructQueuedSynchronizd(抽象队列同步器),出现在 JDK 1.5 中。他是很多同步器的基础框架,比如 ReentrantLock、CountDownLatch 和 Semaphore 等都是基于 AQS 实现的。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
}
AQS内部,维护了一个FIFO队列和一个volatile的int类型的state变量。在state=1的时候表示当前对象锁已经被占有了,state变量的值修改的动作通过CAS来完成。
这个FIFO队列用来实现多线程的排队工作,他本质上是一个双向链表,因为他定义了两个Node,一个prev,一个next,这就是典型的双向链表。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
volatile Node prev;
/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
volatile Node next;
}
可以看到,采用的是双向链表而不是单向链表,那为什么采用双向链表呢?首先要搞懂这个问题,我们首先要得知道什么是双向链表,什么是单向链表。
双向链表与单向链表的区别 链表是线性表的一种,所谓的线性表包含顺序线性表和链表,顺序线性表是用数组实现的,在内存中有顺序排列,通过改变数组大小实现。而链表不是用顺序实现的,用指针实现,在内存中不连续。意思就是说,链表就是将一系列不连续的内存联系起来,将那种碎片内存进行合理的利用,解决空间的问题。 所以,链表允许插入和删除表上任意位置上的节点,但是不允许随即存取。链表有很多种不同的类型:单向链表、双向链表及循环链表。
单向链表包含两个域,一个是信息域,一个是指针域。也就是单向链表的节点被分成两部分,一部分是保存或显示关于节点的信息,第二部分存储下一个节点的地址,而最后一个节点则指向一个空值。
双向链表每个节点有2个链接,一个是指向前一个节点(当此链接为第一个链接时,指向的是空值或空列表),另一个则指向后一个节点(当此链接为最后一个链接时,指向的是空值或空列表)。意思就是说双向链表有2个指针,一个是指向前一个节点的指针,另一个则指向后一个节点的指针。
所以,相比于单向链表,双向链表存在以下优势:
双向遍历:双向链表可以很方便地正向或反向遍历,这在需要双向访问数据时非常有用。 高效的节点插入和删除:在任意位置插入或删除节点时,双向链表可以更为高效地完成操作。因为可以直接访问前一个节点,所以不需要像在单向链表中那样遍历找到前一个节点。 灵活的数据操作:双向链表支持更加灵活的数据操作,如从列表尾部添加或移除元素,以及在给定节点前后插入新节点等。
为啥AQS使用双向链表
在AQS的源码可以看出,很多方法的遍历都是从FIFO队列的尾部开始的
public final int getQueueLength() {
int n = 0;
for (Node p = tail; p != null; p = p.prev) {
if (p.thread != null)
++n;
}
return n;
}
/**
* Returns a collection containing threads that may be waiting to
* acquire. Because the actual set of threads may change
* dynamically while constructing this result, the returned
* collection is only a best-effort estimate. The elements of the
* returned collection are in no particular order. This method is
* designed to facilitate construction of subclasses that provide
* more extensive monitoring facilities.
*
* @return the collection of threads
*/
public final Collection<Thread> getQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
return list;
}
/**
* Returns a collection containing threads that may be waiting to
* acquire in exclusive mode. This has the same properties
* as {@link #getQueuedThreads} except that it only returns
* those threads waiting due to an exclusive acquire.
*
* @return the collection of threads
*/
public final Collection<Thread> getExclusiveQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
if (!p.isShared()) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
}
return list;
}
/**
* Returns a collection containing threads that may be waiting to
* acquire in shared mode. This has the same properties
* as {@link #getQueuedThreads} except that it only returns
* those threads waiting due to a shared acquire.
*
* @return the collection of threads
*/
public final Collection<Thread> getSharedQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
if (p.isShared()) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
}
return list;
}
这几个方法都是从尾部开始遍历的,在高并发环境中,AQS中的队列的头部一定是最频繁访问和修改的区域,因为头部节点是释放同步状态或是被唤醒的线程的首选位置。而把一些不太重要的操作,比如获取排队的线程、获取队列长度等操作,从尾部开始遍历可以减少在头部节点上的竞争,尤其是在执行那些不需要立即修改头部节点状态的遍历操作时。因此也可以减少并发对头部节点的竞争
在AQS中,提供了两种基本的获取同步状态的方式,分别对应于支持中断和不支持中断的场景。即acquire和acquireInterruptibly。
acquireInterruptibly这个方法允许线程在做资源竞争时中断。
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* Acquires in exclusive mode, aborting if interrupted.
* Implemented by first checking interrupt status, then invoking
* at least once {@link #tryAcquire}, returning on
* success. Otherwise the thread is queued, possibly repeatedly
* blocking and unblocking, invoking {@link #tryAcquire}
* until success or the thread is interrupted. This method can be
* used to implement method {@link Lock#lockInterruptibly}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
在执行到doAcquireInterruptibly方法时,继续跟踪代码可知,他的实现如下:
/**
* Acquires in exclusive interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
在这个方法的finally中执行了一个cancelAcquire的动作,也就是说在acquire的过程中,如果出现了线程中断异常,那么就会执行这个方法cancelAcquire,他的主要作用就是将中断的线程节点从AQS同步队列中移除。
而涉及到移除,那么就适合双向链表了,双向链表的每个节点都有prev和next的引用,可以更高效地从队列中移除某个特定的节点(如a b c d e,我要删除的时候,直接找到d的prev也就是c,再找到d的next也就是e,然后把c的next设置为e,e的prev设置为c,就完成了对d的删除)。如果是单向链表就需要从头遍历链表来寻找待删除节点的前驱节点。
AQS中有一个方法,isQueued(Thread thread),他的目的是确定传入的thread线程是否正在AQS的等待队列中等待获取同步状态。
/**
* Returns true if the given thread is currently queued.
*
* <p>This implementation traverses the queue to determine
* presence of the given thread.
*
* @param thread the thread
* @return {@code true} if the given thread is on the queue
* @throws NullPointerException if the thread is null
*/
public final boolean isQueued(Thread thread) {
if (thread == null)
throw new NullPointerException();
for (Node p = tail; p != null; p = p.prev)
if (p.thread == thread)
return true;
return false;
}
该方法的作用就是从队列的队尾开始向前遍历,检查节点所代表的线程是不是传入的线程。
这里之所以从后向前遍历,而没有选择从前向后遍历,主要是因为,在某些场景下(尤其是在使用公平锁时),新加入的线程会被添加到队列尾部,从尾部开始可能更快地找到最近加入的线程。从后往前便利确实可能提前返回的概率更大一些,因为需要从后往前遍历,那么就必须得是双向链表,只有双向链表才能方便的获取到前驱节点。
当一个线程尝试获取同步状态失败后(例如,尝试获取一个已被其他线程持有的锁),它需要决定接下来的行动。如果直接进入忙等(busy-wait)状态,这将会大量消耗CPU资源,因为线程会在一个循环中不断尝试获取锁,而没有实际的进展。在AQS中可以将线程进行挂起(park)
在执行挂起之前,会通过shouldParkAfterFailedAcquire方法通过检查等待队列中的节点状态来决定一个线程是否应该被挂起。
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//如果ws为Node.SIGNAL(值为-1),
//这意味着前驱节点已经处于等待状态,期望在释放同步状态时唤醒后继节点。
//在这种情况下,方法可以直接返回true,指示当前线程可以安全地挂起。
return true;
if (ws > 0) {
//如果ws大于0,说明前驱节点已被取消。
//此时,循环向前遍历等待队列,跳过所有已取消的节点,直到找到一个未被取消的节点作为新的前驱节点,并更新相应的链接。
//这个过程是为了维护等待队列的健康状态,移除其中的无效节点。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果前驱节点的等待状态不是SIGNAL(也即,它是0或PROPAGATE),
//则将前驱节点的等待状态更新为SIGNAL。
//这是通过compareAndSetWaitStatus方法完成的,它安全地修改节点的状态,以指示当前节点(node)在释放同步状态时需要被唤醒。
//这里并不立即挂起当前线程,而是返回false,让调用者知道它需要再次尝试获取同步状态,在无法获取时再决定挂起。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
在这个过程中,需要检查当前节点的前驱节点,判断他的状态是不是已经处于等待状态(Node.SIGNAL),如果前驱节点已经是等待状态了,那么自己这个节点也就可以安全的被挂起了。
所以,这里需要获取一个节点的前驱节点,那么就需要用双向链表了,直接获取这个节点的prev就行了。如果是单链表,就得从头开始遍历。
同步队列和条件队列是AQS中的两种不同队列,同步队列主要用于实现锁机制,而条件队列用于线程间的协调和通信
条件队列用于实现条件变量,允许线程在特定条件不满足时挂起,直到其他线程改变了条件并显式唤醒等待在该条件上的线程。比较典型的一个条件队列的使用场景就是ReentrantLock的Condition。
条件队列与同步队列不同,它是基于Condition接口实现的,用于管理那些因为某些条件未满足而等待的线程。当条件满足时,这些线程可以被唤醒。每个Condition对象都有自己的一个条件队列。
ConditionObject是AQS的一个内部类,用于实现条件变量。条件变量是并发编程中一种用于线程间通信的机制,它允许一个或多个线程在特定条件成立之前等待,同时释放相关的锁。这在某种程度上类似于对象监视器模式中的wait()和notify()方法,但提供了更灵活和更强大的控制。
它的主要实现原理:
当线程调用了Condition的await()方法后,它会释放当前持有的锁,并且该线程会被加入到条件队列中等待。 然后线程会处于等待状态,直到其他线程调用signal()或signalAll()方法来通知条件可能已经满足。