前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >Kubernetes Informer基本原理

Kubernetes Informer基本原理

作者头像
政采云前端团队
发布于 2024-01-30 05:27:22
发布于 2024-01-30 05:27:22
54000
代码可运行
举报
文章被收录于专栏:采云轩采云轩
运行总次数:0
代码可运行

不论是 k8s 自身组件,还是自己编写 controller,都需要通过 apiserver 监听 etcd 事件来完成自己的控制循环逻辑。

如何高效可靠进行事件监听,k8s 客户端工具包 client-go 提供了一个通用的 informer 包,通过 informer,可以方便和高效的进行 controller 开发。

informer 包提供了如下的一些功能:

1、本地缓存(store)

2、索引机制(indexer)

3、Handler 注册功能(eventHandler)

1、informer 架构

整个 informer 机制架构如下图(图片源自 Client-go):

可以看到这张图分为上下两个部分,上半部分由 client-go 提供,下半部分则是需要自己实现的控制循环逻辑

本文主要分析上半部分的逻辑,包括下面几个组件:

1.1、Reflector:

从图上可以看到 Reflector 是一个和 apiserver 交互的组件,通过 list 和 watch api 将资源对象压入队列

1.2、DeltaFifo:

DeltaFifo的结构体示意如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
type DeltaFIFO struct {
  ...
  // We depend on the property that items in the s    et are in
  // the queue and vice versa, and that all Deltas in this
  // map have at least one Delta.
  items map[string]Deltas
  queue []string
  ...
}

主要分为两部分,fifo 和 delta

(1)fifo:先进先出队列

对应结构体中的 queue,结构体示例如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[default/centos-fd77b5886-pfrgn, xxx, xxx]

(2)delta:对应结构体中的items,存储了资源对象并且携带了资源操作类型的一个 map,结构体示例如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
map:{"default/centos-fd77b5886-pfrgn":[{Replaced &Pod{ObjectMeta: ${pod参数}], "xxx": [{},{}]}

消费者从 queue 中 pop 出对象进行消费,并从 items 获取具体的消费操作(执行动作 Update/Deleted/Sync,和执行的对象 object spec)

1.3、Indexer:

client-go 用来存储资源对象并自带索引功能的本地存储,deltaFIFO 中 pop 出的对象将存储到 Indexer。

indexer 与 etcd 集群中的数据保持一致,从而 client-go 可以直接从本地缓存获取资源对象,减少 apiserver 和 etcd 集群的压力。

2、一个基本例子

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func main() {

  stopCh := make(chan struct{})
  defer close(stopCh)
  
  // (1)New a k8s clientset
  masterUrl := "172.27.32.110:8080"
  config, err := clientcmd.BuildConfigFromFlags(masterUrl, "")
  if err != nil {
    klog.Errorf("BuildConfigFromFlags err, err: %v", err)
  }
  
  clientset, err := k.NewForConfig(config)
  if err != nil {
    klog.Errorf("Get clientset err, err: %v", err)
  }
  
  // (2)New a sharedInformers factory
  sharedInformers := informers.NewSharedInformerFactory(clientset, defaultResync)
  
  
  // (3)Register a informer
  //  f.informers[informerType] = informer,
  //  the detail for informer is build in NewFilteredPodInformer()
  podInformer := sharedInformers.Core().V1().Pods().Informer()
  
  // (4)Register event handler
  podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
      AddFunc: func(obj interface{}) {
        mObj := obj.(v1.Object)
        klog.Infof("Get new obj: %v", mObj)
        klog.Infof("Get new obj name: %s", mObj.GetName())
      },
  })
  
  // (5)Start all informers
  sharedInformers.Start(stopCh)
  
  // (6)A cronjob for cache sync
  if !cache.WaitForCacheSync(stopCh, podInformer.HasSynced) {
    klog.Infof("Cache sync fail!")
  }
  
  // (7)Use lister
  podLister := sharedInformers.Core().V1().Pods().Lister()
  pods, err := podLister.List(labels.Everything())
  if err != nil {
    klog.Infof("err: %v", err)
  }
  klog.Infof("len(pods), %d", len(pods))
  for _, v := range pods {
    klog.Infof("pod: %s", v.Name)
  }
  
  <- stopChan
}

上面就是一个简单的 informer 的使用例子,整个过程如上述几个步骤,着重说一下(2)、(3)、(4)、(5)四个步骤

