线上 k8s 集群 kube-apiserver 的 ConfigMap Get 操作 QPS 较高,且同时间段 Etcd 中 ConfigMap 资源的 Get 操作 QPS 也较高,看日志多数请求的发起方是 kubelet。对应 k8s v1.22.13 版本代码,同时在 v1.28.0 测试现象相同。kube-apiserver 日志大致如下:
定位此问题的过程中花了一定的时间,同时也纠正了一些有关 kubelet 内 Pod 处理的错误理解。本篇旨在描述上述现象产生的原因及潜在问题,同时也希望能帮助大家更进一步的理解 kubelet 对 Pod 的处理逻辑。
由于涉及到的逻辑较多,因此将拆分成三篇来写:
本篇主要介绍 ConfigMap Get 请求的整个调用链路
在找到具体原因之前,大概有个排查方向,因为日志中显示的 ConfigMap 都是挂载到 Pod 中作为 Volume 使用的,问题的根源大概率与此有关,因此先从这个方向来。
采取自下而上方式沿着函数调用链路看代码,找源头,首先需要找到最后调用 ConfigMap Get 的地方,如果熟悉 client-go 的话,很容易在 /pkg/kubelet 目录下根据关键字 ConfigMaps(
搜索到结果,在 /pkg/kubelet/configmap/configmap_manager.go 文件中,里面有三种类型的 manager,分别对用不同类型的参数设置,因为 kubelet 配置的 configMapAndSecretChangeDetectionStrategy: Cache
,因此看 NewCachingConfigMapManager
即可,最终返回 cacheBasedManager
就是管理 ConfigMap 的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | // NewCachingConfigMapManager creates a manager that keeps a cache of all configmaps // necessary for registered pods. // It implement the following logic: // - whenever a pod is create or updated, the cached versions of all configmaps // are invalidated // - every GetObject() call tries to fetch the value from local cache; if it is // not there, invalidated or too old, we fetch it from apiserver and refresh the // value in cache; otherwise it is just fetched from cache func NewCachingConfigMapManager(kubeClient clientset.Interface, getTTL manager.GetObjectTTLFunc) Manager { getConfigMap := func(namespace, name string, opts metav1.GetOptions) (runtime.Object, error) { return kubeClient.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, opts) } configMapStore := manager.NewObjectStore(getConfigMap, clock.RealClock{}, getTTL, defaultTTL) return &configMapManager{ manager: manager.NewCacheBasedManager(configMapStore, getConfigMapNames), } } |
---|
NewCachingConfigMapManager 创建一个管理器,用于管理缓存的 ConfigMap 数据。
GetObject()
方法时,首先尝试从本地缓存中获取数据。如果数据不存在、已被标记为无效或已过期,会从 apiserver 获取最新数据,并刷新缓存中的数据。如果数据存在于缓存中且有效,则直接从缓存中获取。注释写的很清楚,每次创建或者更新 Pod 时,缓存中 Pod 对应的 ConfigMap 会被标记为无效,等 GetObject
被调用时,发现本地缓存中对应的 ConfigMap 已被标记为无效,就去 apiserver 获取。一种典型的协作式逻辑,接下来围绕两个调用链来看分析: GetObject
的调用链,缓存无效标记链。
GetObject
调用链首先看下 GetObject
干了什么,核心逻辑还是在 Get
中,下面代码中的 klog 日志输出是为了方便调试自己加上去的,在官方代码中没有的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 | // pkg/kubelet/util/manager/cache_based_manager.go func (c *cacheBasedManager) GetObject(namespace, name string) (runtime.Object, error) { return c.objectStore.Get(namespace, name) } func (s *objectStore) Get(namespace, name string) (runtime.Object, error) { key := objectKey{namespace: namespace, name: name} data := func() *objectData { s.lock.Lock() defer s.lock.Unlock() item, exists := s.items[key] if !exists { return nil } if item.data == nil { klog.V(5).InfoS("empty item data", "key", key.namespace+"/"+key.name) item.data = &objectData{} } return item.data }() if data == nil { return nil, fmt.Errorf("object %q/%q not registered", namespace, name) } // After updating data in objectStore, lock the data, fetch object if // needed and return data. data.Lock() defer data.Unlock() if data.err != nil || !s.isObjectFresh(data) { opts := metav1.GetOptions{} if data.object != nil && data.err == nil { // This is just a periodic refresh of an object we successfully fetched previously. // In this case, server data from apiserver cache to reduce the load on both // etcd and apiserver (the cache is eventually consistent). util.FromApiserverCache(&opts) klog.V(5).InfoS("set resourceversion 0", "key", key.namespace+"/"+key.name) } object, err := s.getObject(namespace, name, opts) if err != nil && !apierrors.IsNotFound(err) && data.object == nil && data.err == nil { // Couldn't fetch the latest object, but there is no cached data to return. // Return the fetch result instead. return object, err } if (err == nil && !isObjectOlder(object, data.object)) || apierrors.IsNotFound(err) { // If the fetch succeeded with a newer version of the object, or if the // object could not be found in the apiserver, update the cached data to // reflect the current status. data.object = object data.err = err data.lastUpdateTime = s.clock.Now() } } else { klog.V(5).InfoS("return from cache directly", "key", key.namespace+"/"+key.name) } return data.object, data.err } |
---|
objectStore 用来存储所用到的所有 ConfigMap,key 是一个由 ConfigMap 的 namespace 和 name 组成的结构体,value 是 objectStoreItem 结构体,其内维护了对应 ConfigMap 的引用计数、ConfigMap 自身、最后更新时间等属性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | // pkg/kubelet/util/manager/cache_based_manager.go type objectKey struct { namespace string name string uid types.UID } // objectStoreItems is a single item stored in objectStore. type objectStoreItem struct { refCount int data *objectData } type objectData struct { sync.Mutex object runtime.Object err error lastUpdateTime time.Time } // objectStore is a local cache of objects. type objectStore struct { getObject GetObjectFunc clock clock.Clock lock sync.Mutex items map[objectKey]*objectStoreItem defaultTTL time.Duration getTTL GetObjectTTLFunc } |
---|
沿着 GetObject
调用链路向上找,直到 configMapVolumeMounter
结构,在他的 SetUpAt
方法中有一行代码来获取 ConfigMap。
1 | configMap, err := b.getConfigMap(b.pod.Namespace, b.source.Name) |
---|
很明显,结构体名字就是和挂载有关的。继续往上找看又是谁在调用 SetUpAt
,最终在 reconiler 结构的 mountAttachVolumes
方法中找到了最终的触发者 operationExecutor.MountVolume
,忽略了部分不相关代码,如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 | // pkg/kubelet/volumemanager/reconciler/reconciler.go func (rc *reconciler) mountAttachVolumes() { // Ensure volumes that should be attached/mounted are attached/mounted. volumesToMount := rc.desiredStateOfWorld.GetVolumesToMount() for _, volumeToMount := range volumesToMount { volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName) volumeToMount.DevicePath = devicePath if cache.IsVolumeNotAttachedError(err) { ... } else if !volMounted || cache.IsRemountRequiredError(err) { // Volume is not mounted, or is already mounted, but requires remounting remountingLogStr := "" isRemount := cache.IsRemountRequiredError(err) if isRemount { remountingLogStr = "Volume is already mounted to pod, but remount was requested." } klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr)) err := rc.operationExecutor.MountVolume( rc.waitForAttachTimeout, volumeToMount.VolumeToMount, rc.actualStateOfWorld, isRemount) if err != nil && !isExpectedError(err) { // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected. // Log all other errors. klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.MountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error()) } if err == nil { if remountingLogStr == "" { klog.V(1).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr)) } else { klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr)) } } } else if cache.IsFSResizeRequiredError(err) && utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes) { ... } } } |
---|
逻辑大致如下:
desiredStateOfWorld
获取需要挂载的所有的卷(ConfigMap)actualStateOfWorld
中保存的已经挂载了卷的 Pod 列表中operationExecutor.MountVolume
进行挂载之后就会一步步触发最终 GetObject
去获取要挂载的 ConfigMap
,继续往上找 mountAttachVolumes
调用方,如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | // pkg/kubelet/volumemanager/reconciler/reconciler.go func (rc *reconciler) Run(stopCh <-chan struct{}) { wait.Until(rc.reconciliationLoopFunc(), rc.loopSleepDuration, stopCh) } func (rc *reconciler) reconciliationLoopFunc() func() { return func() { rc.reconcile() ... } } func (rc *reconciler) reconcile() { ... // Next we mount required volumes. This function could also trigger // attach if kubelet is responsible for attaching volumes. // If underlying PVC was resized while in-use then this function also handles volume // resizing. rc.mountAttachVolumes() ... } |
---|
至此找到了最上层触发 GetObject
调用的调用者 reconciler
,他在 kubelet 启动时启动,每间隔 loopSleepDuration
(代码中写死的 100ms )执行一次,每次执行时都会去调用 mountAttachVolumes
最终调用 GetObject
。
上述流程总结如下图
思路同上,需要先找到标记是什么,还是得回到 GetObject
中(开头代码注释中写了)。因为从 kube-apiserver 日志中看到的现象是请求直接穿透到了 Etcd,再结合上面 Get
方法,可以知道是命中了 item.data == nil 之后执行 item.data = &objectData{},因此才没有命中 data.object != nil 的逻辑,也就没有给请求设置 ResourceVersion: 0 的参数,最终导致请求穿透到了 Etcd。在每次请求结束后,会将 data.object 赋值为刚获取到的 ConfigMap。又因为日志中已知出现请求,基本可以得到标记无效就是靠设置 item.data = nil 实现的。
代码中搜索 .data = nil
会发现只有一条匹配的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | func (s *objectStore) AddReference(namespace, name string) { key := objectKey{namespace: namespace, name: name} // AddReference is called from RegisterPod, thus it needs to be efficient. // Thus Add() is only increasing refCount and generation of a given object. // Then Get() is responsible for fetching if needed. s.lock.Lock() defer s.lock.Unlock() item, exists := s.items[key] if !exists { item = &objectStoreItem{ refCount: 0, data: &objectData{}, } s.items[key] = item } item.refCount++ // This will trigger fetch on the next Get() operation. item.data = nil } |
---|
注释写的也很清晰,设置 item.data = nil 会触发下次执行 Get
操作时去访问 apiserver。标记找到了,接下来就是去看下整个调用链了。
沿着 AddReference
调用链路往上找,发现最终调用方在 syncPod
中,如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) { ... // ensure the kubelet knows about referenced secrets or configmaps used by the pod if !kl.podWorkers.IsPodTerminationRequested(pod.UID) { if kl.secretManager != nil { kl.secretManager.RegisterPod(pod) } if kl.configMapManager != nil { kl.configMapManager.RegisterPod(pod) } } ... } |
---|
syncPod
调用 configMapManager.RegisterPod
最终触发 AddReference
设置 item.data 为 nil,相当于标记缓存无效。到这里熟悉 kubelet 主流程的话应该就比较清楚是怎么回事了,Kubelet 会为每个 Pod 启动一个单独的 goroutine PodWorker 负责 Pod 生命周期的管理,这里盗用一个网图
有四个 chan:
任意 chan 内有数据了之后都会最终触发 syncPod
。
至此整个流程已经清楚了,一句话概括就是每有 Pod 需要 Sync 时,会触发 syncPod
,在 syncPod
时会调用 configMapManager.RegisterPod
标记缓存无效,reconciler goroutine 每 100ms 执行一次 mountAttachVolumes
去挂载 Pod 所有的 Volume,发现被标记失效就会重新去 apiserver 获取。
至于具体哪些 Pod 需要去挂载哪些 Volume,有另外两个数据结构存储相关信息:DesiredStateOfWorld
,ActualStateOfWorld
。上文在执行 mountAttachVolumes
的时候出现过,将会在下一篇中分析为什么 QPS 会高,为什么没有走 apiserver cache 的时候进行更详细的分析。