前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >条件变量Cond实现

条件变量Cond实现

作者头像
数据小冰
发布于 2022-08-15 06:56:05
发布于 2022-08-15 06:56:05
59200
代码可运行
举报
文章被收录于专栏:数据小冰数据小冰
运行总次数:0
代码可运行
Cond是什么

下面是wikipedia对条件变量的定义,大体是说条件变量总的来说是等待特定条件的线程的容器

❝A condition variable is basically a container of threads that are waiting for a certain condition. ❞

Cond是Go标准库sync包提供的条件变量原语,目的是为等待通知场景下的并发问题提供解决方法。Cond通常应用于等待某个条件的一个或一组goroutine,当等待条件变为true时,其中一个或一组所有的goroutine都被唤醒执行。通俗来说,Cond和某个条件相关,这个条件可以是一个表达式、一个bool变量或是一个函数调用,只要它们的结果是bool类型的值就行。一个或一组goroutine需要这个条件才能协同完成,在条件还没有满足的时候,所有等待该条件的goroutine都会被阻塞,当条件满足的时候,等待的goroutine才能够继续运行。举个例子,在奥运会100米短跑比赛中,将每个运动员看作一个个goroutine,只有在发令枪响之后,运动员才能开始跑,这里的发令枪响就是条件变量,只有枪响之后,也就是条件满足之后,goroutine才能运行,在枪响之前,运动员处于等待状态。

Cond使用场景

我们先通过一个例子来了解Cond解决的是什么问题,该例子来至于文末的引用1。下面的程序启动了一个goroutine,当rec.data有内容的时候,打印内容退出,没有内容的时候进行空转。main goroutine休息2秒钟后更新rec的值。编译运行下面的程序,可以看到CPU使用率高达接近100%。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
type Record struct {
 sync.Mutex
 data string
}

func main() {
 var wg sync.WaitGroup

 rec := &Record{}
 wg.Add(1)
 go func(rec *Record) {
  defer wg.Done()
  for {
   rec.Lock()
   if rec.data != "" {
    fmt.Println("Data: ", rec.data)
    rec.Unlock()
    return
   }
   rec.Unlock()
  }
 }(rec)

 time.Sleep(2 * time.Second)
 rec.Lock()
 rec.data = "gopher"
 rec.Unlock()

 wg.Wait()
}

虽然上面的程序能够工作,但是占用CPU太高,哪有没有好的办法降低CPU使用率,有同学可能想到在for循环中加入time.Sleep,但是睡眠多长时间呢?这是一个问题。睡眠太长导致检查rec.data更新不及时,睡眠太短又会浪费CPU. 所以这里的问题是,我们需要有一种机制让goroutine在等待时挂起,在事件发生时向挂起的goroutine发送信号,让他恢复运行。条件变量Cond实现了这种机制。我们可以使用Cond完成上面的需求。先上改进后的代码,具体API接口作用见下一部分实现原理中的介绍。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
type Record struct {
 sync.Mutex
 data string

 cond *sync.Cond
}

func NewRecord() *Record {
 r := Record{}
 r.cond = sync.NewCond(&r)

 return &r
}

func main() {
 var wg sync.WaitGroup

 rec := NewRecord()
 wg.Add(1)
 go func(rec *Record) {
  defer wg.Done()

  rec.Lock()
  rec.cond.Wait()
  rec.Unlock()
  fmt.Println("Data: ", rec.data)
  return
 }(rec)

 time.Sleep(2 * time.Second)
 rec.Lock()
 rec.data = "gopher"
 rec.Unlock()
 rec.cond.Signal()

 wg.Wait()
}

上面的程序使用了Cond的3个接口,分别是构造函数NewCond、等待函数Wait,通知函数Signal.启动的goroutine会阻塞等待在rec.cond.Wait()这里,直到有人发信号给他,它才会继续运行。main goroutine休眠2秒后,更新rec.data的值,然后调用rec.cond.Signal发送信号,收到信号后,启动的goroutine继续运行,最后打印data的内容并退出。通过信号机制,goroutine在条件不满足时休眠,满足时被唤醒继续执行,非常完美。

Cond实现原理

下面分析的源码是Go1.14版本,Cond实现在sync包下的cond.go文件中,代码加注释不到100行,非常简单,关键的逻辑调用了运行时中的信号量代码,本文只分析与Cond相关的代码,详细信号量代码源码分析准备专门写一篇文章。

结构体定义

Cond结构定义如下,核心字段是L和notify。noCopy和checker是辅助字段,用于检查Cond对象是否被复制使用了,因为Cond同Mutex一样,也是不能被复制的。L是一个接口,定义的有两个方法Lock和Unlock,一般将Mutex或RWMutex对象赋值给L,因为它们都实现了Locker的方法。notify是一个等待队列,调用用Wait方法后,goroutine会挂起等待在notify上。

