首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

并发编程之StampedLock

StampedLock,这个类和其他锁的类,都不相同。下面一起来学习下这个类。

What is StampedLock

记得在学习AtomicStampedReference 时,里面是用一个标志stamp,来解决ABA问题。即通过版本号,来检验是否保持一个相对静止的状态,从而能够保证更加线程安全的读写。

特性

这种锁,提供了一种新的锁,乐观读。看里面三种方式:

写锁,排他性,阻塞读。

普通读锁,对读共享,对写排他, 读少写多

乐观读,对读共享,对写不排他,允许写。写少读多

如果普通读的时候,可能会有这样一种情况,当读很多的时候,那就会造成获取写锁线程饥饿,一直无法获取写。

所以在StampedLock里面,就提供了一种乐观读的机制,在读的时候如果发生了写,则应当重读而不是在读的时候直接阻塞写!

这次先以一个Java doc的例子,来具体理解下StampedLock的三种锁。

Java doc的例子

/**

* 坐标x,y,一个修改,一个读。

*/

class Point {

private double x, y;

private final StampedLock sl = new StampedLock();

void move(double deltaX, double deltaY) {

// 写锁,返回一个stamp

long stamp = sl.writeLock();

try {

x += deltaX;

y += deltaY;

} finally {

//finally块中,利用stamp,释放写锁

sl.unlockWrite(stamp);

}

}

double distanceFromOrigin() {

// 只读的,乐观读

long stamp = sl.tryOptimisticRead();

double currentX = x, currentY = y;

//再次检验这个stamp,看是否被写锁更改

if (!sl.validate(stamp)) {

//更改了,那么久尝试阻塞性去获取读锁

//所以currentX和currentY,也可能变化,重新读一遍

stamp = sl.readLock();

try {

currentX = x;

currentY = y;

} finally {

//用完了,释放读锁

sl.unlockRead(stamp);

}

}

//最后执行返回,可以是乐观读后的数据,或者是普通读后的数据。

return Math.sqrt(currentX * currentX + currentY * currentY);

}

void moveIfAtOrigin(double newX, double newY) {

// Could instead start with optimistic, not read mode

long stamp = sl.readLock();

try {

//没有改成功,就一直阻塞。

while (x == 0.0 && y == 0.0) {

//在stamp的基础上,尝试去获取读锁。

long ws = sl.tryConvertToWriteLock(stamp);

if (ws != 0L) {

//成功获取到了写锁。

stamp = ws;

x = newX;

y = newY;

break;

}

else {

//一次转化写锁失败,那么就释放读锁,阻塞性的去获取写锁

sl.unlockRead(stamp);

stamp = sl.writeLock();

}

}

} finally {

//释放写锁

sl.unlock(stamp);

}

}

}

如上述代码,官方的例子还是很全面的,把普通读,乐观读,写锁,以及锁转化都囊括了进去。使用起来还是很简单的。

接下来就结合源码来分析StampedLock实现机制。

StampedLock概括

先来看StampedLock的定义头:

定义:

public class StampedLock implements java.io.Serializable

如上,它并没有实现Lock或者ReadWriteLock接口,因为两个接口并不能完全满足Stamped的需求。

其次,它里面没有用到AbstractQueuedSynchronizer,但是是基于AQS这种思想的,里面也是通过一个CLH queue,来实现线程调度的,并且每一个线程用一个节点来封装,利用Unsafe的park以及unpark来进行挂起和唤醒操作,先看看WNode:

WNode:

static final class WNode {

volatile WNode prev;

volatile WNode next;

/**

* list of linked readers,链接这一条读锁的节点。

*/

volatile WNode cowait;

/**

* 当前线程,// non-null while possibly parked //当被parked的时候,不为null。

*/

volatile Thread thread;

/**

* 0, WAITING, or CANCELLED 0代表等待waiting,否则是取消。

*/

volatile int status;

/**

* RMODE or WMODE 等待写还是等待读。

*/

final int mode;

WNode(int m, WNode p) { mode = m; prev = p; }

}

state变量:

先看state的定义:

/** Lock sequence/state

* 序列 锁的状态

* long类型。

* 初始值第九位为1.

* */

private transient volatile long state;

64位,当处于写锁状态时,第8位为1,并且普通读时,只用前7位来表示,当超出了这7位表示的数量,在用一个int类型的readerOverflow来表示读的数量。

和Lock与ReadWriteLock的转化:

// views

transient ReadLockView readLockView;

transient WriteLockView writeLockView;

transient ReadWriteLockView readWriteLockView;

如上,定义了读锁,写锁,读写锁来吧StampedLock转化为相应的锁,原理就是对StampedLock里面相应方法的封装返回,但是对于StampedLock转化要注意一下几点:

没有重入性质

没有Condition队列支持

普通读

获取锁readLock:

