前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >《一起读 kubernetes 源码》简单的 DaemonSet

《一起读 kubernetes 源码》简单的 DaemonSet

作者头像
LinkinStar
发布2024-09-15 12:44:54
760
发布2024-09-15 12:44:54
举报
文章被收录于专栏:LinkinStar's Blog

📢 注意,该文本非最终版本,正在更新中,版权所有,请勿转载!!

前言

相比较于 deployment 和 StatefulSet,DaemonSet 是更简单的一个,也是最不常用的一个对象了。对于应用开发的同学来说可能几乎见不到它,而对于运维或者 SRE 的同学可能会熟悉一些。DaemonSet 用于确保集群中的每个节点运行有且仅有一个 pod 实例的场景。两个最常见的场景是:日志收集和监控。日志收集是为了收集每个节点上的日志,而监控则是为了监控每个节点的一些数据指标。通常来说以全局平台或者节点为场景的情况下才会想到它。那么 DaemonSet 的如何保证每个节点 pod 的数量呢?这一节让我从源码的角度看看它是如何实现的。

前置知识

  • DaemonSet 的基本使用

码前讨论

首先代码位置就不多说了,有前面的经验。

代码语言:javascript
复制
kubernetes/pkg/controller/daemon

由于前面我们已经看过了 deployment、rs、StatefulSet,那么其实对于 DaemonSet,我们也是一样几乎大致的形态结构都已经可以八九不离十了,而且它只有 daemon_controller.goupdate.go 两个文件,就像我前面说的也它其实很简单,并且功能也不复杂。所以这次我们换一种方式来认识源码,放大之前提问的部分。我们在最开始第一节的时候就提到过,看源码之前提几个问题能帮助我们快速进入状态和定位关键。而对于熟悉的结构,我们更可以通过这样的方式来快速阅读源码,而非逐字逐句去做翻译。

码前提问

问题 1

我们知道 DaemonSet 确保集群中每个节点有且仅有一个 pod ,那么当节点数量变化的时候,它一定会随之改变,那么 DaemonSet 的 controller 是如何感知这个变化的呢?如果是你去编写,你会从何处入手?在看源码之前你可以先大胆假设一下。

问题 2

关键的问题在于 DaemonSet 是如何保证集群中每个节点有且仅有一个 pod 的呢?需要做哪些设置呢。同样的,再看源码之前,你可以先问问自己,不是 DaemonSet 的情况,如果是一个普通的 deployment 你能否做的让 pod 调度到每个节点一个?如果可以,那么 DaemonSet 或许就是类似的思路。

问题 3

为了保证 pod 的关系和数量,我会猜测 DaemonSet 可能需要存 node 和 pod 的对应关系,如果有,是存在了哪里?

你可以先不看下面的分析,自己去寻找这三个问题的答案,找到之后再回来核对,看看是否与你的想法一致。

源码分析

问题 1

DaemonSet 是如何感知节点的变化的?

第一个问题相对来说比较简单。由于我们之前看过的所有对象来说,无论是对象本身的变化,还是 pod 的变化都是通过 informer 机制来告诉 controller 的。所以 node 的变化也无意外,也是通过这样的事件机制来做的。

代码语言:javascript
复制
// pkg/controller/daemon/daemon_controller.go:134
// NewDaemonSetsController creates a new DaemonSetsController
func NewDaemonSetsController(
	ctx context.Context,
	daemonSetInformer appsinformers.DaemonSetInformer,
	historyInformer appsinformers.ControllerRevisionInformer,
	podInformer coreinformers.PodInformer,
	nodeInformer coreinformers.NodeInformer,
	kubeClient clientset.Interface,
	failedPodsBackoff *flowcontrol.Backoff,
) (*DaemonSetsController, error) {
	eventBroadcaster := record.NewBroadcaster()
	// ...

	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			dsc.addNode(logger, obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			dsc.updateNode(logger, oldObj, newObj)
		},
	},
	)
	dsc.nodeStoreSynced = nodeInformer.Informer().HasSynced
	dsc.nodeLister = nodeInformer.Lister()

	// ...

	return dsc, nil
}

NewDaemonSetsController 方法中可以明确看到,通过 nodeInformer 添加了有关节点变化的 event 处理方法,当有对应事件的时候,也就是 node 有变化的时候我们就能知道,并做出相应的调整。

如果这部分你能在看源码之前猜测到,那我觉得对于 informer 整个机制应该是真的掌握了。

问题 2

DaemonSet 是如何保证集群中每个节点有且仅有一个 pod 的?

