前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kubernetes Endpoints Controller源码分析

Kubernetes Endpoints Controller源码分析

原创
作者头像
Walton
发布2018-11-06 10:09:18
2.8K0
发布2018-11-06 10:09:18
举报
文章被收录于专栏:Kubernetes

摘要:最近我们在写自己的Kubernetes服务路由组件对接公司自研的负载均衡器,这其中涉及到非常核心的Endpoints相关的逻辑,因此对Endpoints Controller的深入分析是非常有必要的,比如Pod Label发生变更、孤立Pod、Pod HostName发生变更等情况下,Endpoints Controller的处理逻辑是否与我们想要的一致。

Endpoints Controller相关的配置项

  • --concurrent-endpoint-syncs int32 Default: 5 The number of endpoint syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load.
  • --leader-elect-resource-lock endpoints Default: "endpoints" The type of resource object that is used for locking during leader election. Supported options are endpoints (default) and configmaps.

Endpoints Controller Watch的GVK

  • Core/V1/Pods
  • Core/V1/Services
  • Core/V1/Endpoints

Endpoints Controller Event Handler

  • Add Service Event --> enqueueService
  • Update Service Event --> enqueueService(new)
  • Delete Service Event --> enqueueService
  • Add Pod Event --> addPod
  • Update Pod Event --> updatePod
  • Delete Pod Event --> deletePod
  • Add/Update/Delete Endpoints Event --> nil

Run Endpoints Controller

启动两类go协程:

  • 一类协程数为--concurrent-endpoint-syncs配置值(default 5),每个worker负责从service queue中pop service进行syncService同步,完成一次sync后等待1s再从service queue中pop一个service进行sync,如此反复。
  • 另一类协程只有一个协程,负责checkLeftoverEndpoints,只有启动时会执行一次。
代码语言:txt
复制
// Run will not return until stopCh is closed. workers determines how many
// endpoints will be handled in parallel.
func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer e.queue.ShutDown()

	glog.Infof("Starting endpoint controller")
	defer glog.Infof("Shutting down endpoint controller")

	if !controller.WaitForCacheSync("endpoint", stopCh, e.podsSynced, e.servicesSynced, e.endpointsSynced) {
		return
	}

	// workers = --concurrent-endpoint-syncs's value (default 5)
	for i := 0; i < workers; i++ {
		// workerLoopPeriod = 1s
		go wait.Until(e.worker, e.workerLoopPeriod, stopCh)
	}

	go func() {
		defer utilruntime.HandleCrash()
		e.checkLeftoverEndpoints()
	}()

	<-stopCh
}

checkLeftoverEndpoints

checkLeftoverEndpoints负责List所有当前集群中的endpoints并将它们对应的services添加到queue中,由workers进行syncService同步。

这是为了防止在controller-manager发生重启时时,用户删除了某些Services或者某些Endpoints还没删除干净,Endpoints Controller没有处理的情况下,在Endpoints Controller再次启动时能通过checkLeftoverEndpoints检测到那些孤立的endpionts(没有对应services),将虚构的Services重新加入到队列进行syncService操作,从而完成这些孤立endpoint的清理工作。

上面提到的虚构Services其实是把Endpoints的Key(namespace/name)作为Services的Key,因此这就是为什么要求Endpiont和Service的名字要一致的原因之一。

代码语言:txt
复制
func (e *EndpointController) checkLeftoverEndpoints() {
	list, err := e.endpointsLister.List(labels.Everything())
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err))
		return
	}
	for _, ep := range list {
		if _, ok := ep.Annotations[resourcelock.LeaderElectionRecordAnnotationKey]; ok {
			// when there are multiple controller-manager instances,
			// we observe that it will delete leader-election endpoints after 5min
			// and cause re-election
			// so skip the delete here
			// as leader-election only have endpoints without service
			continue
		}
		key, err := keyFunc(ep)
		if err != nil {
			utilruntime.HandleError(fmt.Errorf("Unable to get key for endpoint %#v", ep))
			continue
		}
		e.queue.Add(key)
	}
}

