前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Controller Runtime 的四种使用姿势

Controller Runtime 的四种使用姿势

作者头像
CS实验室
发布2022-04-27 19:03:12
2.6K0
发布2022-04-27 19:03:12
举报
文章被收录于专栏:CS实验室

随着云原生生态的不断发展,目前大多数基于 Kubernetes 的云原生技术,几乎都采用了 CRD + Controller 的模式。即使没有自定义 CRD,也会有需要 Controller 来检测自己感兴趣的资源,在其状态发生变更时,做一些业务所需工作。

controller-runtime 是 Kubernetes 社区提供的相对较好用的能够快速搭建一套对 ApiServer 进行 watch 的工具。本文会对 controller-runtime 的工作原理及不同场景下的使用方法做一个简单的总结和介绍。

架构

controller-runtime 的架构可以用下图概括。注:Webhook 不在本文讨论范围内,故图中舍去了 Webhook。

主要分为用户创建的 Manager 和 Reconciler 以及 Controller Runtime 自己启动的 Cache 和 Controller。先看用户侧的,Manager 是用户初始化的时候需要创建的,用来启动 Controller Runtime 的组件;Reconciler 是用户自己需要提供的组件,用于处理自己的业务逻辑。

而 controller-runtime 侧的组件,Cache 顾名思义就是缓存,用于建立 Informer 对 ApiServer 进行连接 watch 资源,并将 watch 到的 object 推入队列;Controller 一方面会向 Informer 注册 eventHandler,另一方面会从队列中拿数据并执行用户侧 Reconciler 的函数。

controller-runtime 侧整个工作流程如下:

首先 Controller 会先向 Informer 注册特定资源的 eventHandler;然后 Cache 会启动 Informer,Informer 向 ApiServer 发出请求,建立连接;当 Informer 检测到有资源变动后,使用 Controller 注册进来的 eventHandler 判断是否推入队列中;当队列中有元素被推入时,Controller 会将元素取出,并执行用户侧的 Reconciler。

用法

下面介绍几种不同场景下的使用方法。

一般用法

controller-runtime 的用法我们已经很熟悉了,最简单的用法可以用下面的代码表达:

代码语言:javascript
复制
func start() {
  scheme := runtime.NewScheme()
  _ = corev1.AddToScheme(scheme)
  // 1. init Manager
  mgr, _ := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
    Scheme: scheme,
    Port:   9443,
  })
  // 2. init Reconciler(Controller)
  _ = ctrl.NewControllerManagedBy(mgr).
    For(&corev1.Pod{}).
    Complete(&ApplicationReconciler{})

  // 3. start Manager
  if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
  }
}

type ApplicationReconciler struct {
}

func (a ApplicationReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
  return reconcile.Result{}, nil
}

第一步即初始化 Manager,同时生成一个默认配置的 Cache。

第二步是初始化 Controller。

ctrl.NewControllerManagedBy:用于创建 Controller,同时将第一步生成的 Manager 的一些配置注入到 Controller 中;•For:Controller Runtime 提供的快捷方法,用来指定 watch 的资源类型;•Owns:有时候也会用到 Owns 方法,表示某资源是我关心资源的从属,其 event 也会进去 Controller 的队列中;•Complete 也是一种快捷方法,用于生成 Controller,将用户的 Reconciler 注册进 Controller,并生成 watch 资源的默认 eventHandler,同时执行 Controller 的 watch 函数;

用户的 Reconciler 只需要实现 reconcile.Reconciler 接口即可。

最后一步就是启动 Manager,这一步中会同时启动 Cache,即启动 Informer,以及启动 Controller。

设置 EventHandler

在整个架构中,Informer 扮演的角色是对 ApiServer 进行 ListWatch,检测到自己感兴趣的资源变化时,会根据注册的 eventHandler 进行处理,并判断是否需要推入队列。

所以,在使用过程中,我们可以在创建 Controller 时,将 Informer 的 eventHandler 函数注册进去,如下:

代码语言:javascript
复制
func start() {
  scheme := runtime.NewScheme()
  _ = corev1.AddToScheme(scheme)
  // 1. init Manager
  mgr, _ := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
    Scheme: scheme,
    Port:   9443,
  })
  // 2. init Reconciler(Controller)
  c, _ := controller.New("app", mgr, controller.Options{})
  _ = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}, predicate.Funcs{
    CreateFunc: func(event event.CreateEvent) bool {
      ...
    },
    UpdateFunc: func(updateEvent event.UpdateEvent) bool {
      ...
    },
    DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
      ...
    },
  })
  // 3. start Manager
  if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
  }
}

在 predicate 中添加资源入 Queue 前的判断逻辑,可以有效防止队列被推入过多无用的资源。若我们 Reconciler 需要检测多种资源,这里 Controller 可以针对不同的资源类型,分别执行 watch,每次注册不同的 eventHandler。

