前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >浅谈golang中的sync包

浅谈golang中的sync包

作者头像
素履coder
发布2022-09-28 16:26:56
5850
发布2022-09-28 16:26:56
举报
文章被收录于专栏:素履coder

1. Mutex和RWMutex#

Mutex其实就是一种互斥锁,Mutex一般叫做写锁,即不管读写都会锁住;RWMutex一般叫做读写锁,只有写时才会锁住,读时不会锁住,常用于读多写少的场景,就是为了解决Mutex不管读写都加锁的特性。

锁常用于并发访问临界资源中,可以参考我的文章案例: Golang中slice和map的线程安全问题

1.1 double-check用法#

double-check叫做二次检查,为什么会有这样的写法呢,如下例是没有二次检查的写法:

代码语言:javascript
复制
type safeMap[K comparable, V any] struct {
	m map[K]V
	mutex sync.RWMutex
}

//业务场景,若已经有key则直接返回,若没有则把value放进去
func (s *safeMap[K, V]) doubleCheck(key K, value V) V {
	s.mutex.RLock()
	oldV, ok := s.m[key]
	s.mutex.RUnlock()
	if ok { //第一次检查,因为是读操作,若加读写锁能读出来,就没必要加写锁,降低开销
		return oldV
	}

	//若上面没读取出来,才会执行这步
	s.mutex.Lock()
    defer s.mutex.Unlock()
    // ①
	s.m[key] = value //若第一次没有读到,就赋新值给这个key

	return value
}

试想,若在①处有其他业务给key赋值,而此时由于没有再次判断能否从key拿到值,就直接用value把key的值覆盖掉了,明显是错误的做法,如下是改进后的代码

代码语言:javascript
复制
type safeMap[K comparable, V any] struct {
	m map[K]V
	mutex sync.RWMutex
}

//业务场景,若已经有key则直接返回,若没有则把value放进去
func (s *safeMap[K, V]) doubleCheck(key K, value V) V {
	s.mutex.RLock()
	oldV, ok := s.m[key]
	s.mutex.RUnlock()
	if ok { //第一次检查,因为是读操作,若加读写锁能读出来,就没必要加写锁,降低开销
		return oldV
	}

	//若上面没读取出来,才会执行这步
	s.mutex.Lock()
	oldV, ok = s.m[key]
	if ok { //第二次检查,也叫double-check,若第二次查出来了直接返回
		return oldV
	}
	s.m[key] = value //若第二次还没读到,就赋新值给这个key
	s.mutex.Unlock()

	return value
}

加了写锁并不能保证其他程序不会对里面的值进行修改,所以需要二次检查

1.2 Mutex实现细节#

代码语言:javascript
复制
// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
   state int32	//监控状态值
   sema  uint32
}
1.2.1 加锁#

Lock的源码如下,CompareAndSwap简称CAS,是一种cpu指令级的赋值操作,执行速度非常快

假设现在有一个goroutine进来想要竞争加锁,CompareAndSwapInt32里面有三个参数addr *int32, old, new int32,当old和addr的值相同时,也就是锁正处于空闲期间,不需要竞争,就把addr的值换成new值并返回true,也就是加锁成功

代码语言:javascript
复制
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
	// Fast path: grab unlocked mutex.	快路径,也算一次自旋
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		if race.Enabled {
			race.Acquire(unsafe.Pointer(m))
		}
		return
	}
	// Slow path (outlined so that the fast path can be inlined)
	m.lockSlow()	//慢路径
}

如果这一步加锁成功,则直接返回,也叫做快路径,否则会进入lockSLow,也叫做慢路径,lockSLow里面是一个大的for循环,前半部分仍然是自旋,所以lock的自旋实际上是 快路径中的一次自旋+慢路径中的部分自旋,如果自旋期间能加锁成功,就会直接通过CAS加锁并返回,否则会把这个goroutine放入队列中等待

代码语言:javascript
复制
func (m *Mutex) lockSlow() {
    ......
	for {
        // 仍然属于自旋部分
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
			runtime_doSpin()
			iter++
			old = m.state
			continue
		}
		......
        // 如果自旋时仍然没加上锁,就会把这个goroutine放到队列中
        // 省略实现代码
        ......
	}

	if race.Enabled {
		race.Acquire(unsafe.Pointer(m))
	}
}

当队列中有了等待加锁的goroutine之后,如果此时又有新的想要加锁的goroutine进来请求加锁,那么怎么决定把锁加给谁呢?