另外,还需要注意一点,对于kube-controller-manager多实例HA部署时,各个contorller-manager endpoints是没有对应service的,这种情况下,我们不能把虚构的Service加入到队列触发这些“理应孤立”的endpoints被清理,因此我们给这些“理应孤立”的endpoints加上Annotation "control-plane.alpha.kubernetes.io/leader"以做跳过处理。

Endpoint Contoller的核心逻辑syncService

Service的Add/Update/Delete Event Handler都是将Service Key加入到Queue中,等待worker进行syncService处理:

  1. 根据queue中得到的service key(namespace/name)去indexer中获取对应的Service Object,如果没获取到,则调api删除同Key(namespace/name)的Endpoints Object进行清理工作,这对应到checkLeftoverEndpoints中描述到的那些孤立endpoints清理工作。
  2. 因为Service是通过LabelSelector进行Pod匹配,将匹配的Pods构建对应的Endpoints Subsets加入到Endpoints中,因此这里会先过滤掉那些没有LabelSelector的Services。
  3. 然后用Service的LabelSelector获取同namespace下的所有Pods。
  4. 检查service.Spec.PublishNotReadyAddresses是否为true,或者Service Annotations "service.alpha.kubernetes.io/tolerate-unready-endpoints"是否为true(/t/T/True/TRUE/1),如果为true,则表示tolerate Unready Endpoints,即Unready的Pods信息也会被加入该Service对应的Endpoints中。注意,Annotations "service.alpha.kubernetes.io/tolerate-unready-endpoints"在Kubernetes 1.13中将被弃用,后续只使用.Spec.PublishNotReadyAddresses Field。
  5. 接下来就是遍历前面获取到的Pods,用各个Pod的IP、ContainerPorts、HostName及Service的Port去构建Endpoints的Subsets,注意如下特殊处理:
代码语言:txt
复制
1) 跳过没有pod.Status.PodIP为空的pod;  
代码语言:txt
复制
2) 当tolerate Unready Endpoints为false时,跳过那些被标记删除(DeletionTimestamp != nil)的Pods;  
代码语言:txt
复制
3) 对于Headless Service,因为没有Service Port,因此构建EndpointSubset时对应的Ports内容为空;  
代码语言:txt
复制
4)当tolerate Unready Endpoints为true(即使Pod not Ready)或者Pod isReady时,Pod对应的EndpointAddress也会被加入到(Ready)Addresses中。
代码语言:txt
复制
5)tolerate Unready Endpoints为false且Pod isNotReady情况下: 
代码语言:txt
复制
	- 当pod.Spec.RestartPolicy为Never,Pod Status.Phase为非结束状态(非Failed/Successed)时,Pod对应的EndpointAddress也会被加入到NotReadyAddresses中。
	- 当pod.Spec.RestartPolicy为OnFailure, Pod Status.Phase为非Successed时,Pod对应的EndpointAddress也会被加入到NotReadyAddresses中。
	- 其他情况下,Pod对应的EndpointAddress也会被加入到NotReadyAddresses中。
  1. 从indexer中获取service对应的Endpoints Object(currentEndpoints),如果从indexer中没有返回对应的Endpoints Object,则构建一个与该Service同名、同Labels的Endpoints对象(newEndpoints)。
  2. 如果currentEndpoints的ResourceVersion不为空,则对比currentEndpoints.Subsets、Labels与前面构建的Subsets、Service.Labels是否DeepEqual,如果是则说明不需要update,流程结束。
  3. 否则,就像currentEndpoints DeepCopy给newEndpoints,并用前面构建的Subsets和Services.Labels替换newEndpoints中对应内容。
  4. 如果currentEndpoints的ResourceVersion为空,则调用Create API去创建上一步的newEndpoints Object。如果currentEndpoints的ResourceVersion不为空,表示已经存在对应的Endpoints,则调用Update API用newEndpoints去更新该Endpoints。
  5. 流程结束。

Pod Event Hanlder

Add Pod

  1. 通过Services LabeleSelector与Pod Labels进行匹配的方法,将该Pod能匹配上的所有Services都找出来,然后将它们的Key(namespace/name)都加入到queue等待sync。
