首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >k8s client-go 的 Reflector 源码分析

k8s client-go 的 Reflector 源码分析

原创
作者头像
rxg456
发布2025-03-10 23:10:15
发布2025-03-10 23:10:15
2780
举报

Reflector 是 Kubernetes client-go 中的一个核心组件,负责从 API Server 获取资源对象并将其同步到本地缓存中。本文将解析 Reflector 的实现原理及源码细节。

本文阅读代码链接:https://github.com/kubernetes/client-go/tree/release-1.30

一. Reflector 的基本概念

专业描述:

Reflector(反射器)的主要功能是通过 List 和 Watch 操作,将 etcd 中的数据"反射"到本地存储(DeltaFIFO)中。它首先获取资源对象的完整列表,然后持续监视资源变化并触发相应事件处理。

大白话:

Reflector 是什么?想象一下 Reflector 就像是一个秘书,它的工作是帮你持续关注 Kubernetes 集群中的资源变化,然后把这些信息记录在本地笔记本(缓存)中。

Reflector 主要做两件事:

1.先列出所有信息(List 操作):

  • 就像新秘书第一天上班,先把所有档案都复印一份
  • 获取所有当前存在的资源对象(如 Pod、Deployment 等)

2.然后持续关注变化(Watch 操作):

  • 就像秘书坐在办公室,随时记录新文件、更新和删除的文件
  • 持续监听资源的创建、更新和删除事件

二. Reflector 的结构

Reflector 的核心结构定义在 tools/cache/reflector.go 文件中:

代码语言:go
复制
type Reflector struct {
  // 反射器名称,默认为 文件:行数
  name string
  
  // 期望放到 Store 中的类型名称
  expectedTypeName string
  
  // 放到 Store 中的对象类型
  expectedType reflect.Type
  
  // 期望的 GVK(GroupVersionKind)
  expectedGVK *schema.GroupVersionKind
  
  // 存储目标,用于同步数据
  store Store
  
  // 最重要的接口,用于执行 List 和 Watch 操作
  listerWatcher ListerWatcher
  
  // 处理退避逻辑
  backoffManager wait.BackoffManager
  
  // 重新同步周期
  resyncPeriod time.Duration
  
  // 用于确定是否需要重新同步
  ShouldResync func() bool
  
  // 其他字段...
  lastSyncResourceVersion string
  isLastSyncResourceVersionGone bool
  // ...
}

三. ListerWatcher 接口

ListerWatcher 是 Reflector 的核心接口,定义了 ListWatch 操作:

代码语言:go
复制
type ListerWatcher interface {
  // List 返回资源对象列表
  List(options metav1.ListOptions) (runtime.Object, error)
  
  // Watch 从指定版本开始监视资源变化
  Watch(options metav1.ListOptions) (watch.Interface, error)
}

四. Reflector 的工作流程

Reflector 通过 Run 方法启动工作:

代码语言:go
复制
// Run 函数:不断地使用 ListAndWatch 方法获取所有对象和它们的变化
// 当 stopCh 关闭时,Run 函数才会退出
func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(3).Infof("Starting reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
     // 开始工作
	wait.BackoffUntil(func() {
        // 调用 ListAndWatch 方法获取数据
		if err := r.ListAndWatch(stopCh); err != nil {
             // 如果出错,处理错误
			r.watchErrorHandler(r, err)
		}
	}, r.backoffManager, true, stopCh)
    // 停止工作
	klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
}

4.1 ListAndWatch 方法

代码语言:go
复制
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    // 记录开始监视的资源类型和来源
    klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name)
    var err error
    var w watch.Interface
    
    // 判断是否使用 watchList 功能(这是一个新特性)
    useWatchList := ptr.Deref(r.UseWatchList, false)
    fallbackToList := !useWatchList

    // 尝试使用 watchList 功能
    if useWatchList {
        w, err = r.watchList(stopCh)
        // 处理各种错误情况,如果watchList失败回退到标准的 LIST/WATCH 模式
        if err != nil {
            fallbackToList = true
            w = nil
        }
    }

    // 如果需要使用标准的 LIST 方式
    if fallbackToList {
        err = r.list(stopCh)  // 执行列表操作
        if err != nil {
            return err
        }
    }

    // 缓存填充完成的日志
    klog.V(2).Infof("Caches populated for %v from %s", r.typeDescription, r.name)

    // 设置 resync 通道和取消通道
    resyncerrc := make(chan error, 1)
    cancelCh := make(chan struct{})
    defer close(cancelCh)
    
    // 启动重新同步协程
    go r.startResync(stopCh, cancelCh, resyncerrc)
    
    // 开始 watch 操作
    return r.watch(w, stopCh, resyncerrc)
}
1. 列表操作(传统List)

当 fallbackToList 为 true 时,执行标准的 List 操作:

  • 调用 r.list(stopCh) 方法,该方法会:
  • 通过 r.listerWatcher.List() 获取资源的完整列表
  • 从结果中提取 ResourceVersion(资源版本号)
  • 将资源列表转换为对象列表
  • 通过 r.syncWith() 方法将这些对象存储到本地缓存
  • 调用 r.setLastSyncResourceVersion() 更新最后同步的资源版本号