等待队列类型为notifyList,它里面的5个字段可以分为3部分理解,lock是加锁用的。wait和notify都是一个计数器,它们的初始值都为0,每次调用Wait操作,wait的值都会增加1.wait的值可以理解为调用Wait操作程序所在的goroutine的编号,notify值表示小于它的阻塞的goroutine已经唤醒处理过,调用Signal或者Broadcast时唤醒阻塞在[notify,wait)范围编号上的goroutine。head和tail是一个单链表的头尾指针节点。

通俗理解,notifyList为一个队列,它里面存储是goroutine。wait和notify分别表示生产者和消费者的位置。这个队列是一个单链表,里面的goroutine按照wait值从小到大排列。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
type Cond struct {
 noCopy noCopy

 // 当观察或修改等待条件的时候需要加锁
 L Locker
 // 等待队列
 notify  notifyList
 checker copyChecker
}

type Locker interface {
 Lock()
 Unlock()
}

type notifyList struct {
 // wait是一个计数器,它的值为调用Wait的goroutine的编号
 wait uint32
 // notify也是一个计数器,它的值表示调用Signal或者Broadcast时唤醒阻塞在[notify,wait)
 // 范围编号上的goroutine
 notify uint32
 lock   uintptr
 // head是一个指针,指向sudog单链表的头节点,阻塞g的队列中第一个g的位置
 head unsafe.Pointer
 // tail也是一个指针,指向sudog单链表的尾节点,阻塞g的队列中最后一个g的位置
 tail unsafe.Pointer
}

Wait方法

Wait方法的核心功能就是将当前的goroutine挂起,等待Signal或者Broadcast唤醒。需要注意,Wait方法中会先进行释放锁操作,后面又会执行加锁操作。这意味用户程序在调用Wait操作之前必须加锁,Wait操作完成之后需要释放锁,否则会存在释放未加锁的锁,引发panic等问题。

现在来分析从用户调用Wait操作之前加锁到这里c.L.Unlock过程中,锁在保护哪些内容。下面t肯定是受保护了,确保了每个ticket与关联的goroutine的唯一性。还有就是在这个过程中如果有并发操作的对象也是受保护的。

Wait方法中调用的runtime_notifyListAdd和runtime_notifyListWait函数是在runtime包中的sema.go文件实现的。执行runtime_notifyListWait操作将当前的goroutine挂起阻塞等待在notify队列上,收到唤醒信号之后会恢复运行。最后执行了c.L.Lock(),与我们在用户程序调用Wait之后的释放锁操作配对。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (c *Cond) Wait() {
 // 检查Cond对象是否被复制过
 c.checker.check()
 // wait自增1
 t := runtime_notifyListAdd(&c.notify)
 // 释放锁,所以在调用Wait前,必须进行加锁
 c.L.Unlock()
 // 将当前的goroutine挂起阻塞等待在notify队列上,收到唤醒信号之后
 // 恢复运行
 runtime_notifyListWait(&c.notify, t)
 // 加锁,在执行Wait之后的代码前,Cond又被加锁了
 c.L.Lock()
}

notifyListAdd原子性的将等待队列中的wait值加1

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func notifyListAdd(l *notifyList) uint32 {
 // 将wait的值原子性操作自增1,wait的初始值为0
 return atomic.Xadd(&l.wait, 1) - 1
}

notifyListWait会创建一个sudog对象s,并设置s的ticket值,将它和当前的goroutine关联起来。然后加入到队尾。最后调用gopark将当前的goroutine挂起。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func notifyListWait(l *notifyList, t uint32) {
 lock(&l.lock)

 // 小于notify的值的对应编号阻塞的goroutine之前已经唤醒过了,直接返回
 if less(t, l.notify) {
  unlock(&l.lock)
  return
 }

 // 获取一个sudog对象s
 s := acquireSudog()
 // 设置s中的g为当前的goroutine
 s.g = getg()
 // 设置ticket值为传入的t,可以理解为ticket与当前阻塞的goroutine(s.g)对应
 s.ticket = t
 s.releasetime = 0
 t0 := int64(0)
 if blockprofilerate > 0 {
  t0 = cputicks()
  s.releasetime = -1
 }
 // 将新创建的sudog对象s加入到队列的尾部,这个过程是在lock加锁的条件下进行的
 // 不用担心并发将s加入到l.tail冲突问题
 if l.tail == nil {
  l.head = s
 } else {
  l.tail.next = s
 }
 l.tail = s
 // 调用gopark阻塞当前的goroutine运行
 goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
 if t0 != 0 {
  blockevent(s.releasetime-t0, 2)
 }
 releaseSudog(s)
}
Signal方法

Signal方法将唤醒等待队列中队头的goroutine,真正的实现在notifyListNotifyOne函数,此函数实现也在runtime包中的sema.go文件。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (c *Cond) Signal() {
 c.checker.check()
 // 唤醒notifyList队列中队头的goroutine
 runtime_notifyListNotifyOne(&c.notify)
}

