下面是wikipedia对条件变量的定义,大体是说条件变量总的来说是等待特定条件的线程的容器。
❝A condition variable is basically a container of threads that are waiting for a certain condition. ❞
Cond是Go标准库sync包提供的条件变量原语,目的是为等待通知场景下的并发问题提供解决方法。Cond通常应用于等待某个条件的一个或一组goroutine,当等待条件变为true时,其中一个或一组所有的goroutine都被唤醒执行。通俗来说,Cond和某个条件相关,这个条件可以是一个表达式、一个bool变量或是一个函数调用,只要它们的结果是bool类型的值就行。一个或一组goroutine需要这个条件才能协同完成,在条件还没有满足的时候,所有等待该条件的goroutine都会被阻塞,当条件满足的时候,等待的goroutine才能够继续运行。举个例子,在奥运会100米短跑比赛中,将每个运动员看作一个个goroutine,只有在发令枪响之后,运动员才能开始跑,这里的发令枪响就是条件变量,只有枪响之后,也就是条件满足之后,goroutine才能运行,在枪响之前,运动员处于等待状态。
我们先通过一个例子来了解Cond解决的是什么问题,该例子来至于文末的引用1。下面的程序启动了一个goroutine,当rec.data有内容的时候,打印内容退出,没有内容的时候进行空转。main goroutine休息2秒钟后更新rec的值。编译运行下面的程序,可以看到CPU使用率高达接近100%。
type Record struct {
sync.Mutex
data string
}
func main() {
var wg sync.WaitGroup
rec := &Record{}
wg.Add(1)
go func(rec *Record) {
defer wg.Done()
for {
rec.Lock()
if rec.data != "" {
fmt.Println("Data: ", rec.data)
rec.Unlock()
return
}
rec.Unlock()
}
}(rec)
time.Sleep(2 * time.Second)
rec.Lock()
rec.data = "gopher"
rec.Unlock()
wg.Wait()
}
虽然上面的程序能够工作,但是占用CPU太高,哪有没有好的办法降低CPU使用率,有同学可能想到在for循环中加入time.Sleep,但是睡眠多长时间呢?这是一个问题。睡眠太长导致检查rec.data更新不及时,睡眠太短又会浪费CPU. 所以这里的问题是,我们需要有一种机制让goroutine在等待时挂起,在事件发生时向挂起的goroutine发送信号,让他恢复运行。条件变量Cond实现了这种机制。我们可以使用Cond完成上面的需求。先上改进后的代码,具体API接口作用见下一部分实现原理中的介绍。
type Record struct {
sync.Mutex
data string
cond *sync.Cond
}
func NewRecord() *Record {
r := Record{}
r.cond = sync.NewCond(&r)
return &r
}
func main() {
var wg sync.WaitGroup
rec := NewRecord()
wg.Add(1)
go func(rec *Record) {
defer wg.Done()
rec.Lock()
rec.cond.Wait()
rec.Unlock()
fmt.Println("Data: ", rec.data)
return
}(rec)
time.Sleep(2 * time.Second)
rec.Lock()
rec.data = "gopher"
rec.Unlock()
rec.cond.Signal()
wg.Wait()
}
上面的程序使用了Cond的3个接口,分别是构造函数NewCond、等待函数Wait,通知函数Signal.启动的goroutine会阻塞等待在rec.cond.Wait()这里,直到有人发信号给他,它才会继续运行。main goroutine休眠2秒后,更新rec.data的值,然后调用rec.cond.Signal发送信号,收到信号后,启动的goroutine继续运行,最后打印data的内容并退出。通过信号机制,goroutine在条件不满足时休眠,满足时被唤醒继续执行,非常完美。
下面分析的源码是Go1.14版本,Cond实现在sync包下的cond.go文件中,代码加注释不到100行,非常简单,关键的逻辑调用了运行时中的信号量代码,本文只分析与Cond相关的代码,详细信号量代码源码分析准备专门写一篇文章。
Cond结构定义如下,核心字段是L和notify。noCopy和checker是辅助字段,用于检查Cond对象是否被复制使用了,因为Cond同Mutex一样,也是不能被复制的。L是一个接口,定义的有两个方法Lock和Unlock,一般将Mutex或RWMutex对象赋值给L,因为它们都实现了Locker的方法。notify是一个等待队列,调用用Wait方法后,goroutine会挂起等待在notify上。
等待队列类型为notifyList,它里面的5个字段可以分为3部分理解,lock是加锁用的。wait和notify都是一个计数器,它们的初始值都为0,每次调用Wait操作,wait的值都会增加1.wait的值可以理解为调用Wait操作程序所在的goroutine的编号,notify值表示小于它的阻塞的goroutine已经唤醒处理过,调用Signal或者Broadcast时唤醒阻塞在[notify,wait)范围编号上的goroutine。head和tail是一个单链表的头尾指针节点。
通俗理解,notifyList为一个队列,它里面存储是goroutine。wait和notify分别表示生产者和消费者的位置。这个队列是一个单链表,里面的goroutine按照wait值从小到大排列。
type Cond struct {
noCopy noCopy
// 当观察或修改等待条件的时候需要加锁
L Locker
// 等待队列
notify notifyList
checker copyChecker
}
type Locker interface {
Lock()
Unlock()
}
type notifyList struct {
// wait是一个计数器,它的值为调用Wait的goroutine的编号
wait uint32
// notify也是一个计数器,它的值表示调用Signal或者Broadcast时唤醒阻塞在[notify,wait)
// 范围编号上的goroutine
notify uint32
lock uintptr
// head是一个指针,指向sudog单链表的头节点,阻塞g的队列中第一个g的位置
head unsafe.Pointer
// tail也是一个指针,指向sudog单链表的尾节点,阻塞g的队列中最后一个g的位置
tail unsafe.Pointer
}
Wait方法的核心功能就是将当前的goroutine挂起,等待Signal或者Broadcast唤醒。需要注意,Wait方法中会先进行释放锁操作,后面又会执行加锁操作。这意味用户程序在调用Wait操作之前必须加锁,Wait操作完成之后需要释放锁,否则会存在释放未加锁的锁,引发panic等问题。
现在来分析从用户调用Wait操作之前加锁到这里c.L.Unlock过程中,锁在保护哪些内容。下面t肯定是受保护了,确保了每个ticket与关联的goroutine的唯一性。还有就是在这个过程中如果有并发操作的对象也是受保护的。
Wait方法中调用的runtime_notifyListAdd和runtime_notifyListWait函数是在runtime包中的sema.go文件实现的。执行runtime_notifyListWait操作将当前的goroutine挂起阻塞等待在notify队列上,收到唤醒信号之后会恢复运行。最后执行了c.L.Lock(),与我们在用户程序调用Wait之后的释放锁操作配对。
func (c *Cond) Wait() {
// 检查Cond对象是否被复制过
c.checker.check()
// wait自增1
t := runtime_notifyListAdd(&c.notify)
// 释放锁,所以在调用Wait前,必须进行加锁
c.L.Unlock()
// 将当前的goroutine挂起阻塞等待在notify队列上,收到唤醒信号之后
// 恢复运行
runtime_notifyListWait(&c.notify, t)
// 加锁,在执行Wait之后的代码前,Cond又被加锁了
c.L.Lock()
}
notifyListAdd原子性的将等待队列中的wait值加1
func notifyListAdd(l *notifyList) uint32 {
// 将wait的值原子性操作自增1,wait的初始值为0
return atomic.Xadd(&l.wait, 1) - 1
}
notifyListWait会创建一个sudog对象s,并设置s的ticket值,将它和当前的goroutine关联起来。然后加入到队尾。最后调用gopark将当前的goroutine挂起。
func notifyListWait(l *notifyList, t uint32) {
lock(&l.lock)
// 小于notify的值的对应编号阻塞的goroutine之前已经唤醒过了,直接返回
if less(t, l.notify) {
unlock(&l.lock)
return
}
// 获取一个sudog对象s
s := acquireSudog()
// 设置s中的g为当前的goroutine
s.g = getg()
// 设置ticket值为传入的t,可以理解为ticket与当前阻塞的goroutine(s.g)对应
s.ticket = t
s.releasetime = 0
t0 := int64(0)
if blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
// 将新创建的sudog对象s加入到队列的尾部,这个过程是在lock加锁的条件下进行的
// 不用担心并发将s加入到l.tail冲突问题
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
// 调用gopark阻塞当前的goroutine运行
goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
if t0 != 0 {
blockevent(s.releasetime-t0, 2)
}
releaseSudog(s)
}
Signal方法将唤醒等待队列中队头的goroutine,真正的实现在notifyListNotifyOne函数,此函数实现也在runtime包中的sema.go文件。
func (c *Cond) Signal() {
c.checker.check()
// 唤醒notifyList队列中队头的goroutine
runtime_notifyListNotifyOne(&c.notify)
}
notifyListNotifyOne函数找到队头中ticket为l.notify的对象,并将该对象关联的goroutine唤醒恢复运行。
func notifyListNotifyOne(l *notifyList) {
// 如果wait和notify值相等,说明没有阻塞等待的goroutine,也就没有要唤醒的g了,这里直接返回
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
// 加锁执行下面操作
lock(&l.lock)
// 加锁后再次检查wait的值跟notify是否相等,如果相等同上直接释放锁返回
t := l.notify
if t == atomic.Load(&l.wait) {
unlock(&l.lock)
return
}
// notify加1,相当于消费者消费一个数据(g),下面会将队列头的goroutine唤醒
atomic.Store(&l.notify, t+1)
// 执行循环操作,从队列中找出ticket等于notify(l.notify-1,因为此时l.notify已加1)的sudog对象
// 从sudog对象中获取到绑定的g,然后执行readyWithTime,readyWithTime会调用goread将g唤醒
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
if s.ticket == t {
n := s.next
if p != nil {
p.next = n
} else {
l.head = n
}
if n == nil {
l.tail = p
}
unlock(&l.lock)
s.next = nil
readyWithTime(s, 4)
return
}
}
// 释放锁
unlock(&l.lock)
}
Broadcast方法会唤醒队列中所有的goroutine.
func (c *Cond) Broadcast() {
c.checker.check()
// 唤醒notifyList队列中所有的goroutine
runtime_notifyListNotifyAll(&c.notify)
}
notifyListNotifyAll函数也在sema.go文件,将等待队列中所有的goroutine执行goready进行唤醒。在实现的时候,通过拷贝的方法将当前链表拷贝到临时变量s中,达到了快速释放锁。这里锁的粒度比Signal还要小,处理的非常优雅。
func notifyListNotifyAll(l *notifyList) {
// 如果wait和notify值相等,说明没有阻塞等待的goroutine,也就没有要唤醒的g了,这里直接返回
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
// 加锁,将当前链表拷贝到临时变量s中,然后将原链表释放
// 之后就可以解锁了。通过是拷贝方式达到快速解锁,这里比
// 锁的粒度比Signal还要小。
lock(&l.lock)
s := l.head
l.head = nil
l.tail = nil
// 原子将wait的值赋值给notify,表示[notify,wait)范围内阻塞的goroutine都将被唤醒了
atomic.Store(&l.notify, atomic.Load(&l.wait))
unlock(&l.lock)
// 遍历链表中每一个sudog对象,将绑定在sudog对象上的goroutine唤醒
for s != nil {
next := s.next
s.next = nil
// readyWithTime会调用goready将goroutine唤醒
readyWithTime(s, 4)
s = next
}
}
我们先来看stackoverflow网站讨论Cond错误使用的例子。分析错误的原因,总结经验。
下面的程序会出现:fatal error: all goroutines are asleep - deadlock! 为什么会这样呢?程序中有main goroutine和一个main函数中启动的goroutine. 执行go func之后会启动的goroutine并不一定执行而是先放入可以运行队列中,本例中会放入p.runnext中。然后main goroutine继续执行,执行完m.Lock后,睡眠了,这时会切换执行go func程序,睡眠一秒。然后会执行c.Broadcast,因为main goroutine睡眠的是2秒时间还没到。好家伙,执行Broadcast时wati和notify是相等的,直接退出了,次goroutine也就退出了。2秒后main goroutine执行c.Wait,将main goroutine挂起,此时没有可运行的goroutine了,所以打印上面的deadlock.
package main
import (
"sync"
"time"
)
func main() {
m := sync.Mutex{}
c := sync.NewCond(&m)
go func() {
time.Sleep(1 * time.Second)
c.Broadcast()
}()
m.Lock()
time.Sleep(2 * time.Second)
c.Wait()
}
c.L.Lock()
for !condition() {
c.Wait()
}
... make use of condition ...
c.L.Unlock()
Understanding Condition Variable in Go[1]How to correctly use sync.Cond[2]
[1]
Understanding Condition Variable in Go: https://kaviraj.me/understanding-condition-variable-in-go/
[2]
How to correctly use sync.Cond: https://stackoverflow.com/questions/36857167/how-to-correctly-use-sync-cond
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有