前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >high QPS for configmap GET requests in kube-apiserver - 1

high QPS for configmap GET requests in kube-apiserver - 1

作者头像
李鹤
发布2023-09-06 14:15:00
1900
发布2023-09-06 14:15:00
举报
文章被收录于专栏:k-cloud-labsk-cloud-labs

背景

线上 k8s 集群 kube-apiserver 的 ConfigMap Get 操作 QPS 较高,且同时间段 Etcd 中 ConfigMap 资源的 Get 操作 QPS 也较高,看日志多数请求的发起方是 kubelet。对应 k8s v1.22.13 版本代码,同时在 v1.28.0 测试现象相同。kube-apiserver 日志大致如下:

由来

定位此问题的过程中花了一定的时间,同时也纠正了一些有关 kubelet 内 Pod 处理的错误理解。本篇旨在描述上述现象产生的原因及潜在问题,同时也希望能帮助大家更进一步的理解 kubelet 对 Pod 的处理逻辑。

由于涉及到的逻辑较多,因此将拆分成三篇来写:

  1. ConfigMap Get 请求的来源?
  2. 为什么 QPS 高?为什么没有走 kube-apiserver 缓存?
  3. 问题如何解决?

本篇主要介绍 ConfigMap Get 请求的整个调用链路

追踪溯源

在找到具体原因之前,大概有个排查方向,因为日志中显示的 ConfigMap 都是挂载到 Pod 中作为 Volume 使用的,问题的根源大概率与此有关,因此先从这个方向来。