2. 监视操作(Watch)

list 完成后,调用 r.watch() 方法:

  • 使用前面获取的 ResourceVersion 启动 Watch 操作
  • 如果 Watch 不存在,则创建新的 Watch,使用最后同步的资源版本号
  • 通过 r.listerWatcher.Watch() 建立与 API Server 的长连接
  • 调用 watchHandler 函数处理来自 Watch 的事件
  • 根据事件类型(Added、Modified、Deleted、Bookmark)更新本地缓存
  • 在处理每个事件后更新 ResourceVersion
3. 重新同步(Resync)

同时启动一个 r.startResync() 协程:

  • 根据配置的 resyncPeriod 定期触发重新同步
  • 如果 ShouldResync 返回 true,则调用 r.store.Resync()
  • 这确保即使对象没有变化,也能定期处理所有对象
WatchList 新特性

代码中的 watchList 是一个较新的特性:

  • 它试图通过单个流式请求获取初始状态和后续变更
  • 与传统的 List+Watch 相比,这种方式消耗更少的服务器资源
  • 如果 watchList 失败,会回退到标准的 List+Watch 模式
4. 列表操作(WatchList)

关键流程:

  1. 创建临时存储:temporaryStore = NewStore(...)
  2. 发送特殊 Watch 请求:设置 SendInitialEvents=true
  3. 服务器响应流程:
  • 先发送所有现有对象的 "Added" 事件
  • 最后发送带有特殊标记的 "Bookmark" 事件
  1. 同步完成后:
  • 将临时存储中的对象替换到正式存储
  • 继续使用此 Watch 连接获取后续事件
两种场景
代码语言:go
复制
// watchList establishes a stream to get a consistent snapshot of data
// from the server as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#proposal
//
// case 1: start at Most Recent (RV="", ResourceVersionMatch=ResourceVersionMatchNotOlderThan)
// Establishes a consistent stream with the server.
// That means the returned data is consistent, as if, served directly from etcd via a quorum read.
// It begins with synthetic "Added" events of all resources up to the most recent ResourceVersion.
// It ends with a synthetic "Bookmark" event containing the most recent ResourceVersion.
// After receiving a "Bookmark" event the reflector is considered to be synchronized.
// It replaces its internal store with the collected items and
// reuses the current watch requests for getting further events.
//
// case 2: start at Exact (RV>"0", ResourceVersionMatch=ResourceVersionMatchNotOlderThan)
// Establishes a stream with the server at the provided resource version.
// To establish the initial state the server begins with synthetic "Added" events.
// It ends with a synthetic "Bookmark" event containing the provided or newer resource version.
// After receiving a "Bookmark" event the reflector is considered to be synchronized.
// It replaces its internal store with the collected items and
// reuses the current watch requests for getting further events.

源码注释描述了两种使用场景:

1.从最新资源开始 (RV=""):

  • 建立与服务器的一致性流
  • 服务器发送所有现有资源的 "Added" 事件
  • 最后发送包含最新资源版本的 "Bookmark" 事件

2.从特定版本开始 (RV>"0"):

  • 从指定的资源版本开始建立流
  • 服务器发送该版本之后的所有资源的 "Added" 事件
  • 最后发送包含指定或更新版本的 "Bookmark" 事件

4.2 watchHandler 方法

watchHandler 是 Reflector 中处理 watch 事件的核心方法,负责接收 API Server 发送的事件并更新本地存储。

方法参数分析
代码语言:go
复制
func watchHandler(
    start time.Time,                            // 开始时间,用于计算持续时间
    w watch.Interface,                          // watch 接口,事件来源
    store Store,                                // 存储接口,保存事件对象
    expectedType reflect.Type,                  // 期望的对象类型
    expectedGVK *schema.GroupVersionKind,       // 期望的 GVK
    name string,                                // 反射器名称
    expectedTypeName string,                    // 期望类型名称
    setLastSyncResourceVersion func(string),    // 设置最后同步资源版本的函数
    exitOnInitialEventsEndBookmark *bool,       // WatchList 模式特有参数
    clock clock.Clock,                          // 时钟接口
    errc chan error,                            // 错误通道
    stopCh <-chan struct{},                     // 停止信号通道
)
核心工作流程
1.初始化
  • 初始化事件计数器
  • 重置 WatchList 模式的特殊标记
代码语言:go
复制
   eventCount := 0
   if exitOnInitialEventsEndBookmark != nil {
       *exitOnInitialEventsEndBookmark = false
   }
2.事件循环
  • 通过 select 多路复用监听多个通道
  • 处理停止信号、错误信号和事件信号
代码语言:go
复制
   loop:
   for {
       select {
       case <-stopCh:
           return errorStopRequested
       case err := <-errc:
           return err
       case event, ok := <-w.ResultChan():
           // 处理事件...
       }
   }
3.事件类型验证
  • 验证事件对象类型是否符合预期
  • 验证 GVK 是否符合预期
