📢 注意,该文本非最终版本,正在更新中,版权所有,请勿转载!!
在第二章我们会去看 k8s 中常用对象的源码,不过在看这些对象之前,我们需要聊一聊 informer 机制。这个机制可以说是 k8s 设计之中的一个重点了。这个机制的设计不仅仅让代码本身变得清晰,更让整个系统的结构更容易扩展。所以这个机制需要放到第二章的第一节来说。
我第一接触 informer 是在使用 client-go 的时候。相信有很多同学和我一样,学习 k8s 的路径通常是,从基本的使用开始,然后慢慢的有一些自定义的需求需要使用 client-go 进行开发。使用 client-go 开发真的很方便,能力很强大。而在其中我第一次碰到了 informer。从了解了这个机制之后,才逐渐明白 k8s 本身是如何去控制里面的资源的。
还是一样的,本文不涉及具体这个机制的详细原理,更专注在源码本身。当然,我先通过两个小点帮助你回忆起来 informer 机制。
首先是控制循环,这个我认为是 k8s 的精髓,它通过一个循环来让整个系统趋向与我们申明的一个期望状态。 https://github.com/kubernetes/community/blob/master/contributors/devel/sig-api-machinery/controllers.md#writing-controllers
for {
实际状态 := 获取集群中对象 X 的实际状态(Actual State)
期望状态 := 获取集群中对象 X 的期望状态(Expectation State)
if 实际状态 == 期望状态{
什么都不做
} else {
执行编排动作,将实际状态调整为期望状态
}
}
下面的例子说明了 informer 的用法 https://github.com/kubernetes/community/blob/master/contributors/devel/sig-api-machinery/controllers.md#rough-structure
func NewController(pods informers.PodInformer) *Controller {
c := &Controller{
pods: pods.Lister(),
podsSynced: pods.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "controller-name"),
}
pods.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
// ...
},
UpdateFunc: func(old interface{}, new interface{}) {
// ...
},
DeleteFunc: func(obj interface{}) {
// ...
},
},)
return c
}
其他你都不需要看,关键在于 AddEventHandler
,看到它你就知道了,本质就是让你能监听一些变动的事件,当事件来的时候,你就会知道,具体你知道了之后干嘛,就是你的事情了。
下面这个图非常清晰的说明了 informer 的流程和关系,记住这个图片,后面还会用到
结合以上回忆,思考一下:如果我们希望去控制一个对象,那么我们需要知道这个对象现在的状态是什么,或者知道它发生了什么变化,变化能不能满足我们的期望,如果不满足应该怎么调整。那么,想要知道一个对象的状态有两种方式,一种是你主动去查询,对吧,而另一种就是让别人告诉你。而 informer 就是后一种。
这次寻码的原因和之前不太一样,之前我们都是为了看某一个东西的源码,就去搜索相关的代码。而这次是在 client-go 的使用过程中产生的好奇,所以从使用的角度,就很容易去寻码了。因为你需要使用这个方法去创建一个 informer ,那么你就会想知道里面究竟发生了什么对吧?那么这次我们就从这个方法 NewIndexerInformer
的 newInformer
开始。
从初始化我们就可以知道 informer 里面究竟有什么。
// staging/src/k8s.io/client-go/tools/cache/controller.go:380
func NewIndexerInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
indexers Indexers,
) (Indexer, Controller) {
// This will hold the client state, as we know it.
clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, nil)
}
这个方法可以帮助我们创建 indexer
和 informer
我们先不管什么是 indexer
。直接 newInformer
。
// staging/src/k8s.io/client-go/tools/cache/controller.go:483
func newInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
clientState Store,
transformer TransformFunc,
) Controller {
// This will hold incoming changes. Note how we pass clientState in as a
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: clientState,
EmitDeltaTypeReplaced: true,
Transformer: transformer,
})
cfg := &Config{
Queue: fifo,
ListerWatcher: lw,
ObjectType: objType,
FullResyncPeriod: resyncPeriod,
RetryOnError: false,
Process: func(obj interface{}, isInInitialList bool) error {
if deltas, ok := obj.(Deltas); ok {
return processDeltas(h, clientState, deltas, isInInitialList)
}
return errors.New("object given as Process argument is not Deltas")
},
}
return New(cfg)
}
好,一个迷惑点来了,返回一个 Controller
?Controller
?傻傻分不清楚,这个 Controller
和我们常说的 Controller
组件完全不是一个东西,这个 Controller
是一个接口,其实它就是 Informer
哦。
关键来了,初始化的时候里面有一个 DeltaFIFO
的队列。有一个 Process
的处理方法,我们将 h
也就是我们外部的 handler
函数放进去了,这个 h 就是我们外部用户在使用 client-go 时申明的需要如何处理事件的方法。
源码阅读技巧:你不一定非要按事件的来龙去脉来看源码,有什么看什么,最后再串起来也是可以的
由于我们现在看到了 Process
方法,知道这里是处理事件的,于是我们先看这个 processDeltas
。
// staging/src/k8s.io/client-go/tools/cache/controller.go:436
func processDeltas(
handler ResourceEventHandler,
clientState Store,
deltas Deltas,
isInInitialList bool,
) error {
for _, d := range deltas {
obj := d.Object
switch d.Type {
case Sync, Replaced, Added, Updated:
if old, exists, err := clientState.Get(obj); err == nil && exists {
clientState.Update(obj)
handler.OnUpdate(old, obj)
} else {
clientState.Add(obj)
handler.OnAdd(obj, isInInitialList)
}
case Deleted:
clientState.Delete(obj)
handler.OnDelete(obj)
}
}
return nil
}
删减了部分代码,不说一目了然,可以说是非常明确了。根据不同的事件类型,调用外部对于 handler 的方法处理事件就可以了。
那么问题就来了,这些要处理的事件是从哪里来的呢?于是我们看 processDeltas
方法的调用方 Process
,也就是从下往上找,是谁调用了 Process
方法呢?还好引用的地方不多,容易被找到关键在这里。
// staging/src/k8s.io/client-go/tools/cache/controller.go:186
func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
不用多解释,这里 queue 就是我们之前在初始化看到的 DeltaFIFO
,虽然我们不知道队列里面是做了什么,但无非这个循环的意思就是,从队列中不断 Pop 出事件,然后调用 Process 去处理这个事件。而我们此时可以顺变看一眼,processLoop
方法是在 controller(Informer) Run 也就是启动的时候被一起启动了(代码这里按下不表)。_当然这个循环中有一个重试的机制,如果遇到需要重试的任务,会重新放到队列里面去,一个小的不错设计_。
那么,只要我们知道是哪里在往这个队列里面塞数据,就知道事件从哪里来了。
好,下一个坑就出现了,由于我们是倒着看的,那么我想知道谁往队列里面塞数据,如果你想要看这个 queue 有多少地方在放数据,你会发现太多了,由于 DeltaFIFO 这个实现到处都在引用,所以这样看是很难找的。于是我们需要回到原理上来。看图说话,在最上面说 informer 的流程图的时候我们可以看到有一个 Reflector
的东西在放数据。于是乎我们应该去寻找的是这个东西,你好像在哪里看到过呢?没错 Run 的时候也就是在执行 processLoop 之前。
// staging/src/k8s.io/client-go/tools/cache/controller.go:129
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
r := NewReflectorWithOptions(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
ReflectorOptions{
ResyncPeriod: c.config.FullResyncPeriod,
TypeDescription: c.config.ObjectDescription,
Clock: c.clock,
},
)
r.ShouldResync = c.config.ShouldResync
r.WatchListPageSize = c.config.WatchListPageSize
if c.config.WatchErrorHandler != nil {
r.watchErrorHandler = c.config.WatchErrorHandler
}
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
var wg wait.Group
wg.StartWithChannel(stopCh, r.Run)
wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}
你可以发现,初始化 Reflector 的时候将 c.config.Queue
放进去了作为它的 store
,那么关键就在这里面了。之后的链路是: r.Run
-> r.ListAndWatch
-> r.watch
-> watchHandler
, 好家伙, 链路还有点长的。好在代码并不复杂。在 watchHandler
中有如下精髓:
// staging/src/k8s.io/client-go/tools/cache/reflector.go:743
resourceVersion := meta.GetResourceVersion()
switch event.Type {
case watch.Added:
err := store.Add(event.Object)
case watch.Modified:
err := store.Update(event.Object)
case watch.Deleted:
err := store.Delete(event.Object)
case watch.Bookmark:
if _, ok := meta.GetAnnotations()["k8s.io/initial-events-end"]; ok {
if exitOnInitialEventsEndBookmark != nil {
*exitOnInitialEventsEndBookmark = true
}
}
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
}
破案了,store.Add
明白了,原来你在这里。这里我们就可以串起来了,图也就非常明白了,代码也疏通了。
Reflector
监听资源变动,将变动放到队列 DeltaFIFO
中Informer
不停的从队列中拿取,并调用外部的 handler
进行处理就像我们一开始说的那样,想要控制一个对象,你需要先知道对象的状态。那么第一个设计的优点就来了:与其不停的去通过 API 查询对象的状态,不如你自己主动去监听状态的变化。这是一种事件机制的设计,在很多地方都会用到。而主要的原因是查询的无效次数过多,而且 API 的压力又大。
而在 k8s 中太多需要监控对象的地方了,如果无论是谁来都要写一遍监控的代码,并且还要处理各种事件的解析、队列、重试..太麻烦了,于是 k8s 将其抽象为 Informer 的机制。从外部你只需要关注如何 handle 事件就可以了,从代码看有一种函数闭包的思想在里面。而且哦,关键在于 DeltaFIFO 还有各种细节的优化。
DeltaFIFO
真的是一个很不错设计,是值得我们去学习,并且在其他项目中可以直接拿来抄的一种优化方案。
Wait
等着,有元素来了会 Broadcast
通知,节省资源dedupDeltas
里面会 This will combine the most recent two deltas if they are the same.
也就所谓的压缩事件,也就 “聚合”,这是在给消费端减负,相同的事件你只需处理一次就好了,相当于这里就帮你过滤了一次,好贴心。从原理上来说 informer 本身不复杂,而且真的是一个不错的设计,从我的感受上来总结可以用两个词 事件+解耦 ,一个事件通知机制加上一个抽象解耦的实现。希望你能体会到,之后我们会用到它。当然这里介绍的是 informer 本身,它的前后都还有好多小助手哦。