3、流程分析

3.1、New a sharedInformers factory
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
sharedInformers := informers.NewSharedInformerFactory(clientset, defaultResync)

factory := &sharedInformerFactory{
  client:           client,
  namespace:        v1.NamespaceAll,
  defaultResync:    defaultResync,
  informers:        make(map[reflect.Type]cache.SharedIndexInformer),
  startedInformers: make(map[reflect.Type]bool),
  customResync:     make(map[reflect.Type]time.Duration),
}

这个过程就是创建一个 informer 的工厂 sharedInformerFactory,sharedInformerFactory 中有一个 informers 对象,里面是一个 informer 的 map,sharedInformerFactory 是为了防止过多的重复 informer 监听 apiserver,导致 apiserver 压力过大,在同一个服务中,不同的 controller 使用同一个 informer

3.2、Register a informer

这个过程主要是生成和注册 informer 到 sharedInformerFactory

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
podInformer := sharedInformers.Core().V1().Pods().Informer()

func (f *podInformer) Informer() cache.SharedIndexInformer {
  return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}

### f.factory.InformerFor:
### 注册 informer 
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
  ...
  informer = newFunc(f.client, resyncPeriod)
  f.informers[informerType] = informer
  return informer
}

### f.defaultInformer:
### 生成 informer
func (f *podInformer) defaultInformer(client k.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
  return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

func NewFilteredPodInformer(client k.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
  return cache.NewSharedIndexInformer(
    &cache.ListWatch{
    ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
      if tweakListOptions != nil {
        tweakListOptions(&options)
      }
      return client.CoreV1().Pods(namespace).List(context.TODO(), options)
    },
    WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
      if tweakListOptions != nil {
        tweakListOptions(&options)
      }
      return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
    },
    },
    &corev1.Pod{},
    resyncPeriod,
    indexers,
  )
}

### cache.NewSharedIndexInformer:
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
  realClock := &clock.RealClock{}
  sharedIndexInformer := &sharedIndexInformer{
    processor:                       &sharedProcessor{clock: realClock},
    indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
    listerWatcher:                   lw,
    objectType:                      exampleObject,
    resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
    defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
    cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
    clock:                           realClock,
  }
  return sharedIndexInformer
}

首先通过 f.defaultInformer 方法生成 informer,然后通过 f.factory.InformerFor 方法,将 informer 注册到 sharedInformerFactory

3.3、Register event handler

这个过程展示如何注册一个回调函数,以及如何触发这个回调函数

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
### podInformer.AddEventHandler:
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
  s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}

func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {

  ...
  listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(),  initialBufferSize)
  if !s.started {
    s.processor.addListener(listener)
    return
  }
  ...
}

### s.processor.addListener(listener)func (p *sharedProcessor) addListener(listener *processorListener) {
  p.addListenerLocked(listener)
  if p.listenersStarted {
    p.wg.Start(listener.run)
    p.wg.Start(listener.pop)
  }
}

### listener.run:
func (p *processorListener) run() {
  // this call blocks until the channel is closed.  When a panic happens during the notification
  // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
  // the next notification will be attempted.  This is usually better than the alternative of never
  // delivering again.
  stopCh := make(chan struct{})
  wait.Until(func() {
    for next := range p.nextCh {
      switch notification := next.(type) {        // 通过next结构体本身的类型来判断事件类型
      case updateNotification:
        p.handler.OnUpdate(notification.oldObj, notification.newObj)
      case addNotification:
        p.handler.OnAdd(notification.newObj)
      case deleteNotification:
        p.handler.OnDelete(notification.oldObj)
      default:
        utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
      }
    }
    // the only way to get here is if the p.nextCh is empty and closed
    close(stopCh)
  }, 1*time.Second, stopCh)
}

### listener.pop:
func (p *processorListener) pop() {

  var nextCh chan<- interface{}
  var notification interface{}
  for {
    select {
    case nextCh <- notification:
      // Notification dispatched
      var ok bool
      notification, ok = p.pendingNotifications.ReadOne()
      if !ok { // Nothing to pop
        nextCh = nil // Disable this select case
      }
    case notificationToAdd, ok := <-p.addCh:
      if !ok {
        return
      }
      if notification == nil { // No notification to pop (and pendingNotifications is empty)
        // Optimize the case - skip adding to pendingNotifications
        notification = notificationToAdd
        nextCh = p.nextCh
      } else { // There is already a notification waiting to be dispatched
        p.pendingNotifications.WriteOne(notificationToAdd)
      }
    }
  }
}