这时就会出现两种状态:

  • 正常模式:新来的goroutine和队列里的goroutine去自由竞争,谁先拿到就给谁加锁
  • 饥饿模式:如果每次都是新来的goroutine拿到锁,那么就会造成队列里的goroutine出现饥饿状态,一直拿不到锁。此时go中代码实现的做法是如果队列中的goroutine等待超过了1s,就会进入饥饿模式,此时会先给队列中的goroutine加锁,这样就可以解决饥饿模式了

这也是为什么需要自旋,因为如前面所讲,自旋操作通过CAS来实现,是一种cpu指令级的赋值操作,速度非常快,当进入自旋时,cpu容易飙升的很高,所以自旋次数需要在一个合理的范围,能在此时加锁成功就尽量不要进入队列等待;如果进入了队列还需要和其他goroutine一起竞争加锁,增加了等待时间。

1.2.2 解锁#

Unlock的源码如下:

代码语言:javascript
复制
// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
	if race.Enabled {
		_ = m.state
		race.Release(unsafe.Pointer(m))
	}

	// Fast path: drop lock bit.
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		// Outlined slow path to allow inlining the fast path.
		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
		m.unlockSlow(new)
	}
}

如果能解锁就会马上解锁,否则就会进入unlockSlow,即进入慢路径

代码语言:javascript
复制
func (m *Mutex) unlockSlow(new int32) {
	if (new+mutexLocked)&mutexLocked == 0 {
		throw("sync: unlock of unlocked mutex")	//若已被解锁,直接抛出异常
	}
	if new&mutexStarving == 0 {	//正常状态
		old := new
		for {
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else {	//饥饿状态
		runtime_Semrelease(&m.sema, true, 1)
	}
}

1.3 RWMutex实现细节

代码语言:javascript
复制
type RWMutex struct {
	w           Mutex  // held if there are pending writers
	writerSem   uint32 // semaphore for writers to wait for completing readers
	readerSem   uint32 // semaphore for readers to wait for completing writers
	readerCount int32  // number of pending readers
	readerWait  int32  // number of departing readers
}

读写锁除了同时读不会加锁之外,读写、写读和写写都会加锁

1.3.1 读锁

读锁的加解锁分别是RLock()和RUnlock()

代码语言:javascript
复制
func (rw *RWMutex) RLock() {
	if race.Enabled {
		_ = rw.w.state
		race.Disable()
	}
    //AddInt32表示加一个读锁
	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
		// A writer is pending, wait for it.
        // 进入到这里表示有其他goroutine获得了写锁,这个goroutine需要进行等待那个写锁释放了才能加读锁
		runtime_SemacquireMutex(&rw.readerSem, false, 0)
	}
	if race.Enabled {
		race.Enable()
		race.Acquire(unsafe.Pointer(&rw.readerSem))
	}
}
代码语言:javascript
复制
func (rw *RWMutex) RUnlock() {
	if race.Enabled {
		_ = rw.w.state
		race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
		race.Disable()
	}
    // 如果readerCount-1后r >= 0,则不会进入,而是直接往下执行并解锁
    // 如果readerCount-1后r < 0,则这里的读锁还不能释放,即会进入rUnlockSlow()函数
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
		// Outlined slow-path to allow the fast-path to be inlined
		rw.rUnlockSlow(r)
	}
	if race.Enabled {
		race.Enable()
	}
}

