MVP方案
Mutex需要两个变量:key表示锁的使用情况,value 为0表示锁未被持有,1表示锁被持有 且 没有等待者,n表示锁被持有,有n-1个等待者;sema表示等待队列的信号量,sema是个先进先出的队列,用来阻塞、唤醒协程。
type Mutex struct {
key int32
sema int32
}
对Mutex加锁本质是用CAS操作设置key+1,如果锁未被持有,则协程持有锁;如果锁被持有,则协程排队等待; 对Mutex解锁本质是用CAS操作设置key-1,如果key为0,代表没有等待者,直接返回;如果key大于0,代表有协程被阻塞,则按照FIFO的顺序唤醒协程。
把加锁、解锁过程看成排队上厕所,可参考下图,加锁:
<<< 左右滑动见更多 >>>
解锁:
这个mvp版本只是单纯的能用,离好用还差很远。比如如果CPU上有协程来抢锁,但是只能严格按照FIFO顺序排队的话,这样协程并发量不高。因为唤醒协程需要做上下文切换,而此时CPU上正运行着其他协程,理论上如果处于CPU时间片上的协程抢到锁的话会有更好的性能。
解决调度公平性
于是Go团队更改了调度顺序,CPU上的协程也有机会抢锁。
这就好比排队上厕所一样,CPU上的人离厕所门只有1m的距离,而被唤醒的人离厕所门可能有10m的距离,从全局最优的角度考虑,离门近的人进入厕所可以有更高的吞吐。
具体是把Mutex结构体中key
由两种含义拓然成了三种含义,并改名为state:
type Mutex struct {
state int32
sema uint32
}
这次改进把state中的第一位作为MutexLocked专属位,表示这个Mutex是否已经被协程持有。 第二位作为MutexWoken专属位,表示Mutex是否处于唤醒状态,唤醒操作是一个比较耗性能的操作,由于每次抢锁只会有一个协程获取锁,理论上不需要唤醒太多协程,当前执行唤醒操作时只会唤醒一个协程,且如果已有唤醒的协程就不会再执行唤醒操作了。 剩余位为 MutexWaiterShift 的位置,用来表示等待者的数量。
这个版本在加锁时,CPU上正在执行操作的协程可以和被唤醒的协程同时抢锁,而不用严格执行先进先出。解锁时 Mutex 要去除 MutexLocked标记,并根据是否有等待者或是否有已经被唤醒的协程,去决定是否去唤醒协程。
然而这个版本还有一些问题,协程在抢锁时只会执行一次,如果Mutex没有被释放,这个新来的协程可能就被阻塞排队去了。
加入spin机制
这就好比上厕所一样,你排了好久队,好不容易排到了第一位可以去抢厕所了,由于厕所被锁了,不得不重新排队,是不是有点蛋疼。
此时如果你在厕所门口转几圈,理论上如果厕所里面的人方便的比较快,你转圈的时候它正好出来了,那你就可以进去了,这就是所谓的spin机制。
于是 Go 团队继续优化Mutex,这次为Mutex加入了自旋机制,协程检测锁是否被释放时,如果锁没释放会旋转(底层是执行若干次PAUSE指令),以期望在这个过程中锁被释放。自旋不是无限制的,协程旋转几圈后,如果还没抢到锁,那它只能乖乖去队列尾部排队了。
那么到这里Mutex是否完美了呢,其实这里还有一个比较大的问题:如果你是第一次来抢Mutex,抢不到让你去队尾排队,这个看起来合乎常理,但是如果你已经排了半天队,好不容易排到队头可以抢厕所了,却被CPU上的协程抢走了锁,还让你去队尾排队有点残忍,有可能造成你次次抢不到厕所,然后反复到队尾排队,最后活活憋死了。
解决饥饿问题
于是Go团队接着去解决这个饥饿问题。他们为Mutex又新增了一个状态:mutexStarving
,这个状态表示 Mutex 是否处于饥饿状态。如果一个协程等待时长超过1ms,那Mutex会进入饥饿状态。在饥饿状态下,锁的执行权会由解锁的协程直接交给队列头部的协程, 各个协程不再抢锁,也不再自旋,而是严格按照FIFO的顺序执行。这次改进中,还修复了被唤醒的协程需重新去队尾排队的问题:如果协程被唤醒后抢锁失败,会被放到队列头部等待唤醒。
这就是目前Mutex的实现现状~
前面介绍了Mutex的3种状态和一种计数,协程内部也有几个状态,这几个状态会影响Mutex的状态,让我们分别来看一下:
iter
iter用于记录协程的自旋次数,在go1.15版本时,超过4次就不再自旋了。
awoke
awoke表示协程是否唤醒。协程在自旋时,相当于CPU上已经有在等锁的协程。为避免Mutex解锁时再唤醒其他协程,自旋时要尝试把Mutex置为唤醒状态,Mutex处于唤醒状态后 要把本协程的 awoke 也置为true。
在抢锁前如果协程处于唤醒状态,那就需要把Mutex的mutexWoken状态清空,以便于Mutex在解锁时做唤醒操作。
starving
starving 表示协程的等待时间,如果等待时长超过1ms,starving置为true,后续操作会把Mutex也标记为饥饿状态。
前面介绍了Mutex的3种状态和一种计数,也介绍了协程内部的几个状态,下面来看看抢锁、解锁时,这些状态对一些操作的具体影响:
自旋操作
自旋操作是 协程发现锁被占用时等锁释放的方式。若锁处于饥饿状态,协程不再自旋而是直接去排队;自旋操作能否执行和 协程的旋转次数iter 以及一些机器的CPU信息相关。
源码中在执行自旋操作时要判断Mutex是否处于饥饿或者加锁状态:如果锁已经释放,或者Mutex处于饥饿状态,就没必要执行自旋操作了。
加锁操作
协程抢锁操作是在Mutex非饥饿情况下进行的,如果Mutex处于饥饿状态,协程会直接排队。
等待者操作
增加等待者:若协程自旋操作完成后,Mutex仍然处于加锁或者饥饿状态,那新来的协程只好乖乖去排队了,此时等待者数量加1。
减少等待者:解锁时要唤醒协程,此时需要把MutexWoken标记为1表示已有唤醒协程,并减少一个等待者。
若Mutex处于饥饿状态时,调度协程时需要把Mutex标记为加锁,并减少一个等待者。
饥饿操作
什么时候会饥饿:
Mutex的mutexStarving标记是由协程的starving状态计算的,如果一个协程被唤醒后发现距离第一次排队时已经超过了1ms,下次for循环在尝试加锁时会把Mutex标记为饥饿状态。
什么时候会解除饥饿:
Mutex处于饥饿状态后,如果发现等待队列中只有一个协程,或者这个协程等待时长小于1ms,就需要把Mutex转换为正常模式。
协程的阻塞操作是调用runtime_SemacquireMutex函数执行的,唤醒操作是调用runtime_Semrelease进行的。本篇文章暂时不讲调度的具体细节。
到这里铺垫的差不多了,可以正式看源码了。
lock和unlock方法为了内联操作把简单状态的逻辑单独提出来了。大部头的逻辑在lockslow和unlockslow里面。
这里有两张图表示为啥提出来把大部头逻辑提出来的好处:
lock代码:
func (m *Mutex) Lock() {
// Fast path
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// slow path
m.lockSlow()
}
lock函数 fast path 解决的情况:
<<< 左右滑动见更多 >>>
unlock代码:
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Slow path
m.unlockSlow(new)
}
}
unlock函数 fast path 解决的情况:
<<< 左右滑动见更多 >>>
下面来看一下lockslow的源码
我把这部分源码分成三部分,整体执行如图:
第一部分是自旋部分,第二部分是为抢锁的CAS打铺垫,给Mutex的各种状态赋值,第三部分是通过cas操作给Mutex赋值,并根据CAS是否成功做一些相关逻辑处理。
来看每个部分内部的一些细节:
第一部分细节
第二部分细节
第三部分细节
接着让我们通过几个例子感受一下流程:
Mutex:0001, 新来协程
<<< 左右滑动见更多 >>>
Mutex:1001, 等待者被唤醒
<<< 左右滑动见更多 >>>
Mutex:3001, 等待者被唤醒
<<< 左右滑动见更多 >>>
下面来看一下unlockslow的源码
unlock的逻辑比较比较简单,核心就是唤醒等待者,是否唤醒等待者是根据是否存在等待者决定的,根据Mutex是否处于饥饿状态决定唤醒那个等待者。
Mutex:2101 饥饿时唤醒等待者
<<< 左右滑动见更多 >>>
下面是我阅读go1.15中src/sync/mutex.go
源码时写的一些注释,对源码感兴趣的同学可以对照着看看,有啥问题欢迎找我交流:
type Mutex struct {
state int32
sema uint32
}
type Locker interface {
Lock()
Unlock()
}
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
mutexWaiterShift = iota
starvationThresholdNs = 1e6
)
func (m *Mutex) Lock() {
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
m.lockSlow()
}
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
// 加锁时,会有多个协程会同时来抢Mutex,CAS操作可以保证原子性,如果一个协程修改了Mutex状态后还抢锁失败,
// 需要重新去抢锁,所以这里用大for循环把逻辑包起来。
for {
// 这个if循环主要判断waiter是否可以自旋,自旋条件不满足时,会执行下段代码
// old&(mutexLocked|mutexStarving) == mutexLocked 这段代码是看old是否处于饥饿状态,饥饿状态没必要自旋
// 锁释放了,或者处于饥饿状态,或者自旋次数够了,就不再自旋了
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 设置 mutexWoken 标记,这里貌似和下面Unlock的代码有联动
// 这里是为了告知持有锁的goroutine在释放锁时不需要唤醒其他goroutine了,已经有goroutine在等待,以免唤醒太多的等待协程
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
// 下面四个if判断分别用于对3种状态和一个数量进行赋值,都是为了给new赋值
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
// 新状态只有在非饥饿的条件下才可以加锁
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 如果old已经处于加锁或者饥饿状态,则等待者按照FIFO的顺序排队
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 如果符合饥饿条件 且锁还没有被释放,则将其设置为饥饿状态
// 如果锁已经释放了,那就去抢一次锁。如果进入饥饿模式,那就乖乖去排队了
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// 这里用来消除 awoke 标记
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// cas成功
// 这里用于清除Woken标记,因为后面 goroutine 只会阻塞或者抢锁成功
// 释放唤醒标识,当前goroutine都不再是唤醒状态了,以便其他goroutine进来
// cas失败
// 丢人,所以需要释放之前抢到的 mutexWoken 标识
new &^= mutexWoken
}
// 下面这里尝试给通过CAS操作把old变成new
// 这里无非就是加锁成功,或者去排队,排队的话需要看排队到头部还是尾部
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 这里还需要看一下
// 如果原来的old未加锁,且Mutex不处于饥饿状态,那goroutine获取到锁之后就可以直接退出了
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// queueLifo 为true,表示其是被唤醒的,排队时排到头部
// queueLifo 为false,表示是第一次排队,排队时排到队列尾部
queueLifo := waitStartTime != 0
// 第一次排队要记录排队时间
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 这里会阻塞
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 后续代码是队列里的goroutine被runtime_Semrelease唤醒后,从让出的地方继续执行
// 如果这个协程处于饥饿状态 或 等待时间大于1ms,则设置其为饥饿状态
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 如果Mutex处于饥饿状态
if old&mutexStarving != 0 {
// 这里是一些异常状态
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 把Mutex设置为加锁,把等待队列的waiter-1,
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 根据协程的状态看看是否需要切换至正常模式
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
// goroutine处于正常模式,标记唤醒标识,然后重新自旋去抢锁
// 执行到这里,这个goroutine是被唤醒的,需要把awoke标记 标识为true
awoke = true
iter = 0
} else {
// mutex被其他goroutine用了,继续回去自旋吧
// 赋值失败时还原状态
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// 说明state其他位不为0, 那就直接进入slow path吧
m.unlockSlow(new)
}
}
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// mutex处于正常模式
if new&mutexStarving == 0 {
old := new
for {
// 如果等待队列为空 或者 已经有其他goroutine被唤醒 or 获得了锁 or 锁处于饥饿模式
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
// 不需要唤醒goroutine,直接返回即可
return
}
// 抢占Woken标示位,获取唤醒一个goroutine的权利
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 调用这里唤醒一个goroutine
runtime_Semrelease(&m.sema, false, 1)
return
}
// 抢占不成功就归位,然后循环去吧
old = m.state
}
} else {
// 饥饿模式下,后来的goroutine不会争抢锁,
// 直接唤醒第一个等待者
runtime_Semrelease(&m.sema, true, 1)
}
}
一个问题
如果读者朋友觉得看懂了源码,可以留言回答一下这个问题:
对于Mutex直接Unlock操作为什么会fatal