public long readLock() {

long s = state, next; // bypass acquireRead on common uncontended case

return ((whead == wtail && (s & ABITS)

U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?

next : acquireRead(false, 0L));

}

首先尝试一次获取,如果获取成功,则更改state变量值,获取失败则调用acquireRead进行阻塞式获取,最终结果要么是成功获取,要么进入队列阻塞。

下面看acquireRead:

private long acquireRead(boolean interruptible, long deadline) {

WNode node = null, p;

for (int spins = -1;;) {

WNode h;

//当whead=wtail时。

if ((h = whead) == (p = wtail)) {

for (long m, s, ns;;) {

//如果state中读锁,没有超过127,那么就在state上面更改,否则就在ReaderOverflow中更改。

//并且返回最终的state。

if ((m = (s = state) & ABITS)

U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :

(m

return ns;

else if (m >= WBIT) {

//被写着独占。来探测状态。

if (spins > 0) {

//自旋,随机减少。

if (LockSupport.nextSecondarySeed() >= 0)

--spins;

}

else {

if (spins == 0) {

//退出循环。

WNode nh = whead, np = wtail;

if ((nh == h && np == p) || (h = nh) != (p = np))

break;

}

//重新设置spins,供下次使用。

spins = SPINS;

}

}

}

}

if (p == null) { // initialize queue,初始化队列操作。

WNode hd = new WNode(WMODE, null);

if (U.compareAndSwapObject(this, WHEAD, null, hd))

wtail = hd;

}

else if (node == null)

//新建一个读的node。

node = new WNode(RMODE, p);

else if (h == p || p.mode != RMODE) {

if (node.prev != p)

node.prev = p;

else if (U.compareAndSwapObject(this, WTAIL, p, node)) {

p.next = node;

break;

}

}

else if (!U.compareAndSwapObject(p, WCOWAIT,node.cowait = p.cowait, node))

node.cowait = null;

else {

//阻塞式操作,要么阻塞放入队列,要么运行。

for (;;) {

WNode pp, c; Thread w;

if ((h = whead) != null && (c = h.cowait) != null &&

U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&

(w = c.thread) != null) // help release

U.unpark(w);

if (h == (pp = p.prev) || h == p || pp == null) {

long m, s, ns;

do {

//尝试再次获取,读锁。

if ((m = (s = state) & ABITS)

U.compareAndSwapLong(this, STATE, s,

ns = s + RUNIT) :

(m

(ns = tryIncReaderOverflow(s)) != 0L))

return ns;

} while (m

}

if (whead == h && p.prev == pp) {

long time;

if (pp == null || h == p || p.status > 0) {

node = null; // throw away

break;

}

if (deadline == 0L)

time = 0L;

else if ((time = deadline - System.nanoTime())

return cancelWaiter(node, p, false);

Thread wt = Thread.currentThread();

U.putObject(wt, PARKBLOCKER, this);

node.thread = wt;

if ((h != pp || (state & ABITS) == WBIT) &&

whead == h && p.prev == pp)

U.park(false, time);

node.thread = null;

U.putObject(wt, PARKBLOCKER, null);

if (interruptible && Thread.interrupted())

return cancelWaiter(node, p, true);

}

}

}

}

for (int spins = -1;;) {

WNode h, np, pp; int ps;

//为head,要释放后继节点。

if ((h = whead) == p) {

if (spins

spins = HEAD_SPINS;

else if (spins

spins

for (int k = spins;;) { // spin at head,自旋,尝试去把头结点的后一个节点释放。

long m, s, ns;

if ((m = (s = state) & ABITS)

U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :

(m

WNode c; Thread w;

whead = node;

node.prev = null;

while ((c = node.cowait) != null) {

if (U.compareAndSwapObject(node, WCOWAIT,

c, c.cowait) &&

(w = c.thread) != null)

U.unpark(w);

}

return ns;

}

else if (m >= WBIT &&

LockSupport.nextSecondarySeed() >= 0 && --k

break;

}

}

else if (h != null) {

WNode c; Thread w;

while ((c = h.cowait) != null) {

if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&

(w = c.thread) != null)

U.unpark(w);

}

}

if (whead == h) {

if ((np = node.prev) != p) {

if (np != null)

(p = np).next = node; // stale

}

else if ((ps = p.status) == 0)

U.compareAndSwapInt(p, WSTATUS, 0, WAITING);

else if (ps == CANCELLED) {

if ((pp = p.prev) != null) {

node.prev = pp;

pp.next = node;

}

}

else {

long time;

if (deadline == 0L)

time = 0L;

else if ((time = deadline - System.nanoTime())

return cancelWaiter(node, node, false);

Thread wt = Thread.currentThread();

U.putObject(wt, PARKBLOCKER, this);

node.thread = wt;

if (p.status

(p != h || (state & ABITS) == WBIT) &&

whead == h && node.prev == p)

U.park(false, time);

node.thread = null;

U.putObject(wt, PARKBLOCKER, null);

if (interruptible && Thread.interrupted())

return cancelWaiter(node, node, true);

}

}

}

}

这个方法代码有点长,就不细节一一说明,说说大概思路:

两个阻塞性循环,前一个自旋,做了更多初始化的相关工作,后一个是针对于头结点的自旋,如果符合要求则unpark后继节点。

如果最终发生中断,则需要取消获取锁,调用cancelWaiter,将node节点(即当前线程)从队列中删除,并且判断是否需要抛出异常。

两次循环中都会判断是否能够获取读锁:(m = (s = state) & ABITS)

释放读锁unstampedUnlockRead:

final void unstampedUnlockRead() {

for (;;) {

long s, m; WNode h;

if ((m = (s = state) & ABITS) == 0L || m >= WBIT)

throw new IllegalMonitorStateException();

else if (m

if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {

if (m == RUNIT && (h = whead) != null && h.status != 0)

release(h);

break;

}

}

else if (tryDecReaderOverflow(s) != 0L)

break;

}

}

以上代码判断两点,如果读锁数量超过127,即用到了readerOverflow,则在readerOverflow上修改,否则在state的低7位修改。如果需要,则调用release(h) 唤醒下一个节点。

普通写

获取写锁writeLock:

public long writeLock() {

long s, next; // by pass acquireWrite in fully unlocked case only

//如果低8位都为0,就说明没有普通读锁,也没有写锁,就尝试获取

return ((((s = state) & ABITS) == 0L &&

//CAS方法,尝试加上128,也就是以第8位为基点上加。然后返回这个state,也就是stamped。

//失败进入acquireWrite

U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?

next : acquireWrite(false, 0L));

}

如上,通过((s = state) & ABITS) == 0L 判断是否被写锁或者普通读占据,如果没有则尝试CAS修改,没有获取到,则调用acquireWrite 进行阻塞式调用。,这里就不多说acquireWrite了,具体和acquireRead思路差不多,只是一个读锁,一个写锁,相关标志变量不同。

释放锁unstampedUnlockWrite:

final void unstampedUnlockWrite() {

WNode h; long s;

if (((s = state) & WBIT) == 0L)

throw new IllegalMonitorStateException();

state = (s += WBIT) == 0L ? ORIGIN : s;

if ((h = whead) != null && h.status != 0)

release(h);

}

如上,在释放写锁的过程中,并没有去检查是否为当前线程,只是简单了检查了state变量是否符合要求,即是否是获取状态。最后用release唤醒下一个节点进行运行。

乐观读

乐观读并没有获取锁与释放锁,它只需要获取一个状态标志,然后在检测下这个状态标志有没有被一个写锁更改,更改就尝试普通读获取锁,否则直接运行下去就可以了。

tryOptimisticRead获取状态:

public long tryOptimisticRead() {

long s;

return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;

}

然后就直接进行运行,再调用validate检测是否被更改:

public boolean validate(long stamp) {

//禁止loadload排序。

U.loadFence();

//比较低7位,也就是只有读,没有写。

return (stamp & SBITS) == (state & SBITS);

}

当校验成功,则进行操作,如果校验失败,那么就调用普通读取重新操作rollback。具体可以结合上文那个例子进行理解。

锁转换

StampedLock里面,有锁转换这一种功能:

tryConvertToWriteLock:转换成写锁

tryConvertToReadLock:转换成读锁

tryConvertToOptimisticRead:转换成乐观读锁

从里面内容的分析可以知道,在StampedLock里面,乐观读并不会对state变量执行写操作,而写锁是排他的,读锁时共享的,所以在这样一种思维下,对于锁的转化则有一个大概的认识了。就是通过判断以及写入state变量,从而改变锁的性质。

例如对于tryCoonvertToWriteLock:

public long tryConvertToWriteLock(long stamp) {

long a = stamp & ABITS, m, s, next;

while (((s = state) & SBITS) == (stamp & SBITS)) {

if ((m = s & ABITS) == 0L) {

//检测到读锁未被占有

if (a != 0L)

//读锁中途又已经被占有,那么久不能再进行获取了。

break;

if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT))

return next;

}

else if (m == WBIT) {

if (a != m)

break;

return stamp;

}

else if (m == RUNIT && a != 0L) {

if (U.compareAndSwapLong(this, STATE, s,

next = s - RUNIT + WBIT))

return next;

}

else

break;

}

return 0L;

}

如上所示,其实大部分判断时间都会发生到if (U.compareAndSwapLong(this, STATE, s,next = s - RUNIT + WBIT))这个短路与判断里面。

接下来再简单例子:

void moveIfAtOrigin(double newX, double newY) { // upgrade

// Could instead start with optimistic, not read mode

long stamp = sl.readLock();

try {

while (x == 0.0 && y == 0.0) {

long ws = sl.tryConvertToWriteLock(stamp);

if (ws != 0L) {

stamp = ws;

x = newX;

y = newY;

break;

}

else {

sl.unlockRead(stamp);

stamp = sl.writeLock();

}

}

} finally {

sl.unlock(stamp);

}

}

当tryConvertToWriteLock 返回为0时,才确定已经获取成功。

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20190128G08B8S00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券