前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >手摸手Go 也谈sync.WaitGroup

手摸手Go 也谈sync.WaitGroup

作者头像
用户3904122
发布2022-06-29 14:53:45
4700
发布2022-06-29 14:53:45
举报
文章被收录于专栏:光华路程序猿

最近因为工作上的事情更新会相对有点儿慢,这周末又加了天班。然后昨天好好休息了下,顺便翻了下《云雀叫了一整天》,看到一首小诗觉得不错分享给大家。

从前慢 木心 记得早先少年时 大家诚诚恳恳 说一句是一句 清早上火车站 长街黑暗无行人 卖豆浆的小店冒着热气 从前的日色变得慢 车,马,邮件都慢 一生只够爱一个人 从前的锁也好看 钥匙精美有样子 你锁了人家就懂了

小小一诗,人生尽在其中。

好了回归正题,Go sync包目前只剩sync.WaitGroup没分析了,今天起个大早赶上这一篇。

sync.WaitGroup是Go提供的一种允许一个goroutine等待一组goroutine完成任务的机制,类似于Java中的CountDownLatch。主goroutine调用Add方法设置需要等待的goroutine的数量,每个goroutine完成时调用Done方法。与此同时,wait方法用于阻塞主goroutine直到所有其他goroutine执行完毕。

基本使用

利用sync.WaitGroup完成一个多协程任务

代码语言:javascript
复制
package main

import (
 "fmt"
 "math/rand"
 "sync"
 "time"
)

func init() {
 rand.Seed(time.Now().Unix())
}
func main() {
 wg := sync.WaitGroup{}
 for i := 0; i < 3; i++ {
  wg.Add(1)
  go func() {
   time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
   fmt.Println(fmt.Sprintf("I'm finish my work at %s", time.Now().Format("2006-01-02 15:04:05")))
   wg.Done()
  }()
 }
 wg.Wait()
}

sync.WaitGroup源码分析

数据结构

代码语言:javascript
复制
// 第一次使用后不允许被拷贝
type WaitGroup struct {
 noCopy noCopy
  //64位值: 高32位为计数器 低32位为等待计数
  //64位原子操作要求64位对齐,但是32位编译器无法保证这一点。所以我们分配了12字节,
  //其中对齐的8字节作存储state 剩下的4字节存储sema
 state1 [3]uint32
}

sync.WaitGroup结构比较简单,只包含一个防止拷贝的noCopy字段和一个长度为3的uint32数组。其核心在于对state1这个字段的操作,其字段含义体现在state()方法:

代码语言:javascript
复制
// 从wg.state1返回指向state和sema的指针
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
 if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
  return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
 } else {
  return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
 }
}

可见state1字段存储了statepsemap,至于if-else的逻辑是因为原子操作需要数据8字节对齐,否则程序会panic。故而WaitGroup会选择使用uintptr(unsafe.Pointer(&wg.state1))%8 == 0先判断是否是8字节对齐,如果不是则拿4个字节做下padding。(因为目前大多数平台CPU字长都是8或4字节)关于内存对齐,如果你还不清楚,那你一定是没读过之前那篇《手摸手Go 你的内存对齐了吗

state1

字段state1剥离出了statepsemap

  • statep 表示当前WaitGroup当前的状态,它的高32位为counter表示计数器,低32位waiters表示Wait等待的goroutine数量
  • semap表示信号量,调用Waitgoroutine会被阻塞到这个信号量上

waitgroup

其核心逻辑如上图,接下来看源码

操作方法

WaitGroup提供了三个方法

代码语言:javascript
复制
func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait() 
  • Add 负责修改counter值以及释放阻塞在semp信号量的goroutine
  • Done通过调用Add递减counter值
  • Wait阻塞等待counter值为0为止
Add

Add操作入参delta可正可负,根据delta值更新counter

  • 当counter为0时,所有等待时阻塞的goroutine会被释放
  • 如果counter为负数 则Add会发生panic
