CountDownLatch 是 Android 平台中常用的线程同步工具类,它可以让一个或多个线程等待其他线程完成某个任务后再继续执行。它通过一个计数器来实现,计数器的初始值可以设置为一个正整数,每当一个线程完成任务后,计数器的值会递减 1。当计数器的值递减到 0 时,等待的线程才会被唤醒,继续执行后续的操作。
CountDownLatch 经常用于以下场景:
CountDownLatch 的实现原理基于 AQS(AbstractQueuedSynchronizer)同步器。AQS 是 Java 并发编程中常用的同步器框架,它提供了很多用于实现线程同步的机制,例如锁、信号量、屏障等。
CountDownLatch 内部维护了一个计数器变量 state
,以及一个 AQS 队列。当 CountDownLatch 对象被创建时,state
变量会被初始化为指定的值。调用 countDown()
方法会使 state
变量的值递减 1。调用 await()
方法的线程会尝试获取 AQS 队列的锁,如果 state
变量的值为 0,则表示所有等待的线程都已经完成任务,AQS 队列会释放锁,唤醒所有等待的线程。如果 state
变量的值不为 0,则调用 await()
方法的线程会进入 AQS 队列等待。
下面,我们将深入其源码,分析其核心实现细节。
CountDownLatch
使用了一个名为 Sync
的内部类来继承 AbstractQueuedSynchronizer
(AQS),这是处理锁和同步器的一个框架。Sync
用一个单一的非负整数来表示状态,这个状态就是剩余需要等待的事件数量。
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
}
在这段代码中,构造函数接受初始计数作为参数,并通过 setState()
方法设置 AQS 的状态值。
countDown()
方法在调用时减少计数器的值。当计数器达到零时,释放所有等待的线程。
public void countDown() {
sync.releaseShared(1);
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
这里,releaseShared()
方法调用导致 tryReleaseShared()
被执行。tryReleaseShared()
尝试原子地减少状态值,并在计数到达零时返回 true
,这会导致等待在 CountDownLatch
上的线程被唤醒。
线程调用 await()
方法等待计数器达到零。这是通过 AQS 来实现阻塞和后续的唤醒。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
在这段代码中,await()
调用了 acquireSharedInterruptibly()
,这会导致 tryAcquireShared()
的执行。如果 getState()
返回 0,表示没有更多的事件需要等待,方法返回 1,允许线程继续执行。如果不是 0,则返回 -1,表示线程应该被阻塞。
由于 CountDownLatch
的实现依赖于高效的 AQS 框架,其性能通常很高。但是,CountDownLatch
是一次性的,计数器达到零后不能被重置。如果需要一个可重置的版本,可以考虑使用 CyclicBarrier
或 Semaphore
。
CountDownLatch
特别适用于一种情况:一个线程必须等待其他几个线程完成某些操作后,才能继续执行,例如主线程等待初始化线程加载完毕后继续执行。
这种机制在多线程编程中非常有用,尤其是在处理复杂的依赖关系和执行顺序时。
AbstractQueuedSynchronizer
(AQS)使用了一个内部的 FIFO 队列来管理所有等待获取资源的线程,并通过一个单一的整型变量来表示同步状态。
AQS 的核心在于管理同步状态(state)和管理线程之间的排队等待。它提供了两种模式的同步机制:
ReentrantLock
。Semaphore
和 CountDownLatch
就是采用的共享模式。AQS 内部使用一个名为 Node
的静态内部类来实现同步队列,每个节点(Node)可能代表一个正在等待获取资源的线程。节点会根据线程的等待状态被构造为不同的类型(独占、共享、条件等)。
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled. */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking. */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition. */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate.
*/
static final int PROPAGATE = -3;
...
}
同步状态是通过一个整型的变量来管理的,具体的含义由实现 AQS 的同步器来定义。例如,在 ReentrantLock
中,它表示锁的持有计数;而在 Semaphore
中,它表示当前可用的许可数。
/**
* The synchronization state.
*/
private volatile int state;
使用 CAS(Compare-And-Swap)操作来修改状态,这是一种无锁的同步机制,能有效地减少锁的开销。例如:
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
CountDownLatch
的 await()
方法可以在指定的时间内等待,直到计数器减到零。当使用带有超时功能的 await()
时,内部实际上调用的是 AQS
的 doAcquireSharedNanos
方法。这个方法是 AQS 提供的共享模式下的获取方式,允许线程以纳秒为单位等待直到获取成功,或者直到超时发生。
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) {
cancelAcquire(node);
return false;
}
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
addWaiter(Node.SHARED)
将当前线程包装成一个 Node
(共享模式)并加入到等待队列中。tryAcquireShared
方法获取资源。tryAcquireShared
返回一个非负值,说明成功获取了共享资源。随后通过 setHeadAndPropagate
将当前节点设置为头节点,并向后传播(可能唤醒后续等待的节点),然后退出循环。false
。LockSupport.parkNanos
来挂起线程。cancelAcquire
方法取消该节点的获取请求。doAcquireSharedNanos
方法体现了 AQS 的设计精髓:将线程以节点形式组织在一个双向队列中,通过细粒度的锁(这里是共享锁)和高效的线程调度(挂起和唤醒)机制来实现同步控制。此外,这种方法还兼顾了超时机制,使得线程不会无限期地等待资源。在 CountDownLatch
中,这一机制用于确保线程可以在指定时间内等待其他操作的完成。
CountDownLatch 的主要使用方法如下:
CountDownLatch latch = new CountDownLatch(count);
其中 count
表示需要等待的线程数。
latch.await();
调用 await()
方法的线程会阻塞,直到 state
变量的值递减到 0。
latch.countDown();
调用 countDown()
方法会使 state
变量的值递减 1。
以下示例演示了如何使用 CountDownLatch 等待多个子线程完成任务后再执行主线程任务:
import java.util.concurrent.CountDownLatch
class AppInitializer {
fun initializeApplication() {
// 假设我们有三个数据源需要加载
val latch = CountDownLatch(3)
// 模拟从网络加载数据
Thread {
try {
loadFromNetwork()
println("Network data loaded")
} finally {
latch.countDown()
}
}.start()
// 模拟从数据库加载数据
Thread {
try {
loadFromDatabase()
println("Database data loaded")
} finally {
latch.countDown()
}
}.start()
// 模拟从文件系统加载数据
Thread {
try {
loadFromFileSystem()
println("File system data loaded")
} finally {
latch.countDown()
}
}.start()
// 等待所有数据源加载完成
latch.await()
println("All data sources have been loaded. Application is ready to be used.")
}
private fun loadFromNetwork() {
// 模拟网络延迟
Thread.sleep(2000)
}
private fun loadFromDatabase() {
// 模拟数据库访问延迟
Thread.sleep(1500)
}
private fun loadFromFileSystem() {
// 模拟文件读取延迟
Thread.sleep(1000)
}
}
本文详细介绍了CountDownLatch的原理和使用方法,并提供了一个使用示例和一些应用场景。希望本文能够帮助你更好地理解和使用CountDownLatch。