这个过程总结就是:

(1)AddEventHandler 到 sharedProcessor,注册事件回调函数到 sharedProcessor

(2)listener pop 方法里会监听 p.addCh,通过 nextCh = p.nextCh 将 addCh 将事件传递给 p.nextCh

(3)listener run 方法里会监听 p.nextCh,收到信号之后,判断是属于什么类型的方法,并且执行前面注册的 Handler

所以后面需要关注当资源对象发生变更时,是如何将变更信号给 p.addCh,进一步触发回调函数的

3.4、Start all informers

通过 sharedInformers.Start(stopCh)启动所有的 informer,代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
  for informerType, informer := range f.informers {
    if !f.startedInformers[informerType] {
      go informer.Run(stopCh)
      f.startedInformers[informerType] = true
    }
  }
}

我们的例子中其实就只启动了 PodInformer,接下来看到 podInformer 的 Run 方法做了什么

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
### go informer.Run(stopCh)func (s *sharedIndexInformer) Run(stopCh <-chan struct{}){
  defer utilruntime.HandleCrash()

  fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{   // Deltafifo
    KnownObjects:          s.indexer,
    EmitDeltaTypeReplaced: true,
  })
  cfg := &Config{
    Queue:            fifo,         // Deltafifo
    ListerWatcher:    s.listerWatcher,  // listerWatcher
    ObjectType:       s.objectType,
    FullResyncPeriod: s.resyncCheckPeriod,
    RetryOnError:     false,
    ShouldResync:     s.processor.shouldResync,
    // HandleDeltas, added to process, and done in processloop
    Process:           s.HandleDeltas,
    WatchErrorHandler: s.watchErrorHandler,
  }

  func() {
    ...
    s.controller = New(cfg)
    ...
  }
  
  s.controller.Run(stopCh)
}
### s.controller.Run(stopCh)
func (c *controller) Run(stopCh <-chan struct{}) {

  r := NewReflector(
    c.config.ListerWatcher,
    c.config.ObjectType,
    c.config.Queue,
    c.config.FullResyncPeriod,
  )
  c.reflector = r

  // Run reflector
  wg.StartWithChannel(stopCh, r.Run)  

  // Run processLoop, pop from deltafifo and do ProcessFunc,
  // ProcessFunc is the s.HandleDeltas before
  wait.Until(c.processLoop, time.Second, stopCh)
}

可以看到上面的逻辑首先生成一个 DeltaFifo,然后接下来的逻辑分为两块,生产和消费:

(1)生产—r.Run:

主要的逻辑就是利用 list and watch 将资源对象包括操作类型压入队列 DeltaFifo

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
#### r.Run:

func (r *Reflector) Run(stopCh <-chan struct{}) {
// 执行listAndWatch
if err := r.ListAndWatch(stopCh);
}

// 执行ListAndWatch流程
func (r *Reflector)ListAndWatch(stopCh <-chan struct{}) error{
  // 1、list:
  // (1)、list pods, 实际调用的是podInformer里的ListFunc方法,
  // client.CoreV1().Pods(namespace).List(context.TODO(), options)
  
  r.listerWatcher.List(opts)
  // (2)、获取资源版本号,用于watch
  resourceVersion = listMetaInterface.GetResourceVersion()
  
  //  (3)、数据转换,转换成列表
  items, err := meta.ExtractList(list)
  
  // (4)、将资源列表中的资源对象和版本号存储到DeltaFifo中
  r.syncWith(items, resourceVersion);
  
  // 2、watch,无限循环去watch apiserver,当watch到事件的时候,执行watchHandler将event事件压入fifo
  for {
    // (1)、watch pods, 实际调用的是podInformer里的WatchFunc方法,
    // client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
    w, err := r.listerWatcher.Watch(options)
    
    // (2)、watchHandler
    // watchHandler watches pod,更新DeltaFifo信息,并且更新resourceVersion
    if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh);
  }
}

### r.watchHandler
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
    ...