代码语言:txt
复制
// When a pod is added, figure out what services it will be a member of and
// enqueue them. obj must have *v1.Pod type.
func (e *EndpointController) addPod(obj interface{}) {
	pod := obj.(*v1.Pod)
	services, err := e.getPodServiceMemberships(pod)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err))
		return
	}
	for key := range services {
		e.queue.Add(key)
	}
}

Update Pod

  • 如果newPod.ResourceVersion等于oldPod.ResourceVersion,则跳过,不进行任何update。
  • 检查新老Pod的DeletionTimestamp、Ready Condition以及由PodIP,Hostname等建构的EndpointAddress是否发生变更,只要其中之一发生变更,podChangedFlag就为true。
  • 检查新老Pod Spec的Labels、HostName、Subdomain是否发生变更,只要其中之一发生变更,labelChangedFlag就为true。
  • 如果podChangedFlag和labelChangedFlag都为false,则跳过,不做任何update。
  • 通过Services LabeleSelector与Pod Labels进行匹配的方法,将newPod能匹配上的所有Services都找出来(services记录),如果labelChangedFlag为true,则根据LabelSelector匹配找出oldPod对应的oldServices: - 如果podChangedFlag为true,则将services和oldServices进行union集合,将集合内的所有Services Key都加入到queue中等待sync; - 如果podChangedFlag为false,则将services和oldServices的互相差值进行union集合,将集合内的所有Services Key都加入到queue中等待sync;undefined
代码语言:txt
复制
> 互相差值进行union集合的含义:`services.Difference(oldServices).Union(oldServices.Difference(services))`

Delete Pod

  • 如果该pod还是个完整记录的pod,则跟addPod逻辑一样:通过Services LabeleSelector与Pod Labels进行匹配的方法,将该Pod能匹配上的所有Services都找出来,然后将它们的Key(namespace/name)都加入到queue等待sync。
  • 如果该pod是tombstone object(final state is unrecorded),则将其转换成v1.pod后,再调用addPod。相比正常的Pod,就是多了一步:从tombstone到v1.pod的转换。
代码语言:txt
复制
// When a pod is deleted, enqueue the services the pod used to be a member of.
// obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item.
func (e *EndpointController) deletePod(obj interface{}) {
	if _, ok := obj.(*v1.Pod); ok {
		// Enqueue all the services that the pod used to be a member
		// of. This happens to be exactly the same thing we do when a
		// pod is added.
		e.addPod(obj)
		return
	}
	// If we reached here it means the pod was deleted but its final state is unrecorded.
	tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
		return
	}
	pod, ok := tombstone.Obj.(*v1.Pod)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Pod: %#v", obj))
		return
	}
	glog.V(4).Infof("Enqueuing services of deleted pod %s/%s having final state unrecorded", pod.Namespace, pod.Name)
	e.addPod(pod)
}

核心Struct

里面有几个struct,挺容易混淆的,简单用图表示下,方便比对:

Kubernetes Endpoints Controller源码分析.png
Kubernetes Endpoints Controller源码分析.png

总结

通过对Endpoints Controller的源码分析,我们了解了其中很多细节,比如对Service和Pod事件处理逻辑、对孤立Pod的处理方法、Pod Labels变更带来的影响等等,这对我们通过Watch Endpoints去写自己的Ingress组件对接公司内部的路由组件时是有帮助的。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Endpoints Controller相关的配置项
  • Endpoints Controller Watch的GVK
  • Endpoints Controller Event Handler
  • Run Endpoints Controller
  • checkLeftoverEndpoints
  • Endpoint Contoller的核心逻辑syncService
  • Pod Event Hanlder
    • Add Pod
      • Update Pod
        • Delete Pod
        • 核心Struct
        • 总结
        相关产品与服务
        负载均衡
        负载均衡(Cloud Load Balancer,CLB)提供安全快捷的四七层流量分发服务,访问流量经由 CLB 可以自动分配到多台后端服务器上,扩展系统的服务能力并消除单点故障。轻松应对大流量访问场景。 网关负载均衡(Gateway Load Balancer,GWLB)是运行在网络层的负载均衡。通过 GWLB 可以帮助客户部署、扩展和管理第三方虚拟设备,操作简单,安全性强。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档