
Reflector 是 Kubernetes client-go 中的一个核心组件,负责从 API Server 获取资源对象并将其同步到本地缓存中。本文将解析 Reflector 的实现原理及源码细节。
本文阅读代码链接:https://github.com/kubernetes/client-go/tree/release-1.30
专业描述:
Reflector(反射器)的主要功能是通过 List 和 Watch 操作,将 etcd 中的数据"反射"到本地存储(DeltaFIFO)中。它首先获取资源对象的完整列表,然后持续监视资源变化并触发相应事件处理。
大白话:
Reflector 是什么?想象一下 Reflector 就像是一个秘书,它的工作是帮你持续关注 Kubernetes 集群中的资源变化,然后把这些信息记录在本地笔记本(缓存)中。
Reflector 主要做两件事:
1.先列出所有信息(List 操作):
2.然后持续关注变化(Watch 操作):
Reflector 的核心结构定义在 tools/cache/reflector.go 文件中:
type Reflector struct {
// 反射器名称,默认为 文件:行数
name string
// 期望放到 Store 中的类型名称
expectedTypeName string
// 放到 Store 中的对象类型
expectedType reflect.Type
// 期望的 GVK(GroupVersionKind)
expectedGVK *schema.GroupVersionKind
// 存储目标,用于同步数据
store Store
// 最重要的接口,用于执行 List 和 Watch 操作
listerWatcher ListerWatcher
// 处理退避逻辑
backoffManager wait.BackoffManager
// 重新同步周期
resyncPeriod time.Duration
// 用于确定是否需要重新同步
ShouldResync func() bool
// 其他字段...
lastSyncResourceVersion string
isLastSyncResourceVersionGone bool
// ...
}ListerWatcher 是 Reflector 的核心接口,定义了 List 和 Watch 操作:
type ListerWatcher interface {
// List 返回资源对象列表
List(options metav1.ListOptions) (runtime.Object, error)
// Watch 从指定版本开始监视资源变化
Watch(options metav1.ListOptions) (watch.Interface, error)
}Reflector 通过 Run 方法启动工作:
// Run 函数:不断地使用 ListAndWatch 方法获取所有对象和它们的变化
// 当 stopCh 关闭时,Run 函数才会退出
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
// 开始工作
wait.BackoffUntil(func() {
// 调用 ListAndWatch 方法获取数据
if err := r.ListAndWatch(stopCh); err != nil {
// 如果出错,处理错误
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
// 停止工作
klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
}func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
// 记录开始监视的资源类型和来源
klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name)
var err error
var w watch.Interface
// 判断是否使用 watchList 功能(这是一个新特性)
useWatchList := ptr.Deref(r.UseWatchList, false)
fallbackToList := !useWatchList
// 尝试使用 watchList 功能
if useWatchList {
w, err = r.watchList(stopCh)
// 处理各种错误情况,如果watchList失败回退到标准的 LIST/WATCH 模式
if err != nil {
fallbackToList = true
w = nil
}
}
// 如果需要使用标准的 LIST 方式
if fallbackToList {
err = r.list(stopCh) // 执行列表操作
if err != nil {
return err
}
}
// 缓存填充完成的日志
klog.V(2).Infof("Caches populated for %v from %s", r.typeDescription, r.name)
// 设置 resync 通道和取消通道
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
// 启动重新同步协程
go r.startResync(stopCh, cancelCh, resyncerrc)
// 开始 watch 操作
return r.watch(w, stopCh, resyncerrc)
}当 fallbackToList 为 true 时,执行标准的 List 操作:
list 完成后,调用 r.watch() 方法:
同时启动一个 r.startResync() 协程:
代码中的 watchList 是一个较新的特性:
消耗更少的服务器资源关键流程:
temporaryStore = NewStore(...)SendInitialEvents=true// watchList establishes a stream to get a consistent snapshot of data
// from the server as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#proposal
//
// case 1: start at Most Recent (RV="", ResourceVersionMatch=ResourceVersionMatchNotOlderThan)
// Establishes a consistent stream with the server.
// That means the returned data is consistent, as if, served directly from etcd via a quorum read.
// It begins with synthetic "Added" events of all resources up to the most recent ResourceVersion.
// It ends with a synthetic "Bookmark" event containing the most recent ResourceVersion.
// After receiving a "Bookmark" event the reflector is considered to be synchronized.
// It replaces its internal store with the collected items and
// reuses the current watch requests for getting further events.
//
// case 2: start at Exact (RV>"0", ResourceVersionMatch=ResourceVersionMatchNotOlderThan)
// Establishes a stream with the server at the provided resource version.
// To establish the initial state the server begins with synthetic "Added" events.
// It ends with a synthetic "Bookmark" event containing the provided or newer resource version.
// After receiving a "Bookmark" event the reflector is considered to be synchronized.
// It replaces its internal store with the collected items and
// reuses the current watch requests for getting further events.源码注释描述了两种使用场景:
1.从最新资源开始 (RV=""):
2.从特定版本开始 (RV>"0"):
watchHandler 是 Reflector 中处理 watch 事件的核心方法,负责接收 API Server 发送的事件并更新本地存储。
func watchHandler(
start time.Time, // 开始时间,用于计算持续时间
w watch.Interface, // watch 接口,事件来源
store Store, // 存储接口,保存事件对象
expectedType reflect.Type, // 期望的对象类型
expectedGVK *schema.GroupVersionKind, // 期望的 GVK
name string, // 反射器名称
expectedTypeName string, // 期望类型名称
setLastSyncResourceVersion func(string), // 设置最后同步资源版本的函数
exitOnInitialEventsEndBookmark *bool, // WatchList 模式特有参数
clock clock.Clock, // 时钟接口
errc chan error, // 错误通道
stopCh <-chan struct{}, // 停止信号通道
) eventCount := 0
if exitOnInitialEventsEndBookmark != nil {
*exitOnInitialEventsEndBookmark = false
} loop:
for {
select {
case <-stopCh:
return errorStopRequested
case err := <-errc:
return err
case event, ok := <-w.ResultChan():
// 处理事件...
}
} if expectedType != nil {
if e, a := expectedType, reflect.TypeOf(event.Object); e != a {
// 处理类型不匹配
utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", name, e, a))
continue
}
} switch event.Type {
case watch.Added:
err := store.Add(event.Object)
// ...
case watch.Modified:
err := store.Update(event.Object)
// ...
case watch.Deleted:
err := store.Delete(event.Object)
// ...
case watch.Bookmark:
// 处理 Bookmark 事件
// ...
} if meta.GetAnnotations()["k8s.io/initial-events-end"] == "true" {
if exitOnInitialEventsEndBookmark != nil {
*exitOnInitialEventsEndBookmark = true
}
} setLastSyncResourceVersion(resourceVersion)
if rvu, ok := store.(ResourceVersionUpdater); ok {
rvu.UpdateResourceVersion(resourceVersion)
} if exitOnInitialEventsEndBookmark != nil && *exitOnInitialEventsEndBookmark {
watchDuration := clock.Since(start)
klog.V(4).Infof("exiting %v Watch because received the bookmark...", name)
return nil
} watchDuration := clock.Since(start)
if watchDuration < 1*time.Second && eventCount == 0 {
return fmt.Errorf("very short watch: %s: Unexpected watch close...", name)
}通用处理框架:
初始事件流结束标记:
灵活的资源版本更新:
精细的错误处理:
短 watch 检测:
watchHandler 方法是 Reflector 处理增量更新的核心,其设计既支持传统模式,又能适应新的优化策略,体现了代码的灵活性
ResourceVersion(资源版本号)是保证一致性的关键:
以 Deployment 资源为例,在创建 DeploymentInformer 时:
Reflector 的设计保证了客户端能够高效地与 API Server 同步数据,同时减轻了 API Server 的负担,是 Kubernetes 控制器模式的重要基础组件。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。