notifyListNotifyOne函数找到队头中ticket为l.notify的对象,并将该对象关联的goroutine唤醒恢复运行。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func notifyListNotifyOne(l *notifyList) {
 // 如果wait和notify值相等,说明没有阻塞等待的goroutine,也就没有要唤醒的g了,这里直接返回
 if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
  return
 }
 // 加锁执行下面操作
 lock(&l.lock)

 // 加锁后再次检查wait的值跟notify是否相等,如果相等同上直接释放锁返回
 t := l.notify
 if t == atomic.Load(&l.wait) {
  unlock(&l.lock)
  return
 }

 // notify加1,相当于消费者消费一个数据(g),下面会将队列头的goroutine唤醒
 atomic.Store(&l.notify, t+1)
 // 执行循环操作,从队列中找出ticket等于notify(l.notify-1,因为此时l.notify已加1)的sudog对象
 // 从sudog对象中获取到绑定的g,然后执行readyWithTime,readyWithTime会调用goread将g唤醒
 for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
  if s.ticket == t {
   n := s.next
   if p != nil {
    p.next = n
   } else {
    l.head = n
   }
   if n == nil {
    l.tail = p
   }
   unlock(&l.lock)
   s.next = nil
   readyWithTime(s, 4)
   return
  }
 }
 // 释放锁
 unlock(&l.lock)
}
Broadcast方法

Broadcast方法会唤醒队列中所有的goroutine.

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (c *Cond) Broadcast() {
 c.checker.check()
 // 唤醒notifyList队列中所有的goroutine
 runtime_notifyListNotifyAll(&c.notify)
}

notifyListNotifyAll函数也在sema.go文件,将等待队列中所有的goroutine执行goready进行唤醒。在实现的时候,通过拷贝的方法将当前链表拷贝到临时变量s中,达到了快速释放锁。这里锁的粒度比Signal还要小,处理的非常优雅。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func notifyListNotifyAll(l *notifyList) {
 // 如果wait和notify值相等,说明没有阻塞等待的goroutine,也就没有要唤醒的g了,这里直接返回
 if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
  return
 }

 // 加锁,将当前链表拷贝到临时变量s中,然后将原链表释放
 // 之后就可以解锁了。通过是拷贝方式达到快速解锁,这里比
 // 锁的粒度比Signal还要小。
 lock(&l.lock)
 s := l.head
 l.head = nil
 l.tail = nil

 // 原子将wait的值赋值给notify,表示[notify,wait)范围内阻塞的goroutine都将被唤醒了
 atomic.Store(&l.notify, atomic.Load(&l.wait))
 unlock(&l.lock)

 // 遍历链表中每一个sudog对象,将绑定在sudog对象上的goroutine唤醒
 for s != nil {
  next := s.next
  s.next = nil
  // readyWithTime会调用goready将goroutine唤醒
  readyWithTime(s, 4)
  s = next
 }
}
Cond使用注意事项

我们先来看stackoverflow网站讨论Cond错误使用的例子。分析错误的原因,总结经验。

下面的程序会出现:fatal error: all goroutines are asleep - deadlock! 为什么会这样呢?程序中有main goroutine和一个main函数中启动的goroutine. 执行go func之后会启动的goroutine并不一定执行而是先放入可以运行队列中,本例中会放入p.runnext中。然后main goroutine继续执行,执行完m.Lock后,睡眠了,这时会切换执行go func程序,睡眠一秒。然后会执行c.Broadcast,因为main goroutine睡眠的是2秒时间还没到。好家伙,执行Broadcast时wati和notify是相等的,直接退出了,次goroutine也就退出了。2秒后main goroutine执行c.Wait,将main goroutine挂起,此时没有可运行的goroutine了,所以打印上面的deadlock.

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package main

import (
    "sync"
    "time"
)

func main() {
    m := sync.Mutex{}
    c := sync.NewCond(&m)
    go func() {
        time.Sleep(1 * time.Second)
        c.Broadcast()
    }()
    m.Lock()
    time.Sleep(2 * time.Second)
    c.Wait()
}
  1. sync.Cond不能复制使用 sync.Cond结构中有一个notifyList队列,如果复制Cond,相当于复制了notifyList值,在并发场景下不同goroutine操作的并不是同一个notifyList,会出现与预期不一致的效果,例如可能出现有些goroutine一直阻塞。
  2. 调用Wait操作前需加锁,调用完之后释放锁 Wait内部先释放了锁然后又加锁。所以在调用Wait之前必须加锁,调用完之后释放锁,否则会出现加锁和释放锁不能配对问题。
  3. Wait通常放在在for循环内部调用,例如采用如下模式,因为waiter goroutine被唤醒不等于等待条件被满足,所以唤醒之后需要进一步检查等待条件。
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
  c.L.Lock()
   for !condition() {
       c.Wait()
   }
   ... make use of condition ...
   c.L.Unlock()

Understanding Condition Variable in Go[1]How to correctly use sync.Cond[2]

Reference

[1]

Understanding Condition Variable in Go: https://kaviraj.me/understanding-condition-variable-in-go/

[2]

How to correctly use sync.Cond: https://stackoverflow.com/questions/36857167/how-to-correctly-use-sync-cond

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-09-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 数据小冰 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验