public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
//该方法不会阻塞,该方法只是尝试获取而已,获取失败后就会调用doAcquireShared,
//doAcquireShared方法如果获取不到锁则对应线程会被阻塞
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
//readerShouldBlock公平锁、非公平锁处理方式不同
//公平锁会考虑前面是不是有已经等待的线程(不管是获取读锁还是写锁)
//非公平锁则只关注第一个等待节点是不是获取写锁
//如果有几个线程同时竞争,可能导致compareAndSetState失败进入fullTryAcquireShared
//自旋获取到锁
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
//执行下面分支条件:
//1.当前线程曾经获取到读锁并释放了,再次获取读锁,且期间没有任何其他线程获取
//读锁!!!
//2.该线程不是第一个获取到读锁的线程,即在该线程获取到读取之前有其他线程已经
//获取到读锁,而且仍没有退出!!
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
针对公平锁而言,readerShouldBlock会判断前面是不是有等待的节点(不管是不是互斥锁),而针对非公平锁而言则是判断同步队列第一个节点是不是互斥锁(写锁)。
下面看下公平锁以及非公平锁该方法的实现:
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
//公平锁则会按照FIFO的规则来办事
return hasQueuedPredecessors();
}
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
//非公平锁则是需要判断同步队列第一个节点是不是写锁,这样做的目的是为了
//避免写锁被饿死的现象,因为写锁跟读写锁都是互斥,但是读锁则只跟写锁互斥
return apparentlyFirstQueuedIsExclusive();
}
}
cachedHoldCounter保存的是最后一个成功获取到锁的线程,该变量被赋值只会出现在r != 0 && firstReader != current的情况,也就是说当前读线程成功获取到共享锁时已经有一个读锁被获取到,而且之前线程获取到的读锁尚未被释放!!
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
下面分析下上面代码的执行流程,刚开始进入这部分代码时cachedHoldCounter为null,所以就会执行
cachedHoldCounter = rh = readHolds.get();
下面看下readHolds的定义
private transient ThreadLocalHoldCounter readHolds;
这是一个ThreadLocalHoldCounter的对象,ThreadLocalHoldCounter类到底是啥:
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}
最开始时readHolds也没有值,readHolds.get()方法会导致ThreadLocalHoldCounter类initialValue方法执行,也就会返回count=0以及tid为当前线程tid的属性。当下次该线程再次获取读锁时将会执行
else if (rh.count == 0)
readHolds.set(rh);
tryAcquireShared如果初步尝试获取共享锁失败,则会进入fullTryAcquireShared方法自旋获取读共享锁,下面看下fullTryAcquireShared方法:
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) {
int c = getState();
//存在写互斥锁且互斥锁的拥有者不是当前线程则放弃尝试自旋
if (exclusiveCount(c) != 0) {
//如果没有下面的判断,试想这么一种情况,一个线程先获取到写锁,然后
//未释放写锁之前该线程又试图获取读锁,如果没有
//getExclusiveOwnerThread() != current这个条件判断直接返回-1,那么
//获取到写锁的线程将进入睡眠无法唤醒,也就是传说中的死锁状态
if (getExclusiveOwnerThread() != current)
return -1;
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
上面是tryAcquireShared部分代码,也就是尝试获取读锁,当尝试失败后将进入doAcquireShared获取读锁,下面看下doAcquireShared该方法内部到底做了些什么:
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//如果自身节点不是第一个等待节点则直接竞争失败,但这不代表自身不能获取到锁
//这个要归功于setHeadAndPropagate,如果第一个节点不是自身节点,但是第一个节点
//是其他读节点,那么当第一个读节点获取到读锁以后会通过setHeadAndPropagate唤醒
//后面其他读节点
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//成功获取到读共享锁,那么将唤醒所有其他读锁,不唤醒写互斥锁线程!!!
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//获取读锁失败,进入阻塞等待被唤醒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//当前节点下一节点是获取共享锁则唤醒,不抢写锁的饭碗,否则可能会导致获取写锁的线程饿
//死的情况,因为该方法是获取到读锁后调用的,考虑到读写锁互斥所以如果下一个等待节点是
//准备获取写锁,则不唤醒后续节点,反之则唤醒
if (s == null || s.isShared())
doReleaseShared();
}
}