//每进来一次,rUnlockSlow会通过AddInt32(&rw.readerWait, -1)减少一次读操作数
//当为0的时候表示可以释放读锁了,同时会触发写操作可以执行了
func (rw *RWMutex) rUnlockSlow(r int32) {
	if r+1 == 0 || r+1 == -rwmutexMaxReaders {
		race.Enable()
		throw("sync: RUnlock of unlocked RWMutex")
	}
	// A writer is pending.
	if atomic.AddInt32(&rw.readerWait, -1) == 0 {
		// The last reader unblocks the writer.
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}
1.3.2 写锁#

写锁的加解锁分别是Lock()Unlock()

代码语言:javascript
复制
func (rw *RWMutex) Lock() {
	if race.Enabled {
		_ = rw.w.state
		race.Disable()
	}
	// First, resolve competition with other writers.
	rw.w.Lock()
	// Announce to readers there is a pending writer.
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	// Wait for active readers.
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
        //将想要加锁的线程挂起
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
	if race.Enabled {
		race.Enable()
		race.Acquire(unsafe.Pointer(&rw.readerSem))
		race.Acquire(unsafe.Pointer(&rw.writerSem))
	}
}
代码语言:javascript
复制
func (rw *RWMutex) Unlock() {
	if race.Enabled {
		_ = rw.w.state
		race.Release(unsafe.Pointer(&rw.readerSem))
		race.Disable()
	}

	// Announce to readers there is no active writer.
	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
	if r >= rwmutexMaxReaders {
		race.Enable()
		throw("sync: Unlock of unlocked RWMutex")
	}
	// Unblock blocked readers, if any.
	for i := 0; i < int(r); i++ {
        //唤醒被挂起的线程
		runtime_Semrelease(&rw.readerSem, false, 0)
	}
	// Allow other writers to proceed.
	rw.w.Unlock()
	if race.Enabled {
		race.Enable()
	}
}

1.4 总结

1.4.1 锁的可重入性

首先明确一点,Mutex和RWMutex都是不可重入锁,即加了锁之后不能马上再加锁,而是要先把上一个锁释放,然后才能加下一个锁,不然会造成死锁,如下代码是一个重入的写法,会造成死锁:

代码语言:javascript
复制
func TestDeadLock(t *testing.T) {
	var a sync.Mutex
	a.Lock()
	a.Lock()
	fmt.Println("++++++:", 1)
	a.Unlock()
	a.Unlock()
}

/*
fatal error: all goroutines are asleep - deadlock! # 死锁

goroutine 1 [chan receive]:
testing.(*T).Run(0xc000108340, {0x111d2ce?, 0x137a4e96cdc3b?}, 0x1126d80)
	/usr/local/go/src/testing/testing.go:1487 +0x37a
testing.runTests.func1(0xc00006a3f0?)
	/usr/local/go/src/testing/testing.go:1839 +0x6e
testing.tRunner(0xc000108340, 0xc00011dcd8)
	/usr/local/go/src/testing/testing.go:1439 +0x102
testing.runTests(0xc00007a140?, {0x11f0dc0, 0x1, 0x1}, {0x1273108?, 0x40?, 0x0?})
	/usr/local/go/src/testing/testing.go:1837 +0x457
testing.(*M).Run(0xc00007a140)
	/usr/local/go/src/testing/testing.go:1719 +0x5d9
main.main()
	_testmain.go:47 +0x1aa

......

进程 已完成,退出代码为 1
*/
1.4.2 锁的公平性

锁的公平性指线程的加锁顺序按照先来后到的顺序加锁,不允许插队;锁的非公平性指不用按先来后到的顺序给线程加锁,即是可抢占式加锁。

公平锁

优点:公平锁更不容易造成锁的饥饿状态,因为队列里所有的线程最终都会得到加锁

缺点:当每个线程持有锁的时间短的时候,队列里线程切换会很频繁,所以容易由于线程切换频繁造成大的开销和浪费时间

非公平锁

优点:由于是抢占式的,所以等待时间会更短,吞吐效率更高

缺点:① 某个线程容易一直拿不到锁导致进入饥饿状态。② 当每个线程持有锁的时间长的时候,此时由于是抢占式的,那么所有线程都可以请求加锁,造成请求过于频繁,重试次数多,做了很多无用功

go中采用的是非公平锁,正如上文所讲,会出现饥饿模式,这么设计的目的主要是为了增大吞吐效率;但go也对饥饿模式做了控制,即当饥饿模式持续超过1s时,会优先从队列中给线程加锁以解除饥饿模式

2. Once

sync.Once包整体来说比较简单,源码也很简短。这个包用来确保某个动作只执行一次,如下例:

代码语言:javascript
复制
func TestOnce(t *testing.T) {
	var one sync.Once
	for i := 0; i < 10; i++ {
		one.Do(func() {
			fmt.Println("我只执行一次")
		})
	}
}

// Output
// 我只执行一次

如下是sync.Once包的所有源码:

代码语言:javascript
复制
type Once struct {
	done uint32
	m    Mutex
}

func (o *Once) Do(f func()) {
    // 如果已经执行过一次,即状态为1,则不会进入条件内,不做任何操作,否则执行doSlow
	if atomic.LoadUint32(&o.done) == 0 {
		// Outlined slow-path to allow inlining of the fast-path.
		o.doSlow(f)
	}
}

func (o *Once) doSlow(f func()) {
	o.m.Lock()
	defer o.m.Unlock()
	if o.done == 0 {
        // 把done的状态变成1,表示已经执行一次了
		defer atomic.StoreUint32(&o.done, 1)
		f()
	}
}

可见,once的这个特性一般用来初始化某些参数或操作

3. Pool

sync.Pool包一般用来缓存临时资源,在被调用的时候会一次性创建一部分内存空间充当内存池,Pool 的目的是缓存已分配但未使用的项目以供以后重用,减轻垃圾收集器(GC)的压力,同时一个Pool可以安全地同时被多个goroutine使用。也就是说,它使构建高效、线程安全的空闲列表变得容易。但是要注意,Pool内的对象可能随时被清除,且不会有通知的,所以不适合存放持久性的对象,更适合存放短时间内复用的对象。

3.1 基本用法

3.2 Pool实现细节

代码语言:javascript
复制
func TestOnce2(t *testing.T) {
	pool := sync.Pool{
		New: func() interface{} {
			return "默认值"
		},
	}

	//了解Get和Put的特性
	fmt.Println(pool.Get())	// 没有放入自定义的值就会返回默认值
	pool.Put("新值")      // 放入自定义的值
	fmt.Println(pool.Get())	// 取出刚才放入的值
	fmt.Println(pool.Get()) // 放入的值取完了,返回默认值

	//基本使用
	pool.Put("新值2")		// 放入想要复用的值
	fmt.Println(pool.Get())	// 取出使用
	pool.Put("新值2") // 放回去给下次使用
}

// Output
// 默认值
// 新值
// 默认值

// 新值2
// 新值2
3.2.1 poolChain

那么Pool池是怎么设计的呢?先简要概括一下,由于Go本身有一个GMP模型,M代表线程,P代表处理器,且一个P只能绑定一个M,每个P都有一个属于他自己的本地队列,里面放的是goroutine,也就是说,任何数据绑定在 P 上,都不需要竞争,因为 P 同一时间只有一个 G 在运行。关于GMP的介绍可以参考这篇文章: GMP 原理与调度

Pool的设计也是类似的,也是采用本地队列的方式,现在大致看看源码是怎么实现的,如下是一些基本结构体

代码语言:javascript
复制
type Pool struct {
	noCopy noCopy
	local     unsafe.Pointer // 数组类型,里面存放的是多个P对象池 [P]poolLocal
	localSize uintptr        // local数组的长度
	victim     unsafe.Pointer // 上一轮的对象池数组
	victimSize uintptr        // 上一轮的对象池数组的长度
	New func() any
}

type poolLocalInternal struct {
	private any       // 只能由相应的P使用,Get和Put会优先读取private,如果满足就直接返回
	shared  poolChain // 类型为poolChain结构,这个就是P的本地对象池;本地P可以pushHead/popHead; 任何P都可以popTail.
}

type poolLocal struct {
	poolLocalInternal
	pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

现在再来看看poolChain的实现

代码语言:javascript
复制
type poolChain struct {
	head *poolChainElt
	tail *poolChainElt
}

type poolChainElt struct {
	poolDequeue
	next, prev *poolChainElt
}

// poolDequeue 是一个无锁(lock-free)且固定大小(fixed-size)的单生产者、多消费者队列。
// 单个生产者可以从头部推送和弹出,消费者可以从尾部弹出
type poolDequeue struct {
	headTail uint64
	vals []eface	// vals是ring buffer类型的数组,又称环形缓冲区
}

type eface struct {
	typ, val unsafe.Pointer
}

整体结构逻辑如下图:

由前面的结构体和上图所示,private值是Get和Put优先读取的值;shared指向的是poolChain,这里的数据可以被别的P偷走。 poolChain则是采用 链表+ring buffer 的结构,关于 ring buffer可以参考这篇文章: ring buffer,一篇文章讲透它? ,那么这么设计有什么好处呢?好处是:ring buffer属于预先一次性分配内存,而且是环状结构,所以内存可以重复利用。

3.2.2 Get

Get 从 Pool 中选择任意项,将其从 Pool 中移除,并将其返回给调用者。如果 Get 返回 nil 并且 p.New 非 nil,则 Get 返回调用 p.New 的结果

代码语言:javascript
复制
func (p *Pool) Get() any {
	if race.Enabled {
		race.Disable()
	}
	l, pid := p.pin()
	x := l.private	//① 优先读取private值
	l.private = nil
	if x == nil {	//② 如果private值为空,则去结构为poolChain的shared里面寻找
		x, _ = l.shared.popHead() // 先从头部开始取
		if x == nil { // 如果没取出来,则调用getSlow,该函数主要逻辑是去其它P的本地队列队列里偷
			x = p.getSlow(pid)
		}
	}
	runtime_procUnpin()
	if race.Enabled {
		race.Enable()
		if x != nil {	//③ 如果private值不为空则直接返回
			race.Acquire(poolRaceAddr(x))
		}
	}
	if x == nil && p.New != nil {
		x = p.New()	//④ 如果上述步骤都没取到值,就新建一个
	}
	return x
}

func (p *Pool) getSlow(pid int) any {
	size := runtime_LoadAcquintptr(&p.localSize)
	locals := p.local
	// 尝试从其它P的poolChain偷.
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i+1)%int(size))
		if x, _ := l.shared.popTail(); x != nil {	//从链尾开始取值
			return x //如果取到了则直接返回
		}
	}

    // 如果从其它P的队列拿不到值,则从victim取值,前面有介绍,victim存放的是上一轮即将被清理的对象池
	size = atomic.LoadUintptr(&p.victimSize)
	if uintptr(pid) >= size {
		return nil
	}
	locals = p.victim
	l := indexLocal(locals, pid)
	if x := l.private; x != nil {	// 从victim取值时也是先看private能不能拿到
		l.private = nil
		return x
	}
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i)%int(size))
		if x, _ := l.shared.popTail(); x != nil { // private拿不到就从P的本地队列,由链尾开始取值
			return x
		}
	}

    // 将victim标记为空
	atomic.StoreUintptr(&p.victimSize, 0)

	return nil
}

