前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >听说Mutex源码是出名的不好看,我不信,来试一下

听说Mutex源码是出名的不好看,我不信,来试一下

作者头像
薯条的编程修养
发布2022-08-10 19:38:15
3790
发布2022-08-10 19:38:15
举报
文章被收录于专栏:薯条的编程修养

如何实现Mutex

MVP方案

Mutex需要两个变量:key表示锁的使用情况,value 为0表示锁未被持有,1表示锁被持有 且 没有等待者,n表示锁被持有,有n-1个等待者;sema表示等待队列的信号量,sema是个先进先出的队列,用来阻塞、唤醒协程。

代码语言:javascript
复制
type Mutex struct {
  key int32
  sema int32
}

对Mutex加锁本质是用CAS操作设置key+1,如果锁未被持有,则协程持有锁;如果锁被持有,则协程排队等待; 对Mutex解锁本质是用CAS操作设置key-1,如果key为0,代表没有等待者,直接返回;如果key大于0,代表有协程被阻塞,则按照FIFO的顺序唤醒协程。

把加锁、解锁过程看成排队上厕所,可参考下图,加锁:

<<< 左右滑动见更多 >>>

解锁:

这个mvp版本只是单纯的能用,离好用还差很远。比如如果CPU上有协程来抢锁,但是只能严格按照FIFO顺序排队的话,这样协程并发量不高。因为唤醒协程需要做上下文切换,而此时CPU上正运行着其他协程,理论上如果处于CPU时间片上的协程抢到锁的话会有更好的性能。

解决调度公平性

于是Go团队更改了调度顺序,CPU上的协程也有机会抢锁。

这就好比排队上厕所一样,CPU上的人离厕所门只有1m的距离,而被唤醒的人离厕所门可能有10m的距离,从全局最优的角度考虑,离门近的人进入厕所可以有更高的吞吐。

具体是把Mutex结构体中key由两种含义拓然成了三种含义,并改名为state:

代码语言:javascript
复制
type Mutex struct {
  state int32
  sema uint32
}

这次改进把state中的第一位作为MutexLocked专属位,表示这个Mutex是否已经被协程持有。 第二位作为MutexWoken专属位,表示Mutex是否处于唤醒状态,唤醒操作是一个比较耗性能的操作,由于每次抢锁只会有一个协程获取锁,理论上不需要唤醒太多协程,当前执行唤醒操作时只会唤醒一个协程,且如果已有唤醒的协程就不会再执行唤醒操作了。 剩余位为 MutexWaiterShift 的位置,用来表示等待者的数量。

这个版本在加锁时,CPU上正在执行操作的协程可以和被唤醒的协程同时抢锁,而不用严格执行先进先出。解锁时 Mutex 要去除 MutexLocked标记,并根据是否有等待者或是否有已经被唤醒的协程,去决定是否去唤醒协程。

然而这个版本还有一些问题,协程在抢锁时只会执行一次,如果Mutex没有被释放,这个新来的协程可能就被阻塞排队去了。

加入spin机制

这就好比上厕所一样,你排了好久队,好不容易排到了第一位可以去抢厕所了,由于厕所被锁了,不得不重新排队,是不是有点蛋疼。

此时如果你在厕所门口转几圈,理论上如果厕所里面的人方便的比较快,你转圈的时候它正好出来了,那你就可以进去了,这就是所谓的spin机制。

于是 Go 团队继续优化Mutex,这次为Mutex加入了自旋机制,协程检测锁是否被释放时,如果锁没释放会旋转(底层是执行若干次PAUSE指令),以期望在这个过程中锁被释放。自旋不是无限制的,协程旋转几圈后,如果还没抢到锁,那它只能乖乖去队列尾部排队了。

那么到这里Mutex是否完美了呢,其实这里还有一个比较大的问题:如果你是第一次来抢Mutex,抢不到让你去队尾排队,这个看起来合乎常理,但是如果你已经排了半天队,好不容易排到队头可以抢厕所了,却被CPU上的协程抢走了锁,还让你去队尾排队有点残忍,有可能造成你次次抢不到厕所,然后反复到队尾排队,最后活活憋死了。

解决饥饿问题