这个问题稍微复杂一些,考查了你对于 k8s 一些基础概念的了解。我特别也没有在前置知识里面提及是怕过早公布答案。首先,让我们来想一下后面一个小问题,也就是如何让 deployment 能均匀分布到各个节点上去。

如果想把某个 pod 直接调度到特定的节点上,我们可以直接在 spec 下配置 nodeName 来解决。

代码语言:javascript
复制
apiVersion: v1
kind: Pod
metadata:
  name: nginx
spec:
  nodeName: foo-node # 调度 Pod 到特定的节点

而对于整个对象 deployment 或者是 statefulset,那么答案是 亲和性 。比如官方就给出过对于 zk 的部署最佳实践中就提到,让 statefulset 的 pod 分布到不同的节点,以保证更好的高可用,不会因为所有 pod 都在一个节点,而这个节点挂了就一起挂了的情况。如下:https://kubernetes.io/zh-cn/docs/tutorials/stateful-application/zookeeper/#tolerating-node-failure

代码语言:javascript
复制
affinity:
  podAntiAffinity:
    requiredDuringSchedulingIgnoredDuringExecution:
      - labelSelector:
          matchExpressions:
            - key: "app"
              operator: In
              values:
                - zk
        topologyKey: "kubernetes.io/hostname"

那么 DaemonSet 很大程度上会参考这样的规则,让调度器能把 pod 按照我们的要求每个节点调度一个。

于是乎,我们可以在源码中寻找来印证我们的想法:

代码语言:javascript
复制
// pkg/controller/daemon/daemon_controller.go:993
// syncNodes deletes given pods and creates new daemon set pods on the given nodes
// returns slice with errors if any
func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
	// ...

	batchSize := integer.IntMin(createDiff, controller.SlowStartInitialBatchSize)
	for pos := 0; createDiff > pos; batchSize, pos = integer.IntMin(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
		errorCount := len(errCh)
		createWait.Add(batchSize)
		for i := pos; i < pos+batchSize; i++ {
			go func(ix int) {
				defer createWait.Done()

				podTemplate := template.DeepCopy()
				// The pod's NodeAffinity will be updated to make sure the Pod is bound
				// to the target node by default scheduler. It is safe to do so because there
				// should be no conflicting node affinity with the target node.
				podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
					podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])

				err := dsc.podControl.CreatePods(ctx, ds.Namespace, podTemplate,
					ds, metav1.NewControllerRef(ds, controllerKind))

				// ...
			}(i)
		}
		createWait.Wait()
		// any skipped pods that we never attempted to start shouldn't be expected.
		skippedPods := createDiff - (batchSize + pos)
		// ...
	}

	// ...
}

我们可以看到在 syncNodes 方法中 dsc.podControl.CreatePods 之前,除了将原有的所有 template 属性 DeepCopy 了一份之外,单独处理了 Affinity (亲和性)并且处理的条件是什么呢?也就是 ReplaceDaemonSetPodNodeNameNodeAffinity 的第二个参数

代码语言:javascript
复制
func ReplaceDaemonSetPodNodeNameNodeAffinity(affinity *v1.Affinity, nodename string) *v1.Affinity {

nodename 破案了~ 所以,其实 DaemonSet 就是靠着来实现的,其他都是浮云,本质其实挺简单的。其实复杂的部分都给调度器了。

问题 3

为了保证 pod 的关系和数量,我会猜测 DaemonSet 可能需要存 node 和 pod 的对应关系,如果有,是存在了哪里?

这是一个很容易被疑惑和误导的问题,其实有了问题 2 做铺垫,这个问题也就能瞥见一点了。如果没有看过源码,你或许就可能会想,DaemonSet 应该存储了节点和 pod 的对应关系,方便在选择的时候选择合适的节点,并且当新来的时候可以确认当前没有 pod 的节点是哪一个。而事实并不是这样。DaemonSet 并不会保存这样的对应关系。有一个显然的理由是,在问题 2 中我们已经看到,pod 的调度完全是依靠调度器去完成的,控制器仅仅只是描述信息罢了,最终 pod 会调度到哪里其实并不归他管。

但是,DaemonSet 也必须要知道这个对应关系,没有这个关系,无论是后续更新还是本身的状态变化都需要依赖这个部分。于是乎,我们可以在 rollingUpdate 的时候发现它是如何操作的。

代码语言:javascript
复制
// pkg/controller/daemon/update.go:42
// rollingUpdate identifies the set of old pods to delete, or additional pods to create on nodes,
// remaining within the constraints imposed by the update strategy.
func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
	logger := klog.FromContext(ctx)
	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds, false)
	if err != nil {
		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
	}
	maxSurge, maxUnavailable, desiredNumberScheduled, err := dsc.updatedDesiredNodeCounts(ctx, ds, nodeList, nodeToDaemonPods)