举个例子,比如我现在想要一根铅笔写字,步骤大概如下:

① 先看自己手上有没有(private)

② 自己手上没有就看自己文具盒里有没有(shared poolChain)

③ 还没有就找同学借(从别的P的poolChain偷)

④ 没借到就看自己之前打算扔掉的文具盒还有没有铅笔(看victim有没有)

⑤ 如果上述都没有就新买一个(New函数创建一个新的) 3.2.3 Put Put操作简单,就是将 x 添加到池中

3.2.3 Put

Put操作简单,就是将 x 添加到池中

代码语言:javascript
复制
func (p *Pool) Put(x any) {
	if x == nil {
		return
	}
	if race.Enabled {
		if fastrandn(4) == 0 {
			return
		}
		race.ReleaseMerge(poolRaceAddr(x))
		race.Disable()
	}
	l, _ := p.pin()
	if l.private == nil {	// 如果private没放东西,就直接把传进来的值放在private
		l.private = x
		x = nil
	}
	if x != nil {
		l.shared.pushHead(x) //否则把值放到poolChain
	}
	runtime_procUnpin()
	if race.Enabled {
		race.Enable()
	}
}

func (c *poolChain) pushHead(val any) {
	d := c.head
	if d == nil {
		// 如果poolChain的head不存在,就新建一个长度为8的poolDequeue
		const initSize = 8 // 必须为2的幂
		d = new(poolChainElt)
		d.vals = make([]eface, initSize)
		c.head = d
		storePoolChainElt(&c.tail, d)
	}

	if d.pushHead(val) {	//如果能放入,则直接放入并返回
		return
	}

    // 如果当前的poolDequeue满了,就新建一个2倍于上一长度的poolDequeue
	newSize := len(d.vals) * 2
	if newSize >= dequeueLimit {
		newSize = dequeueLimit
	}

	d2 := &poolChainElt{prev: d}
	d2.vals = make([]eface, newSize)
	c.head = d2
	storePoolChainElt(&d.next, d2)
	d2.pushHead(val)
}

在Get和Put中都会调用pin这个函数,这个函数的作用是:将当前goroutine放到相应的P上,返回这个P的poolLocal池并且禁止P被抢占

3.3 总结

为什么Pool需要这样设计呢?比较需要注意的有 ring buffer(圆形缓冲器)、victim、GC 等

ring buffer降低的频繁创建内存的开销,保证了内存的复用

victim里存放的是上一轮要被清理的缓存对象,这里为什么要设计成让缓存对象多留一轮,在下一轮才清除掉呢?这是为了防止在GC时立马把缓存池汇总的数据清除掉而造成的性能抖动。如果先利用 victim 作为过渡,当在本轮的对象池中实在取不到数据,也可以从 victim 中取,这样程序性能会更加平滑

参考链接#

极客时间go实战训练营

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-09-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. Mutex和RWMutex#
    • 1.1 double-check用法#
      • 1.2 Mutex实现细节#
      • 1.3.1 读锁
      • 1.4 总结
        • 1.4.1 锁的可重入性
        • 2. Once
        • 3. Pool
          • 3.1 基本用法
            • 3.2 Pool实现细节
              • 3.3 总结
                • 参考链接#
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档