设置 Cache selector

另外,我们还可以在 Informer 的 ListWatch 函数中添加有效的 LabelSelector 或 FieldSelector,进一步减少检测到的无效资源,在集群资源量大的情况下,也可以起到减少 ApiServer 压力的作用。具体如下:

代码语言:javascript
复制
func start() {
  scheme := runtime.NewScheme()
  _ = corev1.AddToScheme(scheme)
  // 1. init Manager
  mgr, _ := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
    Scheme: scheme,
    Port:   9443,
    NewCache: cache.BuilderWithOptions(cache.Options{
      Scheme: scheme,
      SelectorsByObject: cache.SelectorsByObject{
        &corev1.Pod{}: {
          Label: labels.SelectorFromSet(labels.Set{}),
        },
        &corev1.Node{}: {
          Field: fields.SelectorFromSet(fields.Set{"metadata.name": "node01"}),
        },
      },
    }),
  })
  // 2. init Reconciler(Controller)
  c, _ := controller.New("app", mgr, controller.Options{})
  _ = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}, predicate.Funcs{
    CreateFunc: func(event event.CreateEvent) bool {
      ...
    },
    UpdateFunc: func(updateEvent event.UpdateEvent) bool {
      ...
    },
    DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
      ...
    },
  })
  // 3. start Manager
  if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
  }
}

这里需要注意的是,controller-runtime 在 v0.11.0[1] 版本中才开放设置 cache selector。

方法是在初始化 Manager 时,使用 cache.BuilderWithOptions 函数,将 LabelSelector 或 FieldSelector 注册进去,同时需要将 scheme 注册进去,以便 cache 生成的 Informer 对 ApiServer 发出请求时,同时给出资源 scheme。

这里可以看下源码,Cache 会生成 3 种 Informer,分别为 structured unstructuredmetadata。启动时也会同时启动这 3 种 Informer。如下:

代码语言:javascript
复制
func NewInformersMap(config *rest.Config,
  scheme *runtime.Scheme,
  mapper meta.RESTMapper,
  resync time.Duration,
  namespace string,
  selectors SelectorsByGVK,
  disableDeepCopy DisableDeepCopyByGVK,
) *InformersMap {
  return &InformersMap{
    structured:   newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),
    unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),
    metadata:     newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),

    Scheme: scheme,
  }
}

// Start calls Run on each of the informers and sets started to true.  Blocks on the context.
func (m *InformersMap) Start(ctx context.Context) error {
  go m.structured.Start(ctx)
  go m.unstructured.Start(ctx)
  go m.metadata.Start(ctx)
  <-ctx.Done()
  return nil
}

其中,structured 为确定类型的资源,需要在 scheme 中注册对应的资源类型;unstructured 是不确定类型的资源;metadata 则是采用 protobuf 形式请求 ApiServer。

structured 为例:

代码语言:javascript
复制
func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
  // Kubernetes APIs work against Resources, not GroupVersionKinds.  Map the
  // groupVersionKind to the Resource API we will use.
  mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
  if err != nil {
    return nil, err
  }

  client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs)
  if err != nil {
    return nil, err
  }
  listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List")
  listObj, err := ip.Scheme.New(listGVK)
  if err != nil {
    return nil, err
  }

  // TODO: the functions that make use of this ListWatch should be adapted to
  //  pass in their own contexts instead of relying on this fixed one here.
  ctx := context.TODO()
  // Create a new ListWatch for the obj
  return &cache.ListWatch{
    ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
      ip.selectors(gvk).ApplyToList(&opts)
      res := listObj.DeepCopyObject()
      namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk))
      isNamespaceScoped := namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
      err := client.Get().NamespaceIfScoped(namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do(ctx).Into(res)
      return res, err
    },
    // Setup the watch function
    WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
      ip.selectors(gvk).ApplyToList(&opts)
      // Watch needs to be set to true separately
      opts.Watch = true
      namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk))
      isNamespaceScoped := namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
      return client.Get().NamespaceIfScoped(namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch(ctx)
    },
  }, nil
}

可以看到,在 Informer 的 ListWatch 接口中,p.selectors(gvk).ApplyToList(&opts) 会将我们一开始注册进来的 selector 添加到后面的 list/watch 请求中。

使用 Metadata

在上面一个例子中,我们提到 metadata 采用 protobuf[2] 序列化形式请求 ApiServer,相比默认的序列化类型 json,protobuf 形式的请求效率更高,在大规模环境中性能更好。不过,不是所有的资源类型都支持 protobuf 格式,比如 CRD 就不支持。