rollingUpdate 方法显然是用于执行 DaemonSet 滚动更新的时候用的,也就是 pod 不断更新的过程。而这个方法本身是用来计算出需要更新哪些 pod ,哪一些要删,哪一些要新增。具体就不再展开。关键是这个部分

代码语言:javascript
复制
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds, false)

getNodesToDaemonPods 返回了一个 map,nodeToDaemonPods,key 是 NodeName 而 value 则是对应的 pod 列表。内部的实现其实也非常简单。

代码语言:javascript
复制
// pkg/controller/daemon/daemon_controller.go:755
func (dsc *DaemonSetsController) getNodesToDaemonPods(ctx context.Context, ds *apps.DaemonSet, includeDeletedTerminal bool) (map[string][]*v1.Pod, error) {
	claimedPods, err := dsc.getDaemonPods(ctx, ds)
	if err != nil {
		return nil, err
	}
	// Group Pods by Node name.
	nodeToDaemonPods := make(map[string][]*v1.Pod)
	logger := klog.FromContext(ctx)
	for _, pod := range claimedPods {
		if !includeDeletedTerminal && podutil.IsPodTerminal(pod) && pod.DeletionTimestamp != nil {
			// This Pod has a finalizer or is already scheduled for deletion from the
			// store by the kubelet or the Pod GC. The DS controller doesn't have
			// anything else to do with it.
			continue
		}
		nodeName, err := util.GetTargetNodeName(pod)
		if err != nil {
			logger.V(4).Info("Failed to get target node name of Pod in DaemonSet",
				"pod", klog.KObj(pod), "daemonset", klog.KObj(ds))
			continue
		}

		nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], pod)
	}

	return nodeToDaemonPods, nil
}

可以看到就是将 DaemonPods 拿出来,通过 GetTargetNodeName 拿到对应的 nodeName 然后分好就可以了。其中内部就是通过 dsc.podLister.Pods(ds.Namespace).List(labels.Everything()) 来完成的。总结一下,就是其实当时直接查出来的。

看到这里你也许会好奇为什么我会单独把这个部分拿出来看,而不是去看其他创建或者计算的过程。首先我会觉得其他的部分可以算是 “业务” 它有着自己的逻辑,按部就班,并且正确计算条件即可。而之所以看这部分是想强化一下我们对于控制循环的理解,我们在这个大章节最开始就提到了它。控制循环的本质是根据当前状态和期望状态不一致,从而触发改变,让目标状态最终能变成期望状态,而关键在于是 ”当前状态“,这个状态可能会由于整个集群任何操作变化的改变而变动,所以只有当下去看,才能知道目前的状态是什么样的,改变的因素太多了。

总结提升

这一节我们看了 DaemonPods 的源码部分,如果你已经可以自己在源码中寻找到前面提出问题的答案,那么我相信对于各种其他的对象你也可以轻车熟路了。并且看到这里,你应该就能感觉到,其实看源码本身并不难,找准目标一步步往下走就可以了,虽然代码量很多,但是设计绝大多数其实都是相通的,一个类型看一个,都能举一反三。相信你渐渐能有这样的体会。

编码上

最后,在编码上,我们可以总结一个小点。

代码语言:javascript
复制
func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
	// ...

	errCh := make(chan error, createDiff+deleteDiff)
	createWait := sync.WaitGroup{}
	// ...
	batchSize := integer.IntMin(createDiff, controller.SlowStartInitialBatchSize)
	for pos := 0; createDiff > pos; batchSize, pos = integer.IntMin(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
		errorCount := len(errCh)
		createWait.Add(batchSize)
		for i := pos; i < pos+batchSize; i++ {
			go func(ix int) {
				defer createWait.Done()

				// ...
				if err != nil {
					errCh <- err
				}
			}(i)
		}
		createWait.Wait()
		// ...
	}

	// ...
	errors := []error{}
	close(errCh)
	for err := range errCh {
		errors = append(errors, err)
	}
	return utilerrors.NewAggregate(errors)
}

在我们前面看到的 syncNodes 方法中有一个非常标准的利用 WaitGroup 去并发处理任务并等待任务处理完毕,同时利用 chan error 将错误统一发送到 channel 最后一并处理合并的最佳实践。这一部分的编码我相信很多地方都是可以使用的,希望你也能学到。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-09-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 前置知识
  • 码前讨论
  • 码前提问
    • 问题 1
      • 问题 2
        • 问题 3
        • 源码分析
          • 问题 1
            • 问题 2
              • 问题 3
              • 总结提升
                • 编码上
                相关产品与服务
                容器服务
                腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档