于是Go团队接着去解决这个饥饿问题。他们为Mutex又新增了一个状态:mutexStarving,这个状态表示 Mutex 是否处于饥饿状态。如果一个协程等待时长超过1ms,那Mutex会进入饥饿状态。在饥饿状态下,锁的执行权会由解锁的协程直接交给队列头部的协程, 各个协程不再抢锁,也不再自旋,而是严格按照FIFO的顺序执行。这次改进中,还修复了被唤醒的协程需重新去队尾排队的问题:如果协程被唤醒后抢锁失败,会被放到队列头部等待唤醒。

这就是目前Mutex的实现现状~

协程的状态

前面介绍了Mutex的3种状态和一种计数,协程内部也有几个状态,这几个状态会影响Mutex的状态,让我们分别来看一下:

iter

iter用于记录协程的自旋次数,在go1.15版本时,超过4次就不再自旋了。

awoke

awoke表示协程是否唤醒。协程在自旋时,相当于CPU上已经有在等锁的协程。为避免Mutex解锁时再唤醒其他协程,自旋时要尝试把Mutex置为唤醒状态,Mutex处于唤醒状态后 要把本协程的 awoke 也置为true。

在抢锁前如果协程处于唤醒状态,那就需要把Mutex的mutexWoken状态清空,以便于Mutex在解锁时做唤醒操作。

starving

starving 表示协程的等待时间,如果等待时长超过1ms,starving置为true,后续操作会把Mutex也标记为饥饿状态。

Mutex的状态内部影响

前面介绍了Mutex的3种状态和一种计数,也介绍了协程内部的几个状态,下面来看看抢锁、解锁时,这些状态对一些操作的具体影响:

自旋操作

自旋操作是 协程发现锁被占用时等锁释放的方式。若锁处于饥饿状态,协程不再自旋而是直接去排队;自旋操作能否执行和 协程的旋转次数iter 以及一些机器的CPU信息相关。

源码中在执行自旋操作时要判断Mutex是否处于饥饿或者加锁状态:如果锁已经释放,或者Mutex处于饥饿状态,就没必要执行自旋操作了。

加锁操作

协程抢锁操作是在Mutex非饥饿情况下进行的,如果Mutex处于饥饿状态,协程会直接排队。

等待者操作

增加等待者:若协程自旋操作完成后,Mutex仍然处于加锁或者饥饿状态,那新来的协程只好乖乖去排队了,此时等待者数量加1。

减少等待者:解锁时要唤醒协程,此时需要把MutexWoken标记为1表示已有唤醒协程,并减少一个等待者。

若Mutex处于饥饿状态时,调度协程时需要把Mutex标记为加锁,并减少一个等待者。

饥饿操作

什么时候会饥饿:

Mutex的mutexStarving标记是由协程的starving状态计算的,如果一个协程被唤醒后发现距离第一次排队时已经超过了1ms,下次for循环在尝试加锁时会把Mutex标记为饥饿状态。

什么时候会解除饥饿:

Mutex处于饥饿状态后,如果发现等待队列中只有一个协程,或者这个协程等待时长小于1ms,就需要把Mutex转换为正常模式。

两个先决函数

协程的阻塞操作是调用runtime_SemacquireMutex函数执行的,唤醒操作是调用runtime_Semrelease进行的。本篇文章暂时不讲调度的具体细节。

Mutex源码解析

到这里铺垫的差不多了,可以正式看源码了。

lock和unlock方法为了内联操作把简单状态的逻辑单独提出来了。大部头的逻辑在lockslow和unlockslow里面。

这里有两张图表示为啥提出来把大部头逻辑提出来的好处:

lock代码:

代码语言:javascript
复制
func (m *Mutex) Lock() {
  // Fast path
 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
  if race.Enabled {
   race.Acquire(unsafe.Pointer(m))
  }
  return
 }
  // slow path
 m.lockSlow()
}

lock函数 fast path 解决的情况:

<<< 左右滑动见更多 >>>

unlock代码:

代码语言:javascript
复制
func (m *Mutex) Unlock() {
 if race.Enabled {
  _ = m.state
  race.Release(unsafe.Pointer(m))
 }
  // Fast path: drop lock bit.
 new := atomic.AddInt32(&m.state, -mutexLocked)
 if new != 0 {
    // Slow path
  m.unlockSlow(new)
 }
}