loop:
  for {
    select {
    case event, ok := <-w.ResultChan():
      newResourceVersion := meta.GetResourceVersion()
      switch event.Type {
      case watch.Added:
        err := r.store.Add(event.Object)    // Add event to srore, store的具体方法在fifo中
        if err != nil {
            utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
        }
      ...
      }
      *resourceVersion = newResourceVersion
      r.setLastSyncResourceVersion(newResourceVersion)
      eventCount++
    }
  }
  ...
}

### r.store.Add:
## 即为deltaFifo的add方法:

func (f *DeltaFIFO) Add(obj interface{}) error {
  ...
  return f.queueActionLocked(Added, obj)
  ...
}

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
  id, err := f.KeyOf(obj)
  if err != nil {
    return KeyError{obj, err}
  }
  newDeltas := append(f.items[id], Delta{actionType, obj})
  newDeltas = dedupDeltas(newDeltas)
  if len(newDeltas) > 0 {
    if _, exists := f.items[id]; !exists {
      f.queue = append(f.queue, id)
    }

    f.items[id] = newDeltas
    f.cond.Broadcast()          // 通知所有阻塞住的消费者
  }
  ...
  return nil
}

(2)消费—c.processLoop:

消费逻辑就是从 DeltaFifo pop 出对象,然后做两件事情:(1)触发前面注册的 eventhandler (2)更新本地索引缓存 indexer,保持数据和 etcd 一致

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (c *controller) processLoop() {
  for {
    obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
  }
}

### Queue.Pop:
## Queue.Pop是一个带有处理函数的pod方法,首先先看Pod逻辑,即为deltaFifo的pop方法:
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
  for {                       // 无限循环
    for len(f.queue) == 0 {
      f.cond.Wait()       // 阻塞直到生产端broadcast方法通知
    }
    id := f.queue[0]
    item, ok := f.items[id]
    delete(f.items, id)
    err := process(item)        // 执行处理方法
    if e, ok := err.(ErrRequeue); ok {
      f.addIfNotPresent(id, item)     // 如果处理失败的重新加入到fifo中重新处理
      err = e.Err
    }
    return item, err
  }
}

### c.config.Process:
## c.config.Process是在初始化controller的时候赋值的,即为前面的s.HandleDeltas

### s.HandleDeltas:
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
  s.blockDeltas.Lock()
  defer s.blockDeltas.Unlock()
  // from oldest to newest
  for _, d := range obj.(Deltas) {
    switch d.Type {
    case Sync, Replaced, Added, Updated:
      s.cacheMutationDetector.AddObject(d.Object)
        if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
          if err := s.indexer.Update(d.Object); err != nil {
            return err
          }
          isSync := false
          switch {
          case d.Type == Sync:
            // Sync events are only propagated to listeners that requested resync
            isSync = true
          case d.Type == Replaced:
            if accessor, err := meta.Accessor(d.Object); err == nil {
                if oldAccessor, err := meta.Accessor(old); err == nil {
                  // Replaced events that didn't change resourceVersion are treated as resync events
                  // and only propagated to listeners that requested resync
                  isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
                }
            }
          }
          s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
        } else {
          if err := s.indexer.Add(d.Object); err != nil {
            return err
          }
          s.processor.distribute(addNotification{newObj: d.Object}, false)
        }
    case Deleted:
      if err := s.indexer.Delete(d.Object); err != nil {
        return err
      }
      s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
    }
  }
  return nil
}

可以看到上面主要执行两部分逻辑:

s.processor.distribute
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
#### s.processor.distribute:
### 例如新增通知:s.processor.distribute(addNotification{newObj: d.Object}, false)
### 其中addNotification就是add类型的通知,后面会通过notification结构体的类型来执行不同的eventHandler

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
  p.listenersLock.RLock()
  defer p.listenersLock.RUnlock()
  
  if sync {
    for _, listener := range p.syncingListeners {
      listener.add(obj)
    }
  } else {
    for _, listener := range p.listeners {
      listener.add(obj)
    }
  }
}

func (p *processorListener) add(notification interface{}) {
  p.addCh <- notification     // 新增notification到addCh
}

这里 p.addCh 对应到前面说的关注对象 p.addCh,processorListener 收到 addCh 信号之后传递给 nextCh,然后通过 notification 结构体的类型来执行不同的 eventHandler

s.indexer 的增删改:

这个就是本地数据的缓存和索引,自定义控制逻辑里面会通过 indexer 获取操作对象的具体参数,这里就不展开细讲了。

