最近因为工作上的事情更新会相对有点儿慢,这周末又加了天班。然后昨天好好休息了下,顺便翻了下《云雀叫了一整天》,看到一首小诗觉得不错分享给大家。
从前慢 木心 记得早先少年时 大家诚诚恳恳 说一句是一句 清早上火车站 长街黑暗无行人 卖豆浆的小店冒着热气 从前的日色变得慢 车,马,邮件都慢 一生只够爱一个人 从前的锁也好看 钥匙精美有样子 你锁了人家就懂了
小小一诗,人生尽在其中。
好了回归正题,Go sync
包目前只剩sync.WaitGroup
没分析了,今天起个大早赶上这一篇。
sync.WaitGroup
是Go提供的一种允许一个goroutine
等待一组goroutine
完成任务的机制,类似于Java中的CountDownLatch
。主goroutine
调用Add
方法设置需要等待的goroutine
的数量,每个goroutine
完成时调用Done
方法。与此同时,wait
方法用于阻塞主goroutine
直到所有其他goroutine
执行完毕。
利用sync.WaitGroup
完成一个多协程任务
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func init() {
rand.Seed(time.Now().Unix())
}
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 3; i++ {
wg.Add(1)
go func() {
time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
fmt.Println(fmt.Sprintf("I'm finish my work at %s", time.Now().Format("2006-01-02 15:04:05")))
wg.Done()
}()
}
wg.Wait()
}
// 第一次使用后不允许被拷贝
type WaitGroup struct {
noCopy noCopy
//64位值: 高32位为计数器 低32位为等待计数
//64位原子操作要求64位对齐,但是32位编译器无法保证这一点。所以我们分配了12字节,
//其中对齐的8字节作存储state 剩下的4字节存储sema
state1 [3]uint32
}
sync.WaitGroup
结构比较简单,只包含一个防止拷贝的noCopy
字段和一个长度为3的uint32
数组。其核心在于对state1
这个字段的操作,其字段含义体现在state()
方法:
// 从wg.state1返回指向state和sema的指针
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}
可见state1
字段存储了statep
和semap
,至于if-else的逻辑是因为原子操作需要数据8字节对齐,否则程序会panic。故而WaitGroup
会选择使用uintptr(unsafe.Pointer(&wg.state1))%8 == 0
先判断是否是8字节对齐,如果不是则拿4个字节做下padding。(因为目前大多数平台CPU字长都是8或4字节)关于内存对齐
,如果你还不清楚,那你一定是没读过之前那篇《手摸手Go 你的内存对齐了吗》
state1
字段state1
剥离出了statep
和semap
statep
表示当前WaitGroup
当前的状态,它的高32位为counter
表示计数器,低32位waiters
表示Wait等待的goroutine
数量semap
表示信号量,调用Wait
的goroutine
会被阻塞到这个信号量上waitgroup
其核心逻辑如上图,接下来看源码
WaitGroup
提供了三个方法
func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
Add
操作入参delta
可正可负,根据delta
值更新counter
。
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
//累加计数器
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32) //计数器
w := uint32(state) //等待的goroutine数量
if v < 0 {//counter不能为负数
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return
}
// Add不得与Wait同时进行
// 如果看到counter==0,则Wait不会增加等待者数量
// 仍然进行廉价的完整性检查以检测WaitGroup的滥用
if *statep != state { //表明Add和Wait方法同时调用了
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 重置waiters count 为 0.
*statep = 0
for ; w != 0; w-- {
//唤醒阻塞在semap上的goroutine
runtime_Semrelease(semap, false, 0)
}
}
*statep!=state
到这个检查点一定是counter==0
并且waiters>0
,且*statep!=state
就panic,表明sync.WaitGroup
不允许在waiters>0
未执行完Wait
方法过程中调用Add()
或Wait()
方法修改statep
。
总结来说:当counter为零时delta为正数的调用必须在wait方法调用之前发生。当counter大于零时delta为正数或负数时的调用 随时都可能发生。通常,这意味着对Add的调用应该在创建goroutine或要等待的其他事件的语句之前执行。如果使用WaitGroup来等待几个独立的事件集,则必须在所有先前的Wait调用返回之后再进行新的Add调用。
通过调用Add(-1)
,递减counter值
// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
Wait 会一直阻塞到WaitGroup的counter为0为止
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
// counter为0 则不需要等待
return
}
// 增加等待者数量
if atomic.CompareAndSwapUint64(statep, state, state+1) {
runtime_Semacquire(semap)
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}
Wait
方法先是从state1
中获取statep
和semap
atomic.LoadUint64
加载statep
,从而获取高32位的counter
和低32位的waiters
。counter==0
,表示无需等待 直接返回counter!=0
,尝试将semap+1
,如果失败则回到步骤一继续执行atomic.CompareAndSwapUint64(statep, state, state+1)
成功,则调用runtime_Semacquire(semap)
将当前goroutine阻塞在信号量semap
上。*statep != 0
则表明Wait
方法未执行完毕前,WaitGroup
又被复用了,此时会panic。WaitGroup
源码还是比较简单的,通过原子操作state1
和信号量来协调goroutine
工作。其中state1
的设计也可以说是内存对齐的一个最佳实践。通过阅读源码也掌握了使用WaitGroup
的正确姿势:
Add()
和Done()
均可修改WaitGroup
的计数数,但是要保证计数不会修改为负数,否则会发生panicWait()
方法必须等待全部Add()
方法调用完毕之后再调用,否则也可能导致panicWaitGroup
是可以重复使用的。但前提是上一次的goroutine
都调用Wait完毕后才能继续复用。