unlock函数 fast path 解决的情况:

<<< 左右滑动见更多 >>>

下面来看一下lockslow的源码

我把这部分源码分成三部分,整体执行如图:

第一部分是自旋部分,第二部分是为抢锁的CAS打铺垫,给Mutex的各种状态赋值,第三部分是通过cas操作给Mutex赋值,并根据CAS是否成功做一些相关逻辑处理。

来看每个部分内部的一些细节:

第一部分细节

第二部分细节

第三部分细节

接着让我们通过几个例子感受一下流程:

Mutex:0001, 新来协程

<<< 左右滑动见更多 >>>

Mutex:1001, 等待者被唤醒

<<< 左右滑动见更多 >>>

Mutex:3001, 等待者被唤醒

<<< 左右滑动见更多 >>>

下面来看一下unlockslow的源码

unlock

unlock的逻辑比较比较简单,核心就是唤醒等待者,是否唤醒等待者是根据是否存在等待者决定的,根据Mutex是否处于饥饿状态决定唤醒那个等待者。

Mutex:2101 饥饿时唤醒等待者

<<< 左右滑动见更多 >>>

Mutex源码注释

下面是我阅读go1.15中src/sync/mutex.go源码时写的一些注释,对源码感兴趣的同学可以对照着看看,有啥问题欢迎找我交流:

代码语言:javascript
复制

type Mutex struct {
 state int32
 sema  uint32
}

type Locker interface {
 Lock()
 Unlock()
}

const (
 mutexLocked = 1 << iota // mutex is locked
 mutexWoken
 mutexStarving
 mutexWaiterShift = iota
 starvationThresholdNs = 1e6
)

func (m *Mutex) Lock() {
 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
  if race.Enabled {
   race.Acquire(unsafe.Pointer(m))
  }
  return
 }
 m.lockSlow()
}

