首先和Synchronized(可以参考) 的不同之处,Lock完全用Java写成,在java这个层面是无关JVM实现的。其实现都依赖java.util.concurrent.AbstractQueuedSynchronizer类,简称AQS。
简单说来,AbstractQueuedSynchronizer会把所有的请求线程构成一个CLH队列,当一个线程执行完毕(lock.unlock())时会激活自己的后继节点,但正在执行的线程并不在队列中,而那些等待执行的线程全部处于阻塞状态,线程的显式阻塞是通过调用LockSupport.park()完成,而LockSupport.park()则调用sun.misc.Unsafe.park()本地方法,再进一步,HotSpot在Linux中中通过调用pthread_mutex_lock函数把线程交给系统内核进行阻塞。
具体实现代码:
获取锁流程:
一、lock方法
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
对于刚来竞争的线程首先会通过CAS设置状态,如果设置成功那么直接获取锁,执行临界区的代码,反之调用acquire(1)进入同步队列中。如果已经存在Running线程,那么CAS肯定会失败,则新的竞争线程会通过CAS的方式被追加到队尾。
二、这里的关键是acquire(1) 方法
/**
*在排除模式下获取,忽略中断。通过至少调用一次{@link #tryAcquire}来实现,成功后返回。
*否则,线程将排队,可能反复阻塞和解除阻塞,调用{@link#tryAcquire}直到成功。
*此方法可用于实现方法{@link Lock#lock}.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire方法,是调用nonfairTryAcquire方法:
三、nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//说明有线程拥有了该锁,这个线程就是自己本身那么状态++
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
1、 该方法会首先判断当前状态,如果c==0说明没有线程正在竞争该锁,如果不c !=0 说明有线程正拥有了该锁。 2、 如果发现c==0,则通过CAS设置该状态值为acquires,acquires的初始调用值为1,每次线程重入该锁都会+1,每次unlock都 会-1,但为0时释放锁,这也就是为什么一个lock要对应这个一个unlock的原因。 3、如果CAS设置成功,则可以预计其他任何线程调用CAS都不会再成功,也就认为当前线程得到了该锁,也作为Running线程,很 显然这个Running线程并未进入等待队列。 4、如果c !=0 但发现自己已经拥有锁,只是简单地++acquires,并修改status值,但因为没有竞争,所以通过setStatus修改,而非 CAS,也就是说这段代码实现了偏向锁的功能。
四、 addWaiter 构建入队节点
/**
* 为当前线程和给定模式创建和排队节点。
* @param mode Node.EXCLUSIVE 独占锁, Node.SHARED 共享锁
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
addWaiter方法负责把当前无法获得锁的线程包装为一个Node添加到队尾。 其中参数mode是独占锁还是共享锁,默认为null,独占锁。追加到队尾的动作分两步: 1、如果当前队尾已经存在(tail!=null),则使用CAS把当前线程更新为Tail 2、如果当前Tail为null或则线程调用CAS设置队尾失败,则通过enq方法继续设置Tail 五、enq方法
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
Node h = new Node(); // Dummy header
h.next = node;
node.prev = h;
if (compareAndSetHead(h)) {
tail = node;
return h;
}
}
else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
循环调用CAS,即使有高并发的场景,无限循环将会最终成功把当前线程追加到队尾(或设置队头)
六、acquireQueued 线程对外行为上阻塞,内部自旋
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {//前驱节点等于头节点,尝试获取同步状态
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
}
acquireQueued的主要作用是把已经追加到队列的线程节点(addWaiter方法返回值)进行阻塞,但阻塞前又通过tryAccquire重试是否能获得锁,如果重试成功能则无需阻塞,直接返回。 仔细看看这个方法是个无限循环,感觉如果p == head && tryAcquire(arg)条件不满足循环将永远无法结束,当然不会出现死循环,奥秘在于第12行的parkAndCheckInterrupt会把当前线程挂起,从而阻塞住线程的调用栈。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
如前面所述,LockSupport.park最终把线程交给系统(Linux)内核进行阻塞。当然也不是马上把请求不到锁的线程进行阻塞,还要检查该线程的状态,比如如果该线程处于Cancel状态则没有必要,具体的检查在shouldParkAfterFailedAcquire中, shouldParkAfterFailedAcquire就是靠前继节点判断当前线程是否应该被阻塞,如果前继节点处于CANCELLED状态,则顺便删除这些节点重新构造队列。
到此,获取锁流程结束,要想继续执行,就要有线程释放锁了
锁释放的过程:
说明:
请求锁不成功的线程会被挂起在acquireQueued方法的第12行,12行以后的代码必须等线程被解锁锁才能执行,假如被阻塞的线程得到解锁,则执行第13行,即设置interrupted = true,之后又进入无限循环。 从无限循环的代码可以看出,并不是得到释放锁的线程一定能获得锁,必须在第6行中调用tryAccquire重新竞争,因为锁是非公平的,有可能被新加入的线程获得,从而导致刚被唤醒的线程再次被阻塞,这个细节充分体现了“非公平”的精髓。通过之后将要介绍的解锁机制会可以发现,第一个释放的线程就是Head,因此p == head的判断基本都会成功。 解锁代码相对简单,主要体现在AbstractQueuedSynchronizer.release和Sync.tryRelease方法中class AbstractQueuedSynchronizer
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
如果线程多次锁定,则进行多次释放,直至status==0则真正释放锁,所谓释放锁即设置status为0,因为无竞争所以没有使用CAS。 release的语义在于:如果可以释放锁,则唤醒队列第一个线程(Head),具体唤醒代码如下:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
这段代码的意思在于找出第一个可以unpark的线程,一般说来head.next == head,Head就是第一个线程
总体来讲线程获取锁要经历以下过程(非公平):
1、调用lock方法,会先进行cas操作看下可否设置同步状态1成功,如果成功执行临界区代码
2、如果不成功获取同步状态,如果状态是0那么cas设置为1.
3、如果同步状态既不是0也不是自身线程持有会把当前线程构造成一个节点。
4、把当前线程节点CAS的方式放入队列中,行为上线程阻塞,内部自旋获取状态。
5、线程释放锁,唤醒队列第一个节点,参与竞争。重复上述。
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有