Mutex其实就是一种互斥锁,Mutex一般叫做写锁,即不管读写都会锁住;RWMutex一般叫做读写锁,只有写时才会锁住,读时不会锁住,常用于读多写少的场景,就是为了解决Mutex不管读写都加锁的特性。
锁常用于并发访问临界资源中,可以参考我的文章案例: Golang中slice和map的线程安全问题
double-check叫做二次检查,为什么会有这样的写法呢,如下例是没有二次检查的写法:
type safeMap[K comparable, V any] struct {
m map[K]V
mutex sync.RWMutex
}
//业务场景,若已经有key则直接返回,若没有则把value放进去
func (s *safeMap[K, V]) doubleCheck(key K, value V) V {
s.mutex.RLock()
oldV, ok := s.m[key]
s.mutex.RUnlock()
if ok { //第一次检查,因为是读操作,若加读写锁能读出来,就没必要加写锁,降低开销
return oldV
}
//若上面没读取出来,才会执行这步
s.mutex.Lock()
defer s.mutex.Unlock()
// ①
s.m[key] = value //若第一次没有读到,就赋新值给这个key
return value
}
试想,若在①处有其他业务给key赋值,而此时由于没有再次判断能否从key拿到值,就直接用value把key的值覆盖掉了,明显是错误的做法,如下是改进后的代码
type safeMap[K comparable, V any] struct {
m map[K]V
mutex sync.RWMutex
}
//业务场景,若已经有key则直接返回,若没有则把value放进去
func (s *safeMap[K, V]) doubleCheck(key K, value V) V {
s.mutex.RLock()
oldV, ok := s.m[key]
s.mutex.RUnlock()
if ok { //第一次检查,因为是读操作,若加读写锁能读出来,就没必要加写锁,降低开销
return oldV
}
//若上面没读取出来,才会执行这步
s.mutex.Lock()
oldV, ok = s.m[key]
if ok { //第二次检查,也叫double-check,若第二次查出来了直接返回
return oldV
}
s.m[key] = value //若第二次还没读到,就赋新值给这个key
s.mutex.Unlock()
return value
}
加了写锁并不能保证其他程序不会对里面的值进行修改,所以需要二次检查
// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
state int32 //监控状态值
sema uint32
}
Lock的源码如下,CompareAndSwap简称CAS,是一种cpu指令级的赋值操作,执行速度非常快
假设现在有一个goroutine进来想要竞争加锁,CompareAndSwapInt32里面有三个参数addr *int32, old, new int32
,当old和addr的值相同时,也就是锁正处于空闲期间,不需要竞争,就把addr的值换成new值并返回true,也就是加锁成功
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex. 快路径,也算一次自旋
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow() //慢路径
}
如果这一步加锁成功,则直接返回,也叫做快路径,否则会进入lockSLow,也叫做慢路径,lockSLow里面是一个大的for循环,前半部分仍然是自旋,所以lock的自旋实际上是 快路径中的一次自旋+慢路径中的部分自旋,如果自旋期间能加锁成功,就会直接通过CAS加锁并返回,否则会把这个goroutine放入队列中等待
func (m *Mutex) lockSlow() {
......
for {
// 仍然属于自旋部分
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
......
// 如果自旋时仍然没加上锁,就会把这个goroutine放到队列中
// 省略实现代码
......
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
当队列中有了等待加锁的goroutine之后,如果此时又有新的想要加锁的goroutine进来请求加锁,那么怎么决定把锁加给谁呢?
这时就会出现两种状态:
这也是为什么需要自旋,因为如前面所讲,自旋操作通过CAS来实现,是一种cpu指令级的赋值操作,速度非常快,当进入自旋时,cpu容易飙升的很高,所以自旋次数需要在一个合理的范围,能在此时加锁成功就尽量不要进入队列等待;如果进入了队列还需要和其他goroutine一起竞争加锁,增加了等待时间。
Unlock的源码如下:
// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
如果能解锁就会马上解锁,否则就会进入unlockSlow,即进入慢路径
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex") //若已被解锁,直接抛出异常
}
if new&mutexStarving == 0 { //正常状态
old := new
for {
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else { //饥饿状态
runtime_Semrelease(&m.sema, true, 1)
}
}
1.3 RWMutex实现细节
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount int32 // number of pending readers
readerWait int32 // number of departing readers
}
读写锁除了同时读不会加锁之外,读写、写读和写写都会加锁
读锁的加解锁分别是RLock()和RUnlock()
func (rw *RWMutex) RLock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
//AddInt32表示加一个读锁
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
// 进入到这里表示有其他goroutine获得了写锁,这个goroutine需要进行等待那个写锁释放了才能加读锁
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}
func (rw *RWMutex) RUnlock() {
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
// 如果readerCount-1后r >= 0,则不会进入,而是直接往下执行并解锁
// 如果readerCount-1后r < 0,则这里的读锁还不能释放,即会进入rUnlockSlow()函数
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
if race.Enabled {
race.Enable()
}
}
//每进来一次,rUnlockSlow会通过AddInt32(&rw.readerWait, -1)减少一次读操作数
//当为0的时候表示可以释放读锁了,同时会触发写操作可以执行了
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
写锁的加解锁分别是Lock()
和Unlock()
func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
//将想要加锁的线程挂起
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}
func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}
// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
//唤醒被挂起的线程
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}
首先明确一点,Mutex和RWMutex都是不可重入锁,即加了锁之后不能马上再加锁,而是要先把上一个锁释放,然后才能加下一个锁,不然会造成死锁,如下代码是一个重入的写法,会造成死锁:
func TestDeadLock(t *testing.T) {
var a sync.Mutex
a.Lock()
a.Lock()
fmt.Println("++++++:", 1)
a.Unlock()
a.Unlock()
}
/*
fatal error: all goroutines are asleep - deadlock! # 死锁
goroutine 1 [chan receive]:
testing.(*T).Run(0xc000108340, {0x111d2ce?, 0x137a4e96cdc3b?}, 0x1126d80)
/usr/local/go/src/testing/testing.go:1487 +0x37a
testing.runTests.func1(0xc00006a3f0?)
/usr/local/go/src/testing/testing.go:1839 +0x6e
testing.tRunner(0xc000108340, 0xc00011dcd8)
/usr/local/go/src/testing/testing.go:1439 +0x102
testing.runTests(0xc00007a140?, {0x11f0dc0, 0x1, 0x1}, {0x1273108?, 0x40?, 0x0?})
/usr/local/go/src/testing/testing.go:1837 +0x457
testing.(*M).Run(0xc00007a140)
/usr/local/go/src/testing/testing.go:1719 +0x5d9
main.main()
_testmain.go:47 +0x1aa
......
进程 已完成,退出代码为 1
*/
锁的公平性指线程的加锁顺序按照先来后到的顺序加锁,不允许插队;锁的非公平性指不用按先来后到的顺序给线程加锁,即是可抢占式加锁。
公平锁
优点:公平锁更不容易造成锁的饥饿状态,因为队列里所有的线程最终都会得到加锁
缺点:当每个线程持有锁的时间短的时候,队列里线程切换会很频繁,所以容易由于线程切换频繁造成大的开销和浪费时间
非公平锁
优点:由于是抢占式的,所以等待时间会更短,吞吐效率更高
缺点:① 某个线程容易一直拿不到锁导致进入饥饿状态。② 当每个线程持有锁的时间长的时候,此时由于是抢占式的,那么所有线程都可以请求加锁,造成请求过于频繁,重试次数多,做了很多无用功
go中采用的是非公平锁,正如上文所讲,会出现饥饿模式,这么设计的目的主要是为了增大吞吐效率;但go也对饥饿模式做了控制,即当饥饿模式持续超过1s时,会优先从队列中给线程加锁以解除饥饿模式
sync.Once包整体来说比较简单,源码也很简短。这个包用来确保某个动作只执行一次,如下例:
func TestOnce(t *testing.T) {
var one sync.Once
for i := 0; i < 10; i++ {
one.Do(func() {
fmt.Println("我只执行一次")
})
}
}
// Output
// 我只执行一次
如下是sync.Once包的所有源码:
type Once struct {
done uint32
m Mutex
}
func (o *Once) Do(f func()) {
// 如果已经执行过一次,即状态为1,则不会进入条件内,不做任何操作,否则执行doSlow
if atomic.LoadUint32(&o.done) == 0 {
// Outlined slow-path to allow inlining of the fast-path.
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
// 把done的状态变成1,表示已经执行一次了
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
可见,once的这个特性一般用来初始化某些参数或操作
sync.Pool包一般用来缓存临时资源,在被调用的时候会一次性创建一部分内存空间充当内存池,Pool 的目的是缓存已分配但未使用的项目以供以后重用,减轻垃圾收集器(GC)的压力,同时一个Pool可以安全地同时被多个goroutine使用。也就是说,它使构建高效、线程安全的空闲列表变得容易。但是要注意,Pool内的对象可能随时被清除,且不会有通知的,所以不适合存放持久性的对象,更适合存放短时间内复用的对象。
func TestOnce2(t *testing.T) {
pool := sync.Pool{
New: func() interface{} {
return "默认值"
},
}
//了解Get和Put的特性
fmt.Println(pool.Get()) // 没有放入自定义的值就会返回默认值
pool.Put("新值") // 放入自定义的值
fmt.Println(pool.Get()) // 取出刚才放入的值
fmt.Println(pool.Get()) // 放入的值取完了,返回默认值
//基本使用
pool.Put("新值2") // 放入想要复用的值
fmt.Println(pool.Get()) // 取出使用
pool.Put("新值2") // 放回去给下次使用
}
// Output
// 默认值
// 新值
// 默认值
// 新值2
// 新值2
那么Pool池是怎么设计的呢?先简要概括一下,由于Go本身有一个GMP模型,M代表线程,P代表处理器,且一个P只能绑定一个M,每个P都有一个属于他自己的本地队列,里面放的是goroutine,也就是说,任何数据绑定在 P 上,都不需要竞争,因为 P 同一时间只有一个 G 在运行。关于GMP的介绍可以参考这篇文章: GMP 原理与调度
Pool的设计也是类似的,也是采用本地队列的方式,现在大致看看源码是怎么实现的,如下是一些基本结构体
type Pool struct {
noCopy noCopy
local unsafe.Pointer // 数组类型,里面存放的是多个P对象池 [P]poolLocal
localSize uintptr // local数组的长度
victim unsafe.Pointer // 上一轮的对象池数组
victimSize uintptr // 上一轮的对象池数组的长度
New func() any
}
type poolLocalInternal struct {
private any // 只能由相应的P使用,Get和Put会优先读取private,如果满足就直接返回
shared poolChain // 类型为poolChain结构,这个就是P的本地对象池;本地P可以pushHead/popHead; 任何P都可以popTail.
}
type poolLocal struct {
poolLocalInternal
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
现在再来看看poolChain的实现
type poolChain struct {
head *poolChainElt
tail *poolChainElt
}
type poolChainElt struct {
poolDequeue
next, prev *poolChainElt
}
// poolDequeue 是一个无锁(lock-free)且固定大小(fixed-size)的单生产者、多消费者队列。
// 单个生产者可以从头部推送和弹出,消费者可以从尾部弹出
type poolDequeue struct {
headTail uint64
vals []eface // vals是ring buffer类型的数组,又称环形缓冲区
}
type eface struct {
typ, val unsafe.Pointer
}
整体结构逻辑如下图:
由前面的结构体和上图所示,private值是Get和Put优先读取的值;shared指向的是poolChain,这里的数据可以被别的P偷走。 poolChain则是采用 链表+ring buffer 的结构,关于 ring buffer可以参考这篇文章: ring buffer,一篇文章讲透它? ,那么这么设计有什么好处呢?好处是:ring buffer属于预先一次性分配内存,而且是环状结构,所以内存可以重复利用。
Get 从 Pool 中选择任意项,将其从 Pool 中移除,并将其返回给调用者。如果 Get 返回 nil 并且 p.New 非 nil,则 Get 返回调用 p.New 的结果
func (p *Pool) Get() any {
if race.Enabled {
race.Disable()
}
l, pid := p.pin()
x := l.private //① 优先读取private值
l.private = nil
if x == nil { //② 如果private值为空,则去结构为poolChain的shared里面寻找
x, _ = l.shared.popHead() // 先从头部开始取
if x == nil { // 如果没取出来,则调用getSlow,该函数主要逻辑是去其它P的本地队列队列里偷
x = p.getSlow(pid)
}
}
runtime_procUnpin()
if race.Enabled {
race.Enable()
if x != nil { //③ 如果private值不为空则直接返回
race.Acquire(poolRaceAddr(x))
}
}
if x == nil && p.New != nil {
x = p.New() //④ 如果上述步骤都没取到值,就新建一个
}
return x
}
func (p *Pool) getSlow(pid int) any {
size := runtime_LoadAcquintptr(&p.localSize)
locals := p.local
// 尝试从其它P的poolChain偷.
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil { //从链尾开始取值
return x //如果取到了则直接返回
}
}
// 如果从其它P的队列拿不到值,则从victim取值,前面有介绍,victim存放的是上一轮即将被清理的对象池
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil { // 从victim取值时也是先看private能不能拿到
l.private = nil
return x
}
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil { // private拿不到就从P的本地队列,由链尾开始取值
return x
}
}
// 将victim标记为空
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}
举个例子,比如我现在想要一根铅笔写字,步骤大概如下:
① 先看自己手上有没有(private)
② 自己手上没有就看自己文具盒里有没有(shared poolChain)
③ 还没有就找同学借(从别的P的poolChain偷)
④ 没借到就看自己之前打算扔掉的文具盒还有没有铅笔(看victim有没有)
⑤ 如果上述都没有就新买一个(New函数创建一个新的) 3.2.3 Put Put操作简单,就是将 x 添加到池中
Put操作简单,就是将 x 添加到池中
func (p *Pool) Put(x any) {
if x == nil {
return
}
if race.Enabled {
if fastrandn(4) == 0 {
return
}
race.ReleaseMerge(poolRaceAddr(x))
race.Disable()
}
l, _ := p.pin()
if l.private == nil { // 如果private没放东西,就直接把传进来的值放在private
l.private = x
x = nil
}
if x != nil {
l.shared.pushHead(x) //否则把值放到poolChain
}
runtime_procUnpin()
if race.Enabled {
race.Enable()
}
}
func (c *poolChain) pushHead(val any) {
d := c.head
if d == nil {
// 如果poolChain的head不存在,就新建一个长度为8的poolDequeue
const initSize = 8 // 必须为2的幂
d = new(poolChainElt)
d.vals = make([]eface, initSize)
c.head = d
storePoolChainElt(&c.tail, d)
}
if d.pushHead(val) { //如果能放入,则直接放入并返回
return
}
// 如果当前的poolDequeue满了,就新建一个2倍于上一长度的poolDequeue
newSize := len(d.vals) * 2
if newSize >= dequeueLimit {
newSize = dequeueLimit
}
d2 := &poolChainElt{prev: d}
d2.vals = make([]eface, newSize)
c.head = d2
storePoolChainElt(&d.next, d2)
d2.pushHead(val)
}
在Get和Put中都会调用pin这个函数,这个函数的作用是:将当前goroutine放到相应的P上,返回这个P的poolLocal池并且禁止P被抢占
为什么Pool需要这样设计呢?比较需要注意的有 ring buffer(圆形缓冲器)、victim、GC 等
ring buffer降低的频繁创建内存的开销,保证了内存的复用
victim里存放的是上一轮要被清理的缓存对象,这里为什么要设计成让缓存对象多留一轮,在下一轮才清除掉呢?这是为了防止在GC时立马把缓存池汇总的数据清除掉而造成的性能抖动。如果先利用 victim 作为过渡,当在本轮的对象池中实在取不到数据,也可以从 victim 中取,这样程序性能会更加平滑
极客时间go实战训练营