还有一个需要注意的点是,在 Metadata 的数据中,watch 到的数据只有 metadata,没有 spec 和 status。使用示例如下:

代码语言:javascript
复制
func start() {
  scheme := runtime.NewScheme()
  // 1. init Manager
  mgr, _ := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
    Scheme: scheme,
    Port:   9443,
    NewCache: cache.BuilderWithOptions(cache.Options{
      Scheme: scheme,
      SelectorsByObject: cache.SelectorsByObject{
        &corev1.Pod{}: {
          Label: labels.SelectorFromSet(labels.Set{}),
        },
        &corev1.Node{}: {
          Field: fields.SelectorFromSet(fields.Set{"metadata.name": "node01"}),
        },
      },
    }),
  })
  // 2. init Reconciler(Controller)
  c, _ := controller.New("app", mgr, controller.Options{})

  _ = ctrl.NewControllerManagedBy(mgr).
    For(&corev1.Pod{}).
    Complete(&ApplicationReconciler{})

  u := &metav1.PartialObjectMetadata{}
  u.SetGroupVersionKind(schema.GroupVersionKind{
    Kind:    "Pod",
    Group:   "",
    Version: "v1",
  })
  _ = c.Watch(&source.Kind{Type: u}, &handler.EnqueueRequestForObject{}, predicate.Funcs{
    CreateFunc: func(event event.CreateEvent) bool {
      return true
    },
    UpdateFunc: func(updateEvent event.UpdateEvent) bool {
      return true
    },
    DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
      return true
    },
  })
  // 3. start Manager
  if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
  }
}

在 Cache 的 metadata 数据中,采用的数据格式是 meta.v1.PartialObjectMetadata,其使用前提是用户只关心资源的 metadata,对其 spec 及 status 并不关心,所以在对 ApiServer 的 ListWatch 函数中,只获取其 metadata。源码如下:

代码语言:javascript
复制
// PartialObjectMetadata is a generic representation of any object with ObjectMeta. It allows clients
// to get access to a particular ObjectMeta schema without knowing the details of the version.
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type PartialObjectMetadata struct {
  TypeMeta `json:",inline"`
  // Standard object's metadata.
  // More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
  // +optional
  ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
}

func createMetadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
  // Kubernetes APIs work against Resources, not GroupVersionKinds.  Map the
  // groupVersionKind to the Resource API we will use.
  mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
  if err != nil {
    return nil, err
  }

  // Always clear the negotiated serializer and use the one
  // set from the metadata client.
  cfg := rest.CopyConfig(ip.config)
  cfg.NegotiatedSerializer = nil

  // grab the metadata client
  client, err := metadata.NewForConfig(cfg)
  if err != nil {
    return nil, err
  }
  ctx := context.TODO()
  // create the relevant listwatch
  return &cache.ListWatch{
    ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
      ip.selectors(gvk).ApplyToList(&opts)

      var (
        list *metav1.PartialObjectMetadataList
        err  error
      )
      namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk))
      if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
        list, err = client.Resource(mapping.Resource).Namespace(namespace).List(ctx, opts)
      } else {
        list, err = client.Resource(mapping.Resource).List(ctx, opts)
      }
      if list != nil {
        for i := range list.Items {
          list.Items[i].SetGroupVersionKind(gvk)
        }
      }
      return list, err
    },
    // Setup the watch function
    WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
      ip.selectors(gvk).ApplyToList(&opts)
      // Watch needs to be set to true separately
      opts.Watch = true

      var (
        watcher watch.Interface
        err     error
      )
      namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk))
      if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
        watcher, err = client.Resource(mapping.Resource).Namespace(namespace).Watch(ctx, opts)
      } else {
        watcher, err = client.Resource(mapping.Resource).Watch(ctx, opts)
      }
      if watcher != nil {
        watcher = newGVKFixupWatcher(gvk, watcher)
      }
      return watcher, err
    },
  }, nil
}

可以看到,controller-runtime 使用的是 client-go.metadata.Client,这个 Client 的接口返回的数据格式是 PartialObjectMetadata

总结

controller-runtime 是一种很好用的生成资源控制器的工具,在平时的开发过程中,我们可以利用 controller-runtime 快速生成我们需要的资源控制器。同时,controller-runtime 也提供了很多方法,让我们不仅可以快速构建控制器,也可以针对不同的业务需求,进行灵活的配置,达到预期的效果。

References

[1] v0.11.0: https://github.com/kubernetes-sigs/controller-runtime/releases/tag/v0.11.0 [2] protobuf: https://kubernetes.io/docs/reference/using-api/api-concepts/#alternate-representations-of-resources

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-04-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 CS实验室 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 架构
  • 用法
    • 一般用法
      • 设置 EventHandler
        • 设置 Cache selector
          • 使用 Metadata
          • 总结
            • References
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档