采取自下而上方式沿着函数调用链路看代码,找源头,首先需要找到最后调用 ConfigMap Get 的地方,如果熟悉 client-go 的话,很容易在 /pkg/kubelet 目录下根据关键字 ConfigMaps( 搜索到结果,在 /pkg/kubelet/configmap/configmap_manager.go 文件中,里面有三种类型的 manager,分别对用不同类型的参数设置,因为 kubelet 配置的 configMapAndSecretChangeDetectionStrategy: Cache,因此看 NewCachingConfigMapManager 即可,最终返回 cacheBasedManager 就是管理 ConfigMap 的。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17

// NewCachingConfigMapManager creates a manager that keeps a cache of all configmaps // necessary for registered pods. // It implement the following logic: // - whenever a pod is create or updated, the cached versions of all configmaps // are invalidated // - every GetObject() call tries to fetch the value from local cache; if it is // not there, invalidated or too old, we fetch it from apiserver and refresh the // value in cache; otherwise it is just fetched from cache func NewCachingConfigMapManager(kubeClient clientset.Interface, getTTL manager.GetObjectTTLFunc) Manager { getConfigMap := func(namespace, name string, opts metav1.GetOptions) (runtime.Object, error) { return kubeClient.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, opts) } configMapStore := manager.NewObjectStore(getConfigMap, clock.RealClock{}, getTTL, defaultTTL) return &configMapManager{ manager: manager.NewCacheBasedManager(configMapStore, getConfigMapNames), } }

NewCachingConfigMapManager 创建一个管理器,用于管理缓存的 ConfigMap 数据。

  • 当创建或更新 Pod 时,缓存中的所有 ConfigMap 数据都会被标记为无效。
  • 在每次调用 GetObject() 方法时,首先尝试从本地缓存中获取数据。如果数据不存在、已被标记为无效或已过期,会从 apiserver 获取最新数据,并刷新缓存中的数据。如果数据存在于缓存中且有效,则直接从缓存中获取。

注释写的很清楚,每次创建或者更新 Pod 时,缓存中 Pod 对应的 ConfigMap 会被标记为无效,等 GetObject 被调用时,发现本地缓存中对应的 ConfigMap 已被标记为无效,就去 apiserver 获取。一种典型的协作式逻辑,接下来围绕两个调用链来看分析: GetObject 的调用链,缓存无效标记链。

GetObject 调用链

投石问路

首先看下 GetObject 干了什么,核心逻辑还是在 Get 中,下面代码中的 klog 日志输出是为了方便调试自己加上去的,在官方代码中没有的。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59

// pkg/kubelet/util/manager/cache_based_manager.go func (c *cacheBasedManager) GetObject(namespace, name string) (runtime.Object, error) { return c.objectStore.Get(namespace, name) } func (s *objectStore) Get(namespace, name string) (runtime.Object, error) { key := objectKey{namespace: namespace, name: name} data := func() *objectData { s.lock.Lock() defer s.lock.Unlock() item, exists := s.items[key] if !exists { return nil } if item.data == nil { klog.V(5).InfoS("empty item data", "key", key.namespace+"/"+key.name) item.data = &objectData{} } return item.data }() if data == nil { return nil, fmt.Errorf("object %q/%q not registered", namespace, name) } // After updating data in objectStore, lock the data, fetch object if // needed and return data. data.Lock() defer data.Unlock() if data.err != nil || !s.isObjectFresh(data) { opts := metav1.GetOptions{} if data.object != nil && data.err == nil { // This is just a periodic refresh of an object we successfully fetched previously. // In this case, server data from apiserver cache to reduce the load on both // etcd and apiserver (the cache is eventually consistent). util.FromApiserverCache(&opts) klog.V(5).InfoS("set resourceversion 0", "key", key.namespace+"/"+key.name) } object, err := s.getObject(namespace, name, opts) if err != nil && !apierrors.IsNotFound(err) && data.object == nil && data.err == nil { // Couldn't fetch the latest object, but there is no cached data to return. // Return the fetch result instead. return object, err } if (err == nil && !isObjectOlder(object, data.object)) || apierrors.IsNotFound(err) { // If the fetch succeeded with a newer version of the object, or if the // object could not be found in the apiserver, update the cached data to // reflect the current status. data.object = object data.err = err data.lastUpdateTime = s.clock.Now() } } else { klog.V(5).InfoS("return from cache directly", "key", key.namespace+"/"+key.name) } return data.object, data.err }

objectStore 用来存储所用到的所有 ConfigMap,key 是一个由 ConfigMap 的 namespace 和 name 组成的结构体,value 是 objectStoreItem 结构体,其内维护了对应 ConfigMap 的引用计数、ConfigMap 自身、最后更新时间等属性。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33

// pkg/kubelet/util/manager/cache_based_manager.go type objectKey struct { namespace string name string uid types.UID } // objectStoreItems is a single item stored in objectStore. type objectStoreItem struct { refCount int data *objectData } type objectData struct { sync.Mutex object runtime.Object err error lastUpdateTime time.Time } // objectStore is a local cache of objects. type objectStore struct { getObject GetObjectFunc clock clock.Clock lock sync.Mutex items map[objectKey]*objectStoreItem defaultTTL time.Duration getTTL GetObjectTTLFunc }

顺藤摸瓜

沿着 GetObject 调用链路向上找,直到 configMapVolumeMounter 结构,在他的 SetUpAt 方法中有一行代码来获取 ConfigMap。

1

configMap, err := b.getConfigMap(b.pod.Namespace, b.source.Name)

很明显,结构体名字就是和挂载有关的。继续往上找看又是谁在调用 SetUpAt ,最终在 reconiler 结构的 mountAttachVolumes 方法中找到了最终的触发者 operationExecutor.MountVolume,忽略了部分不相关代码,如下

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42

// pkg/kubelet/volumemanager/reconciler/reconciler.go func (rc *reconciler) mountAttachVolumes() { // Ensure volumes that should be attached/mounted are attached/mounted. volumesToMount := rc.desiredStateOfWorld.GetVolumesToMount() for _, volumeToMount := range volumesToMount { volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName) volumeToMount.DevicePath = devicePath if cache.IsVolumeNotAttachedError(err) { ... } else if !volMounted || cache.IsRemountRequiredError(err) { // Volume is not mounted, or is already mounted, but requires remounting remountingLogStr := "" isRemount := cache.IsRemountRequiredError(err) if isRemount { remountingLogStr = "Volume is already mounted to pod, but remount was requested." } klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr)) err := rc.operationExecutor.MountVolume( rc.waitForAttachTimeout, volumeToMount.VolumeToMount, rc.actualStateOfWorld, isRemount) if err != nil && !isExpectedError(err) { // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected. // Log all other errors. klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.MountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error()) } if err == nil { if remountingLogStr == "" { klog.V(1).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr)) } else { klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr)) } } } else if cache.IsFSResizeRequiredError(err) && utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes) { ... } } }

逻辑大致如下:

  1. 首先从 desiredStateOfWorld 获取需要挂载的所有的卷(ConfigMap)
  2. 遍历每个卷,看 Pod 是否存在于 actualStateOfWorld 中保存的已经挂载了卷的 Pod 列表中
  3. 如果 2 返回不存在(尚未挂载),或者需要重新挂载,则会调用 operationExecutor.MountVolume 进行挂载

之后就会一步步触发最终 GetObject 去获取要挂载的 ConfigMap,继续往上找 mountAttachVolumes 调用方,如下

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24

// pkg/kubelet/volumemanager/reconciler/reconciler.go func (rc *reconciler) Run(stopCh <-chan struct{}) { wait.Until(rc.reconciliationLoopFunc(), rc.loopSleepDuration, stopCh) } func (rc *reconciler) reconciliationLoopFunc() func() { return func() { rc.reconcile() ... } } func (rc *reconciler) reconcile() { ... // Next we mount required volumes. This function could also trigger // attach if kubelet is responsible for attaching volumes. // If underlying PVC was resized while in-use then this function also handles volume // resizing. rc.mountAttachVolumes() ... }

至此找到了最上层触发 GetObject 调用的调用者 reconciler,他在 kubelet 启动时启动,每间隔 loopSleepDuration (代码中写死的 100ms )执行一次,每次执行时都会去调用 mountAttachVolumes 最终调用 GetObject

上述流程总结如下图

缓存无效标记链

蛛丝马迹

思路同上,需要先找到标记是什么,还是得回到 GetObject 中(开头代码注释中写了)。因为从 kube-apiserver 日志中看到的现象是请求直接穿透到了 Etcd,再结合上面 Get 方法,可以知道是命中了 item.data == nil 之后执行 item.data = &objectData{},因此才没有命中 data.object != nil 的逻辑,也就没有给请求设置 ResourceVersion: 0 的参数,最终导致请求穿透到了 Etcd。在每次请求结束后,会将 data.object 赋值为刚获取到的 ConfigMap。又因为日志中已知出现请求,基本可以得到标记无效就是靠设置 item.data = nil 实现的。

代码中搜索 .data = nil 会发现只有一条匹配的

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21

func (s *objectStore) AddReference(namespace, name string) { key := objectKey{namespace: namespace, name: name} // AddReference is called from RegisterPod, thus it needs to be efficient. // Thus Add() is only increasing refCount and generation of a given object. // Then Get() is responsible for fetching if needed. s.lock.Lock() defer s.lock.Unlock() item, exists := s.items[key] if !exists { item = &objectStoreItem{ refCount: 0, data: &objectData{}, } s.items[key] = item } item.refCount++ // This will trigger fetch on the next Get() operation. item.data = nil }

注释写的也很清晰,设置 item.data = nil 会触发下次执行 Get 操作时去访问 apiserver。标记找到了,接下来就是去看下整个调用链了。

抽丝剥茧

沿着 AddReference 调用链路往上找,发现最终调用方在 syncPod 中,如下

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15

func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) { ... // ensure the kubelet knows about referenced secrets or configmaps used by the pod if !kl.podWorkers.IsPodTerminationRequested(pod.UID) { if kl.secretManager != nil { kl.secretManager.RegisterPod(pod) } if kl.configMapManager != nil { kl.configMapManager.RegisterPod(pod) } } ... }

syncPod 调用 configMapManager.RegisterPod 最终触发 AddReference 设置 item.data 为 nil,相当于标记缓存无效。到这里熟悉 kubelet 主流程的话应该就比较清楚是怎么回事了,Kubelet 会为每个 Pod 启动一个单独的 goroutine PodWorker 负责 Pod 生命周期的管理,这里盗用一个网图

有四个 chan:

  • configCh 对应从外界获取到的 Pod 信息,有三种来源 File,Http,Apiserver;
  • syncCh 对应一个 ticker,每秒触发一次,时间写死在代码中;
  • housekeepingCh 也对应一个 ticker,每 2 秒触发一次,时间同样写死在代码中;
  • plegCh 对应从 container runtime 获取到的 container 的真实信息,有个对应的结构,每秒从 container runtime 获取所有 container 信息,经过处理之后发到 plegCh;

任意 chan 内有数据了之后都会最终触发 syncPod

总结

至此整个流程已经清楚了,一句话概括就是每有 Pod 需要 Sync 时,会触发 syncPod,在 syncPod 时会调用 configMapManager.RegisterPod 标记缓存无效,reconciler goroutine 每 100ms 执行一次 mountAttachVolumes 去挂载 Pod 所有的 Volume,发现被标记失效就会重新去 apiserver 获取。

至于具体哪些 Pod 需要去挂载哪些 Volume,有另外两个数据结构存储相关信息:DesiredStateOfWorldActualStateOfWorld。上文在执行 mountAttachVolumes 的时候出现过,将会在下一篇中分析为什么 QPS 会高,为什么没有走 apiserver cache 的时候进行更详细的分析。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-08-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景
  • 由来
  • 追踪溯源
    • GetObject 调用链
      • 投石问路
      • 顺藤摸瓜
    • 缓存无效标记链
      • 蛛丝马迹
      • 抽丝剥茧
  • 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档