StampedLock,这个类和其他锁的类,都不相同。下面一起来学习下这个类。
What is StampedLock
记得在学习AtomicStampedReference 时,里面是用一个标志stamp,来解决ABA问题。即通过版本号,来检验是否保持一个相对静止的状态,从而能够保证更加线程安全的读写。
特性
这种锁,提供了一种新的锁,乐观读。看里面三种方式:
写锁,排他性,阻塞读。
普通读锁,对读共享,对写排他, 读少写多
乐观读,对读共享,对写不排他,允许写。写少读多
如果普通读的时候,可能会有这样一种情况,当读很多的时候,那就会造成获取写锁线程饥饿,一直无法获取写。
所以在StampedLock里面,就提供了一种乐观读的机制,在读的时候如果发生了写,则应当重读而不是在读的时候直接阻塞写!
这次先以一个Java doc的例子,来具体理解下StampedLock的三种锁。
Java doc的例子
/**
* 坐标x,y,一个修改,一个读。
*/
class Point {
private double x, y;
private final StampedLock sl = new StampedLock();
void move(double deltaX, double deltaY) {
// 写锁,返回一个stamp
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
//finally块中,利用stamp,释放写锁
sl.unlockWrite(stamp);
}
}
double distanceFromOrigin() {
// 只读的,乐观读
long stamp = sl.tryOptimisticRead();
double currentX = x, currentY = y;
//再次检验这个stamp,看是否被写锁更改
if (!sl.validate(stamp)) {
//更改了,那么久尝试阻塞性去获取读锁
//所以currentX和currentY,也可能变化,重新读一遍
stamp = sl.readLock();
try {
currentX = x;
currentY = y;
} finally {
//用完了,释放读锁
sl.unlockRead(stamp);
}
}
//最后执行返回,可以是乐观读后的数据,或者是普通读后的数据。
return Math.sqrt(currentX * currentX + currentY * currentY);
}
void moveIfAtOrigin(double newX, double newY) {
// Could instead start with optimistic, not read mode
long stamp = sl.readLock();
try {
//没有改成功,就一直阻塞。
while (x == 0.0 && y == 0.0) {
//在stamp的基础上,尝试去获取读锁。
long ws = sl.tryConvertToWriteLock(stamp);
if (ws != 0L) {
//成功获取到了写锁。
stamp = ws;
x = newX;
y = newY;
break;
}
else {
//一次转化写锁失败,那么就释放读锁,阻塞性的去获取写锁
sl.unlockRead(stamp);
stamp = sl.writeLock();
}
}
} finally {
//释放写锁
sl.unlock(stamp);
}
}
}
如上述代码,官方的例子还是很全面的,把普通读,乐观读,写锁,以及锁转化都囊括了进去。使用起来还是很简单的。
接下来就结合源码来分析StampedLock实现机制。
StampedLock概括
先来看StampedLock的定义头:
定义:
public class StampedLock implements java.io.Serializable
如上,它并没有实现Lock或者ReadWriteLock接口,因为两个接口并不能完全满足Stamped的需求。
其次,它里面没有用到AbstractQueuedSynchronizer,但是是基于AQS这种思想的,里面也是通过一个CLH queue,来实现线程调度的,并且每一个线程用一个节点来封装,利用Unsafe的park以及unpark来进行挂起和唤醒操作,先看看WNode:
WNode:
static final class WNode {
volatile WNode prev;
volatile WNode next;
/**
* list of linked readers,链接这一条读锁的节点。
*/
volatile WNode cowait;
/**
* 当前线程,// non-null while possibly parked //当被parked的时候,不为null。
*/
volatile Thread thread;
/**
* 0, WAITING, or CANCELLED 0代表等待waiting,否则是取消。
*/
volatile int status;
/**
* RMODE or WMODE 等待写还是等待读。
*/
final int mode;
WNode(int m, WNode p) { mode = m; prev = p; }
}
state变量:
先看state的定义:
/** Lock sequence/state
* 序列 锁的状态
* long类型。
* 初始值第九位为1.
* */
private transient volatile long state;
64位,当处于写锁状态时,第8位为1,并且普通读时,只用前7位来表示,当超出了这7位表示的数量,在用一个int类型的readerOverflow来表示读的数量。
和Lock与ReadWriteLock的转化:
// views
transient ReadLockView readLockView;
transient WriteLockView writeLockView;
transient ReadWriteLockView readWriteLockView;
如上,定义了读锁,写锁,读写锁来吧StampedLock转化为相应的锁,原理就是对StampedLock里面相应方法的封装返回,但是对于StampedLock转化要注意一下几点:
没有重入性质
没有Condition队列支持
普通读
获取锁readLock:
public long readLock() {
long s = state, next; // bypass acquireRead on common uncontended case
return ((whead == wtail && (s & ABITS)
U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
next : acquireRead(false, 0L));
}
首先尝试一次获取,如果获取成功,则更改state变量值,获取失败则调用acquireRead进行阻塞式获取,最终结果要么是成功获取,要么进入队列阻塞。
下面看acquireRead:
private long acquireRead(boolean interruptible, long deadline) {
WNode node = null, p;
for (int spins = -1;;) {
WNode h;
//当whead=wtail时。
if ((h = whead) == (p = wtail)) {
for (long m, s, ns;;) {
//如果state中读锁,没有超过127,那么就在state上面更改,否则就在ReaderOverflow中更改。
//并且返回最终的state。
if ((m = (s = state) & ABITS)
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
(m
return ns;
else if (m >= WBIT) {
//被写着独占。来探测状态。
if (spins > 0) {
//自旋,随机减少。
if (LockSupport.nextSecondarySeed() >= 0)
--spins;
}
else {
if (spins == 0) {
//退出循环。
WNode nh = whead, np = wtail;
if ((nh == h && np == p) || (h = nh) != (p = np))
break;
}
//重新设置spins,供下次使用。
spins = SPINS;
}
}
}
}
if (p == null) { // initialize queue,初始化队列操作。
WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
}
else if (node == null)
//新建一个读的node。
node = new WNode(RMODE, p);
else if (h == p || p.mode != RMODE) {
if (node.prev != p)
node.prev = p;
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
p.next = node;
break;
}
}
else if (!U.compareAndSwapObject(p, WCOWAIT,node.cowait = p.cowait, node))
node.cowait = null;
else {
//阻塞式操作,要么阻塞放入队列,要么运行。
for (;;) {
WNode pp, c; Thread w;
if ((h = whead) != null && (c = h.cowait) != null &&
U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null) // help release
U.unpark(w);
if (h == (pp = p.prev) || h == p || pp == null) {
long m, s, ns;
do {
//尝试再次获取,读锁。
if ((m = (s = state) & ABITS)
U.compareAndSwapLong(this, STATE, s,
ns = s + RUNIT) :
(m
(ns = tryIncReaderOverflow(s)) != 0L))
return ns;
} while (m
}
if (whead == h && p.prev == pp) {
long time;
if (pp == null || h == p || p.status > 0) {
node = null; // throw away
break;
}
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime())
return cancelWaiter(node, p, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if ((h != pp || (state & ABITS) == WBIT) &&
whead == h && p.prev == pp)
U.park(false, time);
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, p, true);
}
}
}
}
for (int spins = -1;;) {
WNode h, np, pp; int ps;
//为head,要释放后继节点。
if ((h = whead) == p) {
if (spins
spins = HEAD_SPINS;
else if (spins
spins
for (int k = spins;;) { // spin at head,自旋,尝试去把头结点的后一个节点释放。
long m, s, ns;
if ((m = (s = state) & ABITS)
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
(m
WNode c; Thread w;
whead = node;
node.prev = null;
while ((c = node.cowait) != null) {
if (U.compareAndSwapObject(node, WCOWAIT,
c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
return ns;
}
else if (m >= WBIT &&
LockSupport.nextSecondarySeed() >= 0 && --k
break;
}
}
else if (h != null) {
WNode c; Thread w;
while ((c = h.cowait) != null) {
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
}
if (whead == h) {
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node; // stale
}
else if ((ps = p.status) == 0)
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
else if (ps == CANCELLED) {
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
}
else {
long time;
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime())
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if (p.status
(p != h || (state & ABITS) == WBIT) &&
whead == h && node.prev == p)
U.park(false, time);
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true);
}
}
}
}
这个方法代码有点长,就不细节一一说明,说说大概思路:
两个阻塞性循环,前一个自旋,做了更多初始化的相关工作,后一个是针对于头结点的自旋,如果符合要求则unpark后继节点。
如果最终发生中断,则需要取消获取锁,调用cancelWaiter,将node节点(即当前线程)从队列中删除,并且判断是否需要抛出异常。
两次循环中都会判断是否能够获取读锁:(m = (s = state) & ABITS)
释放读锁unstampedUnlockRead:
final void unstampedUnlockRead() {
for (;;) {
long s, m; WNode h;
if ((m = (s = state) & ABITS) == 0L || m >= WBIT)
throw new IllegalMonitorStateException();
else if (m
if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
if (m == RUNIT && (h = whead) != null && h.status != 0)
release(h);
break;
}
}
else if (tryDecReaderOverflow(s) != 0L)
break;
}
}
以上代码判断两点,如果读锁数量超过127,即用到了readerOverflow,则在readerOverflow上修改,否则在state的低7位修改。如果需要,则调用release(h) 唤醒下一个节点。
普通写
获取写锁writeLock:
public long writeLock() {
long s, next; // by pass acquireWrite in fully unlocked case only
//如果低8位都为0,就说明没有普通读锁,也没有写锁,就尝试获取
return ((((s = state) & ABITS) == 0L &&
//CAS方法,尝试加上128,也就是以第8位为基点上加。然后返回这个state,也就是stamped。
//失败进入acquireWrite
U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
next : acquireWrite(false, 0L));
}
如上,通过((s = state) & ABITS) == 0L 判断是否被写锁或者普通读占据,如果没有则尝试CAS修改,没有获取到,则调用acquireWrite 进行阻塞式调用。,这里就不多说acquireWrite了,具体和acquireRead思路差不多,只是一个读锁,一个写锁,相关标志变量不同。
释放锁unstampedUnlockWrite:
final void unstampedUnlockWrite() {
WNode h; long s;
if (((s = state) & WBIT) == 0L)
throw new IllegalMonitorStateException();
state = (s += WBIT) == 0L ? ORIGIN : s;
if ((h = whead) != null && h.status != 0)
release(h);
}
如上,在释放写锁的过程中,并没有去检查是否为当前线程,只是简单了检查了state变量是否符合要求,即是否是获取状态。最后用release唤醒下一个节点进行运行。
乐观读
乐观读并没有获取锁与释放锁,它只需要获取一个状态标志,然后在检测下这个状态标志有没有被一个写锁更改,更改就尝试普通读获取锁,否则直接运行下去就可以了。
tryOptimisticRead获取状态:
public long tryOptimisticRead() {
long s;
return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}
然后就直接进行运行,再调用validate检测是否被更改:
public boolean validate(long stamp) {
//禁止loadload排序。
U.loadFence();
//比较低7位,也就是只有读,没有写。
return (stamp & SBITS) == (state & SBITS);
}
当校验成功,则进行操作,如果校验失败,那么就调用普通读取重新操作rollback。具体可以结合上文那个例子进行理解。
锁转换
StampedLock里面,有锁转换这一种功能:
tryConvertToWriteLock:转换成写锁
tryConvertToReadLock:转换成读锁
tryConvertToOptimisticRead:转换成乐观读锁
从里面内容的分析可以知道,在StampedLock里面,乐观读并不会对state变量执行写操作,而写锁是排他的,读锁时共享的,所以在这样一种思维下,对于锁的转化则有一个大概的认识了。就是通过判断以及写入state变量,从而改变锁的性质。
例如对于tryCoonvertToWriteLock:
public long tryConvertToWriteLock(long stamp) {
long a = stamp & ABITS, m, s, next;
while (((s = state) & SBITS) == (stamp & SBITS)) {
if ((m = s & ABITS) == 0L) {
//检测到读锁未被占有
if (a != 0L)
//读锁中途又已经被占有,那么久不能再进行获取了。
break;
if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT))
return next;
}
else if (m == WBIT) {
if (a != m)
break;
return stamp;
}
else if (m == RUNIT && a != 0L) {
if (U.compareAndSwapLong(this, STATE, s,
next = s - RUNIT + WBIT))
return next;
}
else
break;
}
return 0L;
}
如上所示,其实大部分判断时间都会发生到if (U.compareAndSwapLong(this, STATE, s,next = s - RUNIT + WBIT))这个短路与判断里面。
接下来再简单例子:
void moveIfAtOrigin(double newX, double newY) { // upgrade
// Could instead start with optimistic, not read mode
long stamp = sl.readLock();
try {
while (x == 0.0 && y == 0.0) {
long ws = sl.tryConvertToWriteLock(stamp);
if (ws != 0L) {
stamp = ws;
x = newX;
y = newY;
break;
}
else {
sl.unlockRead(stamp);
stamp = sl.writeLock();
}
}
} finally {
sl.unlock(stamp);
}
}
当tryConvertToWriteLock 返回为0时,才确定已经获取成功。
领取专属 10元无门槛券
私享最新 技术干货