代码语言:javascript
复制
func (wg *WaitGroup) Add(delta int) {
 statep, semap := wg.state()
  //累加计数器
 state := atomic.AddUint64(statep, uint64(delta)<<32)
 v := int32(state >> 32) //计数器
 w := uint32(state) //等待的goroutine数量

 if v < 0 {//counter不能为负数
  panic("sync: negative WaitGroup counter")
 }
 if w != 0 && delta > 0 && v == int32(delta) {
  panic("sync: WaitGroup misuse: Add called concurrently with Wait")
 }
 if v > 0 || w == 0 {
  return
 }
  // Add不得与Wait同时进行
  // 如果看到counter==0,则Wait不会增加等待者数量
  // 仍然进行廉价的完整性检查以检测WaitGroup的滥用
 if *statep != state { //表明Add和Wait方法同时调用了
  panic("sync: WaitGroup misuse: Add called concurrently with Wait")
 }
 // 重置waiters count 为 0.
 *statep = 0
 for ; w != 0; w-- {
    //唤醒阻塞在semap上的goroutine
  runtime_Semrelease(semap, false, 0)
 }
}

*statep!=state到这个检查点一定是counter==0并且waiters>0,且*statep!=state就panic,表明sync.WaitGroup不允许在waiters>0未执行完Wait方法过程中调用Add()Wait()方法修改statep

总结来说:当counter为零时delta为正数的调用必须在wait方法调用之前发生。当counter大于零时delta为正数或负数时的调用 随时都可能发生。通常,这意味着对Add的调用应该在创建goroutine或要等待的其他事件的语句之前执行。如果使用WaitGroup来等待几个独立的事件集,则必须在所有先前的Wait调用返回之后再进行新的Add调用。

Done

通过调用Add(-1),递减counter值

代码语言:javascript
复制
// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
 wg.Add(-1)
}
Wait

Wait 会一直阻塞到WaitGroup的counter为0为止

代码语言:javascript
复制
func (wg *WaitGroup) Wait() {
 statep, semap := wg.state()
 for {
  state := atomic.LoadUint64(statep)
  v := int32(state >> 32)
  w := uint32(state)
  if v == 0 {
   // counter为0 则不需要等待
   return
  }
  // 增加等待者数量
  if atomic.CompareAndSwapUint64(statep, state, state+1) {
   runtime_Semacquire(semap)
   if *statep != 0 {
    panic("sync: WaitGroup is reused before previous Wait has returned")
   }
   return
  }
 }
}
  1. Wait方法先是从state1中获取statepsemap
  2. 进入一个for的无限循环,atomic.LoadUint64加载statep,从而获取高32位的counter和低32位的waiters
  3. 如果counter==0,表示无需等待 直接返回
  4. 如果counter!=0,尝试将semap+1,如果失败则回到步骤一继续执行
  5. 如果atomic.CompareAndSwapUint64(statep, state, state+1)成功,则调用runtime_Semacquire(semap)将当前goroutine阻塞在信号量semap上。
  6. 检查*statep != 0则表明Wait方法未执行完毕前,WaitGroup又被复用了,此时会panic。

总结

WaitGroup源码还是比较简单的,通过原子操作state1和信号量来协调goroutine工作。其中state1的设计也可以说是内存对齐的一个最佳实践。通过阅读源码也掌握了使用WaitGroup的正确姿势:

  • Add()Done()均可修改WaitGroup的计数数,但是要保证计数不会修改为负数,否则会发生panic
  • Wait()方法必须等待全部Add()方法调用完毕之后再调用,否则也可能导致panic
  • WaitGroup是可以重复使用的。但前提是上一次的goroutine都调用Wait完毕后才能继续复用。
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-03-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 光华路程序猿 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 基本使用
  • sync.WaitGroup源码分析
    • 数据结构
      • 操作方法
        • Add
        • Done
        • Wait
    • 总结
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档