4、总结

至此一个 informer 的 client-go 部分的流程就走完了,可以看到启动 informer 主要流程就是:

1、Reflector ListAndWatch:

(1)通过一个 reflector run 起来一个带有 list 和 watch api 的 client

(2)list 到的 pod 列表通过 DeltaFifo 存储,并更新最新的 ResourceVersion

(3)继续监听 pod,监听到的 pod 操作事件继续存储到 DeltaFifo 中

2、DeltaFifo 生产和消费:

(1)生产:list and watch 到的事件生产压入队列 DeltaFifo

(2)消费:执行注册的 eventHandler,并更新本地 indexer

所以 informer 本质其实就是一个通过 deltaFifo 建立生产消费机制,并且带有本地缓存和索引,以及可以注册回调事件的 apiServer 的客户端库。

5、参考

  • https://github.com/kubernetes/sample-controller/tree/master
  • https://jimmysong.io/kubernetes-handbook/develop/client-go-informer-sourcecode-analyse.html
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-01-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 政采云技术 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
16.深入k8s:Informer使用及其源码分析
这次讲解我用了很一些图,尽可能的把这个模块给描述清楚,如果感觉对你有所帮助不妨发一封邮件激励一下我~
luozhiyun
2020/10/28
2.6K0
16.深入k8s:Informer使用及其源码分析
k8s源码分析- Informer机制
由于Informer这部分的源码比较复杂,调用链路也很长,后面的源码分析,都会围绕这一张图展开。
kinnylee
2020/10/15
5.5K0
k8s源码分析- Informer机制
kubernetes controller 解析
controller内部有个内存cache,cache 一般和lister/ indexer 一起配合使用, 用一个 Indexer interface进行的包装
王磊-字节跳动
2019/10/07
1.8K0
Kubernetes Controller Manager 工作原理
在 Kubernetes Master 节点中,有三个重要组件:ApiServer、ControllerManager、Scheduler,它们一起承担了整个集群的管理工作。本文尝试梳理清楚 ControllerManager 的工作流程和原理。
CS实验室
2021/03/22
3.1K0
Kubernetes Controller Manager 工作原理
【K8s源码品读】009:Phase 1 - kube-scheduler - Informer监听资源变化
什么是Informer?这一节,我将先抛开代码,重点讲一下这个Informer,因为它是理解k8s运行机制的核心概念。
junedayday
2021/08/05
4620
如何高效掌控K8s资源变化?K8s Informer实现机制浅析
作者:腾讯云云巢团队研发工程师 王成 导语:本文通过分析 K8s 中 Reflector(反射器)、DeletaFIFO(增量队列)、Indexer(索引器)、Controller(控制器)、SharedInformer(共享资源通知器)、processorListener(事件监听处理器)、workqueue(事件处理工作队列) 等组件,对 Informer 实现机制进行了解析。 PART ONE 概述 进入 K8s 的世界,会发现有很多的 Controller,它们都是为了完成某类资源(如 pod
腾源会
2021/09/15
5230
浅谈 K8s Informer
进入 K8s 的世界,会发现有很多的 Controller,它们都是为了完成某类资源(如 pod 是通过 DeploymentController, ReplicaSetController 进行管理)的调谐,目标是保持用户期望的状态。
astraw99
2021/09/14
1.4K2
浅谈 K8s Informer
记一次在deployment中添加灰度暂停功能
本文主要聊聊如何在k8s deployment中添加灰度暂停功能。因为是基于deployment原本支持的RollingUpdate更新方式 和 pause进行设计,所以文章中大篇幅会对deployment源码阅读分析。 k8s v1.16
你算哪块香橙夹心饼干
2021/08/04
1.4K0
记一次在deployment中添加灰度暂停功能
kubernetes client-go解析
Indexer保存了来自apiServer的资源。使用listWatch方式来维护资源的增量变化。通过这种方式可以减小对apiServer的访问,减轻apiServer端的压力
charlieroro
2020/03/24
1.3K1
kubernetes client-go解析
Kubernetes Informer机制源码解析
这篇文章来源于云原生社区组织的 Kubernetes 源码研习社的作业,是个人学习Informer机制、理解Informer各个组件的设计的总结。
CNCF
2020/09/14
1.1K0
《一起读 kubernetes 源码》揭秘 k8s 关键机制 informer
在第二章我们会去看 k8s 中常用对象的源码,不过在看这些对象之前,我们需要聊一聊 informer 机制。这个机制可以说是 k8s 设计之中的一个重点了。这个机制的设计不仅仅让代码本身变得清晰,更让整个系统的结构更容易扩展。所以这个机制需要放到第二章的第一节来说。
LinkinStar
2024/05/01
2870
《一起读 kubernetes 源码》揭秘 k8s 关键机制 informer
K8s 系列(四) - 浅谈 Informer
进入 K8s 的世界,会发现有很多的 Controller,它们都是为了完成某类资源(如 pod 是通过 DeploymentController, ReplicaSetController 进行管理)的调谐,目标是保持用户期望的状态。
astraw99
2021/09/22
1.4K1
K8s 系列(四) - 浅谈 Informer
k8s-client-go源码剖析(二)
本周是K8S源码研习社第一期第二周,学习内容是学习Informer机制,本文以这个课题进行展开。
用户2672162
2021/02/02
5090
图解 K8S 源码 - Informer 篇(上)
众所周知,在 Kubernetes 中各组件是通过 HTTP 协议进行通信的,而组件间的通信也并没有依赖任何中间件,那么如何保证消息的实时性、可靠性、顺序性呢?Informer 机制很好的解决了这个问题。Kubernetes 中各组件与 API Server 的通信都是通过 client-go 的 informer 机制来保证和完成的。
郭旭东
2020/12/30
1.1K0
图解 K8S 源码 - Informer 篇(上)
【K8s源码品读】010:Phase 1 - kube-scheduler - Informer是如何保存数据的
聚焦目标 了解Informer在发现资源变化后,是怎么处理的 目录 查看消费的过程 掌握Index数据结构 信息的分发distribute Informer的综合思考 Process func (c *controller) processLoop() { for { // Pop出Object元素 obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { if err ==
junedayday
2021/08/05
2980
名字服务Polaris中服务发现详解
源码地址:https://github.com/polarismesh/polaris-controller/blob/main/README-zh.md
tunsuy
2023/08/19
3370
名字服务Polaris中服务发现详解
Kubernetes之Informer机制详解
本文尝试从Informer中的Lister、Watcher、Indexer、Store及Controller 5个组件展开对其进行详细阐述。希望对您有所帮助!
锅总
2024/06/28
1.4K0
Kubernetes之Informer机制详解
kubernetes 中 informer 的使用
在实际开发过程中,若想要获取 kubernetes 中某个资源(比如 pod)的所有对象,可以使用 kubectl、k8s REST API、client-go(ClientSet、Dynamic Client、RESTClient 三种方式) 等多种方式访问 k8s 集群获取资源。在笔者的开发过程中,最初都是直接调用 k8s 的 REST API 来获取的,使用 kubectl get pod -v=9 可以直接看到调用 k8s 的接口,然后在程序中直接访问还是比较方便的。但是随着集群规模的增长或者从国内获取海外 k8s 集群的数据,直接调用 k8s 接口获取所有 pod 还是比较耗时,这个问题有多种解决方法,最初是直接使用 k8s 原生的 watch 接口来获取的,下面是一个伪代码:
田飞雨
2019/12/13
4.4K0
kubernetes 中 informer 的使用
k8s informer 是如何保证事件不丢失的?
我们知道 k8s 里重要概念之一就是 声明式 API,比如 kubectl apply 就是声明式 API的实现。
没有故事的陈师傅
2024/01/10
5750
k8s informer 是如何保证事件不丢失的?
图解K8s源码 - kube-controller-manager篇
在kubernetes master节点中最重要的三个组件是:kube-apiserver、kube-controller-manager、kube-scheduler 分别负责k8s集群的资源访问入口、集群状态管理、集群调度。我们在之前的文章介绍了集群资源访问入口kube-apiserver “图解K8s源码 - kube-apiserver篇”,本篇尝试梳理清楚 kube-controller-manager 是如何“Manage Controller”的。
才浅Coding攻略
2022/12/12
9430
图解K8s源码 - kube-controller-manager篇
相关推荐
16.深入k8s:Informer使用及其源码分析
更多 >
领券
社区富文本编辑器全新改版!诚邀体验~
全新交互,全新视觉,新增快捷键、悬浮工具栏、高亮块等功能并同时优化现有功能,全面提升创作效率和体验
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文