func (m *Mutex) lockSlow() {
 var waitStartTime int64
 starving := false
 awoke := false
 iter := 0
 old := m.state

 // 加锁时,会有多个协程会同时来抢Mutex,CAS操作可以保证原子性,如果一个协程修改了Mutex状态后还抢锁失败,
 // 需要重新去抢锁,所以这里用大for循环把逻辑包起来。
 for {
  // 这个if循环主要判断waiter是否可以自旋,自旋条件不满足时,会执行下段代码
  // old&(mutexLocked|mutexStarving) == mutexLocked 这段代码是看old是否处于饥饿状态,饥饿状态没必要自旋
  // 锁释放了,或者处于饥饿状态,或者自旋次数够了,就不再自旋了
  if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
   // 设置 mutexWoken 标记,这里貌似和下面Unlock的代码有联动
   // 这里是为了告知持有锁的goroutine在释放锁时不需要唤醒其他goroutine了,已经有goroutine在等待,以免唤醒太多的等待协程
   if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
    awoke = true
   }
   runtime_doSpin()
   iter++
   old = m.state
   continue
  }

  // 下面四个if判断分别用于对3种状态和一个数量进行赋值,都是为了给new赋值
  new := old
  // Don't try to acquire starving mutex, new arriving goroutines must queue.
  // 新状态只有在非饥饿的条件下才可以加锁
  if old&mutexStarving == 0 {
   new |= mutexLocked
  }
  // 如果old已经处于加锁或者饥饿状态,则等待者按照FIFO的顺序排队
  if old&(mutexLocked|mutexStarving) != 0 {
   new += 1 << mutexWaiterShift
  }

  // 如果符合饥饿条件 且锁还没有被释放,则将其设置为饥饿状态
  // 如果锁已经释放了,那就去抢一次锁。如果进入饥饿模式,那就乖乖去排队了
  if starving && old&mutexLocked != 0 {
   new |= mutexStarving
  }

  // 这里用来消除 awoke 标记
  if awoke {
   if new&mutexWoken == 0 {
    throw("sync: inconsistent mutex state")
   }
   // cas成功
   // 这里用于清除Woken标记,因为后面 goroutine 只会阻塞或者抢锁成功
   // 释放唤醒标识,当前goroutine都不再是唤醒状态了,以便其他goroutine进来

   // cas失败
   // 丢人,所以需要释放之前抢到的 mutexWoken 标识
   new &^= mutexWoken
  }

  // 下面这里尝试给通过CAS操作把old变成new
  // 这里无非就是加锁成功,或者去排队,排队的话需要看排队到头部还是尾部
  if atomic.CompareAndSwapInt32(&m.state, old, new) {

   // 这里还需要看一下
   // 如果原来的old未加锁,且Mutex不处于饥饿状态,那goroutine获取到锁之后就可以直接退出了
   if old&(mutexLocked|mutexStarving) == 0 {
    break // locked the mutex with CAS
   }

   // queueLifo 为true,表示其是被唤醒的,排队时排到头部
   // queueLifo 为false,表示是第一次排队,排队时排到队列尾部
   queueLifo := waitStartTime != 0

   // 第一次排队要记录排队时间
   if waitStartTime == 0 {
    waitStartTime = runtime_nanotime()
   }
   // 这里会阻塞
   runtime_SemacquireMutex(&m.sema, queueLifo, 1)

   // 后续代码是队列里的goroutine被runtime_Semrelease唤醒后,从让出的地方继续执行
   
   // 如果这个协程处于饥饿状态 或 等待时间大于1ms,则设置其为饥饿状态
   starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs

   old = m.state
   // 如果Mutex处于饥饿状态
   if old&mutexStarving != 0 {
    // 这里是一些异常状态
    if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
     throw("sync: inconsistent mutex state")
    }

    // 把Mutex设置为加锁,把等待队列的waiter-1,
    delta := int32(mutexLocked - 1<<mutexWaiterShift)

    // 根据协程的状态看看是否需要切换至正常模式
    if !starving || old>>mutexWaiterShift == 1 {

     delta -= mutexStarving
    }
    atomic.AddInt32(&m.state, delta)
    break
   }

   // goroutine处于正常模式,标记唤醒标识,然后重新自旋去抢锁
   // 执行到这里,这个goroutine是被唤醒的,需要把awoke标记 标识为true
   awoke = true
   iter = 0
  } else {
   // mutex被其他goroutine用了,继续回去自旋吧
   // 赋值失败时还原状态
   old = m.state
  }
 }

 if race.Enabled {
  race.Acquire(unsafe.Pointer(m))
 }
}


func (m *Mutex) Unlock() {
 if race.Enabled {
  _ = m.state
  race.Release(unsafe.Pointer(m))
 }

 new := atomic.AddInt32(&m.state, -mutexLocked)
 if new != 0 {
  // 说明state其他位不为0, 那就直接进入slow path吧
  m.unlockSlow(new)
 }
}

func (m *Mutex) unlockSlow(new int32) {
 if (new+mutexLocked)&mutexLocked == 0 {
  throw("sync: unlock of unlocked mutex")
 }
 // mutex处于正常模式
 if new&mutexStarving == 0 {
  old := new
  for {
   // 如果等待队列为空 或者 已经有其他goroutine被唤醒 or 获得了锁 or 锁处于饥饿模式
   if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
    // 不需要唤醒goroutine,直接返回即可
    return
   }
   // 抢占Woken标示位,获取唤醒一个goroutine的权利
   new = (old - 1<<mutexWaiterShift) | mutexWoken
   if atomic.CompareAndSwapInt32(&m.state, old, new) {
    // 调用这里唤醒一个goroutine
    runtime_Semrelease(&m.sema, false, 1)
    return
   }
   // 抢占不成功就归位,然后循环去吧
   old = m.state
  }
 } else {
  // 饥饿模式下,后来的goroutine不会争抢锁,
  // 直接唤醒第一个等待者
  runtime_Semrelease(&m.sema, true, 1)
 }
}

一个问题

如果读者朋友觉得看懂了源码,可以留言回答一下这个问题:

对于Mutex直接Unlock操作为什么会fatal

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-07-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 薯条的编程修养 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 如何实现Mutex
  • 协程的状态
  • Mutex的状态内部影响
  • 两个先决函数
  • Mutex源码解析
    • unlock
    • Mutex源码注释
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档