各位读者朋友们大家好,我是随波逐流的薯条。深秋了,前几天气温骤降,北京的人和狗都不愿意出门,趴在窝里冻的打寒颤。我的书房里没装空调,暖气要十一月中旬才来,每次想学习都得下很大的决心,所以这篇文章发出来时比预期又晚了几天~
最近我的心也是冰冰的,我目前做在线数据开发,如果大家干过这活肯定知道,数据开发最重要的是数据口径,开发前一定得对清楚... 本人作为在这上面踩了很多坑的人这几天接需求时又掉进去了,一个需求在已经上线的情况下,不同来源的数据做diff总是对不上,一查就是口径不对,来来回回改了根据口径改了一遍逻辑,搞得我tm的真想和提供口径的人打一架,md。
有点扯远了,言归正传,这篇文章接着上次的 Context这三个应用场景,你知吗 继续看看context源码,读者可能觉得【Context源码,再度重相逢】这个标题比较奇怪。起这个题目是因为在下 读context源码时找了一些资料,最好的中文资料应是【码农桃花源】qcrao在19年写过的一篇关于context解析的文章,所以我在犹豫要不要写我的这篇,说实话代码都看完了不写出来吹吹牛逼总觉得有点亏。好在rao老板分析context源码基于的Go版本是1.9.2,如今Go已经1.17了,context的源码也有很多更新,于是不才就来一篇基于1.17.2的context源码分析,不多说了,发车!
context的核心作用是存储键值对和取消机制。存储键值对比较简单,取消机制比较复杂,先来看一下Context抽象出来的接口:
type Context interface {
// 如果是timerCtx或者自定义的ctx实现了此方法,返回截止时间和true,否则返回false
Deadline() (deadline time.Time, ok bool)
// 这里监听取消信号
Done() <-chan struct{}
// ctx取消时,返回对应错误,有context canceled和context deadline exceeded
Err() error
// 返回key的val
Value(key interface{}) interface{}
}
键值对ctx比较简单,先来看一下它的逻辑:要新建一个存储键值对的ctx,需要调用WithValue
,它返回一个valueCtx
地址对象。valueCtx结构体内部很简单,有个Context接口和k-v对:
type valueCtx struct {
Context
key, val interface{}
}
valueCtx
实现了Value
方法,逻辑也很简单:
func (c *valueCtx) Value(key interface{}) interface{} {
// key相同则返回key
if c.key == key {
return c.val
}
// 否则从父节点中调用Value方法继续寻找key
return c.Context.Value(key)
}
写一段代码看一下从valueCtx
中查找某个key的过程:
func main() {
ctx := context.Background()
ctx1 := context.WithValue(ctx, "name", "uutc")
ctx2 := context.WithValue(ctx1, "age", "18")
ctx3 := context.WithValue(ctx2, "traceID", "89asd7yu9asghd")
fmt.Println(ctx3.Value("name"))
}
valueCtx
是个链表模型,当我们从ctx3中查找name这个key, 最终要走到ctx1中才能返回对应的value,如图所示:
虽然链表的查找效率是O(n)的,但貌似一个请求里面也不会有1000个ctx,个人认为ctx链表的查找效率可以接受,读者有兴趣可以给go团队提个pr,把链表改成红黑树试试,嘿嘿~
context的取消机制我个人认为可以分成两种:第一种是普通取消,需要取消ctx时调用cancel函数。第二个是根据时间取消,用户可以定义一个过期time或一个deadline,到这个时间时自动取消。
现在假装没看源码,联想一下如果我们自己实现。该如何写取消。 建立ctx时, 是在parent的基础上copy一份,然后添加自己的属性, 不同协程可能持有不同的ctx,若想在请求层面做协程取消,就需要广播机制,比如在下图中:
img
若我们要取消ctx2,应分为向上取消和向下取消两部分,向下取消要把当前节点的子节点都干掉,在这里需要tx4、ctx5都取消掉;而向上取消需要把他在父节点中删除,在这里需要把自己(ctx2)从父节点ctx的子节点列表中删除;
取消这个动作本身并没有神奇的地方。ctx创建一个channel,然后协程通过select去监听这个channel,没数据时处于阻塞状态,当调用取消函数,函数内部执行close(chan)操作, select监听到关闭信号执行return,达到取消协程的目的,写个demo:
func main() {
done := make(chan struct{})
go func() {
close(done)
}()
select {
case <-done:
println("exit!")
return
}
}
下面来看go源码是怎么实现的取消,首先抽象出了一个canceler
接口,这个接口里最重要的就是cancel方法,调用这个方法可以发送取消信号,有两个结构体实现了这个接口,分别是 *cancelCtx(普通取消) 和 *timerCtx(时间取消)。
type canceler interface {
cancel(removeFromParent bool, err error)
Done() <-chan struct{}
}
cancelCtx对应前文说的普通取消机制,它是context取消机制的基石,也是源码中比较难理解的地方,先来看一下它的结构体:
type cancelCtx struct {
Context
mu sync.Mutex // context号称并发安全的基石
done atomic.Value // 用于接收ctx的取消信号,这个数据的类型做过优化,之前是 chan struct 类型
children map[canceler]struct{} // 储存此节点的实现取消接口的子节点,在根节点取消时,遍历它给子节点发送取消信息
err error // 调用取消函数时会赋值这个变量
}
若我们要生成一个可取消的ctx,需要调用WithCancel函数,这个函数的内部逻辑是:
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
if parent == nil {
panic("cannot create context from nil parent")
}
c := newCancelCtx(parent) // 基于父节点,new一个CancelCtx对象
propagateCancel(parent, &c) // 挂载c到parent上
return &c, func() { c.cancel(true, Canceled) } // 返回子ctx,和返回函数
}
这里逻辑比较重的地方是propagateCancel函数和cancel方法,propagateCancel函数主要工作是把子节点挂载父节点上,下面来看看它的源码:
func propagateCancel(parent Context, child canceler) {
done := parent.Done()
// 判断父节点的done是否为nil,若为nil则为不可取消的ctx, 直接返回
if done == nil {
return
}
// 看能否从done里面读到数据,若能说明父节点已取消,取消子节点,返回即可,不能的话继续流转到后续逻辑
select {
case <-done:
child.cancel(false, parent.Err())
return
default:
}
// 调用parentCancelCtx函数,看是否能找到ctx上层最接近的可取消的父节点
if p, ok := parentCancelCtx(parent); ok {
//这里是可以找到的情况
p.mu.Lock()
// 父节点有err,说明已经取消,直接取消子节点
if p.err != nil {
child.cancel(false, p.err)
} else {
// 把本节点挂载到父节点的children map中
if p.children == nil {
p.children = make(map[canceler]struct{})
}
p.children[child] = struct{}{}
}
p.mu.Unlock()
} else {
// 若没有可取消的父节点挂载
atomic.AddInt32(&goroutines, +1)
// 新起一个协程
go func() {
select {
// 监听到父节点取消时,取消子节点
case <-parent.Done():
child.cancel(false, parent.Err())
// 监听到子节点取消时,什么都不做,退出协程
case <-child.Done():
}
}()
}
}
我看这段源码时产生了两个问题:
经过一番研究,大概解决了这俩问题,下面依次做解答。
首先看下parentCancelCtx 函数的逻辑。parentCancelCtx函数用来查找ctx最近的一个可取消的父节点,这个函数也经过了优化,原代码是:
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
for {
switch c := parent.(type) {
case *cancelCtx:
return c, true
case *timerCtx:
return &c.cancelCtx, true
case *valueCtx:
parent = c.Context
default:
return nil, false
}
}
}
这段代码比较简单,起了一个for循环,遇到*cancelCtx
和*timerCtx
类型就返回,遇到*valueCtx
则继续向上查找parent,直到找到了节点或者找不到为止。
最新版本的代码放弃粗暴的使用for{}循环寻找父节点,而是用parent.Value方法查到父节点是否能找到特定的key,由于Value是递归的,所以这里表面上看不出来循环的足迹:
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
done := parent.Done()
if done == closedchan || done == nil {
return nil, false
}
// Value是递归向上查找,直到找到有*cancelCtxKey 的ctx为止
p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)
if !ok {
return nil, false
}
pdone, _ := p.done.Load().(chan struct{})
if pdone != done {
return nil, false
}
return p, true
}
知道了这个前提,我们继续研究什么条件下会走到parentCancelCtx 函数的else分支。我自己实现了一个Context,代码如下
type ContextCancel struct {
context.Context
}
func (*ContextCancel) Done() <-chan struct{} {
ch := make(chan struct{}, 100)
return ch
}
当调用这段代码时,即可走到else分支,写个demo:
func main() {
ctx := context.Background()
ctx1, _ := context.WithCancel(ctx)
ctx2 := context.WithValue(ctx1, "hello", "world")
ctx3 := ContextCancel{ctx2}
ctx4, _ := context.WithCancel(&ctx3) // 这里可以走到else分支
println(ctx4)
}
与源码中CancelCtx不同的是,我这里的Done方法只是简单返回,并没有把done的值存到Context中去。所以在执行parentCancelCtx时,这里会判断失败,返回false:
pdone, _ := p.done.Load().(chan struct{})
if pdone != done {
return nil, false
}
通过parent.Value(&cancelCtxKey).(*cancelCtx)
虽然找到了cancelCtx,但是在load Done方法值的时候却铩羽而归,parentCancelCtx 这里判断失败,最终返回nil和false,最终走到else分支。所以这个else分支主要是预防用户自己实现了一个定制的Ctx中,随意提供了一个Done chan的情况的,由于找不到可取消的父节点的,只好新起一个协程做监听。
要明白这个问题,先来看一下*cancelCtx类型的cancel方法实现:
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
// 取消时必须传入err,不然panic
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
// 已经出错了,说明已取消,直接返回
if c.err != nil {
c.mu.Unlock()
return
}
// 用户传进来的err赋给c.err
c.err = err
d, _ := c.done.Load().(chan struct{})
if d == nil {
// 这里其实和关闭chan差不多,因为后续会用closedchan作判断
c.done.Store(closedchan)
} else {
// 关闭chan
close(d)
}
// 这里是向下取消,依次取消此节点所有的子节点
for child := range c.children {
child.cancel(false, err)
}
// 清空子节点
c.children = nil
c.mu.Unlock()
// 这里是向上取消,取消此节点和父节点的联系
if removeFromParent {
removeChild(c.Context, c)
}
}
removeChild函数的逻辑比较简单,核心就是调用delete方法,在父节点的子节点中清空自己。
func removeChild(parent Context, child canceler) {
p, ok := parentCancelCtx(parent)
if !ok {
return
}
p.mu.Lock()
if p.children != nil {
delete(p.children, child) // 这里只是删除一个
}
p.mu.Unlock()
}
看完这俩函数的逻辑后,这个问题也可以回答。当父节点调用cancel函数时传递true, 其他情况传递false。
true用来向上删除,核心逻辑是调用removeChild
函数里面的的:
if p.children != nil {
delete(p.children, child) // 这里只是删除一个
}
而false就是用来非向上删除,只需要执行完cancel方法这段代码即可:
for child := range c.children {
child.cancel(false, err) // 这里把子节点都干掉
}
看到这里,ctx的普通取消机制基本差不多了,下面来看一下基于时间的取消机制。
时间取消ctx可传入两种时间,第一种是传入超时时间戳;第二种是传入ctx持续时间,比如2s之后ctx取消,持续时间在实现上是在time.Now的基础上加了个timeout凑个超时时间戳,本质上都是调用的WithDeadline
函数。
WithDeadline 函数内部new了一个timerCtx
,先来看一下这个结构体的内容:
type timerCtx struct {
cancelCtx
timer *time.Timer // 一个统一的计时器,后续通过 time.AfterFunc 使用
deadline time.Time // 过期时间戳
}
可以看到 timerCtx 内嵌了cancelCtx,实际上在超时取消这件事上,timerCtx更多负责的是超时相关的逻辑,而取消主要调用的cancelCtx的cancel方法。先来看一下WithDeadline
函数的逻辑,看如何返回一个时间Ctx:
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
// 若父节点为nil,panic
if parent == nil {
panic("cannot create context from nil parent")
}
// 如果parent有超时时间、且过期时间早于参数d,那parent取消时,child 一定需要取消,直接通过WithCancel走起
if cur, ok := parent.Deadline(); ok && cur.Before(d) {
// The current deadline is already sooner than the new one.
return WithCancel(parent)
}
// 构造一个timerCtx, 主要传入一个过期时间
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: d,
}
// 把这个节点挂载到父节点上
propagateCancel(parent, c)
dur := time.Until(d)
// 若子节点已过期,直接取消
if dur <= 0 {
c.cancel(true, DeadlineExceeded) // deadline has already passed
return c, func() { c.cancel(false, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
// 否则等到过期时间时,执行取消操作
c.timer = time.AfterFunc(dur, func() {
c.cancel(true, DeadlineExceeded)
})
}
// 返回一个ctx和一个取消函数
return c, func() { c.cancel(true, Canceled) }
}
看完源码可以知道,除了基于时间的取消,当调用CancelFunc时,也能取消超时ctx。再来看一下*timerCtx
的cancel方法的源码:
func (c *timerCtx) cancel(removeFromParent bool, err error) {
// 调用cancel的cancel取消掉它下游的ctx
c.cancelCtx.cancel(false, err)
// 取消掉它上游的ctx的连接
if removeFromParent {
removeChild(c.cancelCtx.Context, c)
}
// 把timer停掉
c.mu.Lock()
if c.timer != nil {
c.timer.Stop()
c.timer = nil
}
c.mu.Unlock()
}
至此,context源码分析的差不多了,关于这块还有个挺常见的问题,Context.TODO和Context.Backend的区别。从代码上看,他俩没啥区别,都只是emptyCtx
的对象而已,emptyCtx
源码很简单,这里不再赘述。
写这篇文章时,我在想看源码的好处什么。个人认为有两点,第一可以从源码角度看到一个概念的全部细节,第二个是可以学习大牛写代码的思路。实际上context的代码也有个迭代过程,下面列举一些阅读源码时学习到的点:
来自String() string
方法。
来自cancelCtx
源码,用atomic.Value类型替换了chan struct{}。
这点在go源码中随处可见,简单列举几处:
看源码时就感觉这个select有点突兀,一看果然为优化效率后加的~
之前的源码只是粗暴的使用Sprintf函数,后来自己搞了个stringer接口,用反射去打印Context。
截止日期已经过了,cancel已经执行过了,没必要在返回取消函数中再从父ctx中取消自己了. 感觉removeFromParent有点没抽象好,这不,作者自己都掉坑里去了。
个人感觉context代码挺值得一看的:struct里面嵌套interface,struct并不对外暴露,而是提供多个Withxxx
方法新建对象;interface对外暴露,用户可以根据需要构建自己的Context;timerCtx struct里面嵌套CancelCtx struct以此来复用cancel的逻辑等等。