代码语言:go
复制
   if expectedType != nil {
       if e, a := expectedType, reflect.TypeOf(event.Object); e != a {
           // 处理类型不匹配
           utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", name, e, a))
           continue
       }
   }
4.根据事件类型处理
  • 根据事件类型(Added、Modified、Deleted、Bookmark)调用不同的存储方法
  • 处理可能的存储错误
代码语言:go
复制
   switch event.Type {
   case watch.Added:
       err := store.Add(event.Object)
       // ...
   case watch.Modified:
       err := store.Update(event.Object)
       // ...
   case watch.Deleted:
       err := store.Delete(event.Object)
       // ...
   case watch.Bookmark:
       // 处理 Bookmark 事件
       // ...
   }
5.书签事件特殊处理
  • 检查书签事件是否标记了初始事件流的结束
  • 在 WatchList 模式中,这表示所有初始对象已接收完毕
代码语言:go
复制
   if meta.GetAnnotations()["k8s.io/initial-events-end"] == "true" {
       if exitOnInitialEventsEndBookmark != nil {
           *exitOnInitialEventsEndBookmark = true
       }
   }
6.资源版本更新
  • 更新最后同步的资源版本
  • 如果存储实现了 ResourceVersionUpdater 接口,也更新存储的资源版本
代码语言:go
复制
   setLastSyncResourceVersion(resourceVersion)
   if rvu, ok := store.(ResourceVersionUpdater); ok {
       rvu.UpdateResourceVersion(resourceVersion)
   }
7.WatchList 模式特殊处理
  • 如果是 WatchList 模式且收到了初始事件结束书签,退出处理
代码语言:go
复制
   if exitOnInitialEventsEndBookmark != nil && *exitOnInitialEventsEndBookmark {
       watchDuration := clock.Since(start)
       klog.V(4).Infof("exiting %v Watch because received the bookmark...", name)
       return nil
   }
8.watch 结束处理
  • 计算 watch 持续时间
  • 如果 watch 时间太短且没有收到事件,返回错误
代码语言:go
复制
   watchDuration := clock.Since(start)
   if watchDuration < 1*time.Second && eventCount == 0 {
       return fmt.Errorf("very short watch: %s: Unexpected watch close...", name)
   }
特殊设计亮点

通用处理框架:

  • 同一个 watchHandler 方法既支持传统 Watch 模式,也支持新的 WatchList 模式
  • 通过 exitOnInitialEventsEndBookmark 参数区分不同模式

初始事件流结束标记:

  • 使用带有特殊注解 k8s.io/initial-events-end 的 Bookmark 事件标记初始事件流结束
  • 这是 WatchList 模式的关键机制

灵活的资源版本更新:

  • 通过函数参数 setLastSyncResourceVersion 支持不同的资源版本更新策略
  • 支持存储对象自定义的资源版本更新机制

精细的错误处理:

  • 对每种操作的错误都进行捕获和记录
  • 通过 utilruntime.HandleError 统一处理错误

短 watch 检测:

  • 检测异常短的 watch 操作(小于1秒且无事件)
  • 防止因网络问题导致的频繁重连

watchHandler 方法是 Reflector 处理增量更新的核心,其设计既支持传统模式,又能适应新的优化策略,体现了代码的灵活性

五. ResourceVersion 的重要性

ResourceVersion(资源版本号)是保证一致性的关键:

  • 每个资源对象都有 ResourceVersion
  • 每次修改(创建、更新、删除)资源时,API Server 都会更新 ResourceVersion
  • Watch 操作使用 ResourceVersion 来确定资源是否变化
  • Reflector 持续追踪最新的 ResourceVersion

六. 实际应用示例

以 Deployment 资源为例,在创建 DeploymentInformer 时:

  1. 初始化时传入 ListWatch 对象,包含 List 和 Watch 的实现
  2. List 通过 client.AppsV1().Deployments(namespace).List() 实现
  3. Watch 通过 client.AppsV1().Deployments(namespace).Watch() 实现
  4. Reflector 使用这些方法获取 Deployment 资源并监视变化

七. 总结

  1. Reflector 是客户端缓存机制的核心组件
  2. 通过 List 获取资源的完整状态,通过 Watch 监视增量变化
  3. 将数据存储到本地缓存(DeltaFIFO)中
  4. 通过 ResourceVersion 维护数据一致性
  5. 为 Informer 模式提供基础数据同步能力

Reflector 的设计保证了客户端能够高效地与 API Server 同步数据,同时减轻了 API Server 的负担,是 Kubernetes 控制器模式的重要基础组件。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一. Reflector 的基本概念
  • 二. Reflector 的结构
  • 三. ListerWatcher 接口
  • 四. Reflector 的工作流程
    • 4.1 ListAndWatch 方法
      • 1. 列表操作(传统List)
      • 2. 监视操作(Watch)
      • 3. 重新同步(Resync)
      • WatchList 新特性
      • 4. 列表操作(WatchList)
    • 4.2 watchHandler 方法
      • 方法参数分析
      • 核心工作流程
      • 特殊设计亮点
  • 五. ResourceVersion 的重要性
  • 六. 实际应用示例
  • 七. 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档