摘要:最近我们在写自己的Kubernetes服务路由组件对接公司自研的负载均衡器,这其中涉及到非常核心的Endpoints相关的逻辑,因此对Endpoints Controller的深入分析是非常有必要的,比如Pod Label发生变更、孤立Pod、Pod HostName发生变更等情况下,Endpoints Controller的处理逻辑是否与我们想要的一致。
--concurrent-endpoint-syncs
int32 Default: 5
The number of endpoint syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load.
--leader-elect-resource-lock
endpoints Default: "endpoints"
The type of resource object that is used for locking during leader election. Supported options are endpoints (default) and configmaps.
启动两类go协程:
// Run starts the endpoint controller and blocks until stopCh is closed.
// workers is the number of worker goroutines started, i.e. how many
// endpoints can be handled in parallel.
func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer e.queue.ShutDown()

	glog.Infof("Starting endpoint controller")
	defer glog.Infof("Shutting down endpoint controller")

	// Do not start any worker before the pod, service and endpoints caches
	// report that they are synced.
	synced := controller.WaitForCacheSync("endpoint", stopCh, e.podsSynced, e.servicesSynced, e.endpointsSynced)
	if !synced {
		return
	}

	// workers carries the value of --concurrent-endpoint-syncs (default 5).
	for remaining := workers; remaining > 0; remaining-- {
		// Each worker is re-run every e.workerLoopPeriod (noted as 1s in the
		// original annotation) until stopCh is closed.
		go wait.Until(e.worker, e.workerLoopPeriod, stopCh)
	}

	// Run the one-off leftover-endpoints check in its own goroutine.
	go func() {
		defer utilruntime.HandleCrash()
		e.checkLeftoverEndpoints()
	}()

	<-stopCh
}
checkLeftoverEndpoints负责List所有当前集群中的endpoints并将它们对应的services添加到queue中,由workers进行syncService同步。
这是为了防止在controller-manager发生重启时,用户删除了某些Services或者某些Endpoints还没删除干净,Endpoints Controller没有处理的情况下,在Endpoints Controller再次启动时能通过checkLeftoverEndpoints检测到那些孤立的endpoints(没有对应services),将虚构的Services重新加入到队列进行syncService操作,从而完成这些孤立endpoints的清理工作。
上面提到的虚构Services其实是把Endpoints的Key(namespace/name)作为Services的Key,这也是要求Endpoints和Service的名字必须一致的原因之一。
// checkLeftoverEndpoints lists every Endpoints object returned by
// e.endpointsLister and adds its key to e.queue. Endpoints that carry the
// leader-election record annotation are skipped. Failures are reported through
// utilruntime.HandleError and never returned to the caller.
func (e *EndpointController) checkLeftoverEndpoints() {
	list, err := e.endpointsLister.List(labels.Everything())
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err))
		return
	}
	for _, ep := range list {
		if _, ok := ep.Annotations[resourcelock.LeaderElectionRecordAnnotationKey]; ok {
			// When there are multiple controller-manager instances, the
			// leader-election endpoints were observed to be deleted after
			// 5min, which causes a re-election. Leader-election endpoints
			// have no matching service by design, so skip them here.
			continue
		}
		key, err := keyFunc(ep)
		if err != nil {
			// Include the underlying error so the cause of the failure is
			// not lost; the object alone does not explain it.
			utilruntime.HandleError(fmt.Errorf("Unable to get key for endpoint %#v: %v", ep, err))
			continue
		}
		e.queue.Add(key)
	}
}
另外,还需要注意一点,对于kube-controller-manager多实例HA部署时,各个controller-manager endpoints是没有对应service的,这种情况下,我们不能把虚构的Service加入到队列触发这些“理应孤立”的endpoints被清理,因此我们给这些“理应孤立”的endpoints加上Annotation "control-plane.alpha.kubernetes.io/leader"以做跳过处理。
Service的Add/Update/Delete Event Handler都是将Service Key加入到Queue中,等待worker进行syncService处理:
1) 跳过pod.Status.PodIP为空的pod;
2) 当tolerate Unready Endpoints为false时,跳过那些被标记删除(DeletionTimestamp != nil)的Pods;
3) 对于Headless Service,因为没有Service Port,因此构建EndpointSubset时对应的Ports内容为空;
4)当tolerate Unready Endpoints为true(即使Pod not Ready)或者Pod isReady时,Pod对应的EndpointAddress也会被加入到(Ready)Addresses中。
5)tolerate Unready Endpoints为false且Pod isNotReady情况下:
- 当pod.Spec.RestartPolicy为Never,Pod Status.Phase为非结束状态(非Failed/Succeeded)时,Pod对应的EndpointAddress也会被加入到NotReadyAddresses中。
- 当pod.Spec.RestartPolicy为OnFailure, Pod Status.Phase为非Succeeded时,Pod对应的EndpointAddress也会被加入到NotReadyAddresses中。
- 其他情况下,Pod对应的EndpointAddress也会被加入到NotReadyAddresses中。
// addPod handles a pod-added event: it looks up the services the pod is a
// member of and enqueues each of them. obj must be a *v1.Pod; any other type
// makes the type assertion panic.
func (e *EndpointController) addPod(obj interface{}) {
	added := obj.(*v1.Pod)

	memberships, err := e.getPodServiceMemberships(added)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", added.Namespace, added.Name, err))
		return
	}

	// Every service key the pod belongs to goes onto the work queue.
	for svcKey := range memberships {
		e.queue.Add(svcKey)
	}
}
> 互相差值进行union集合的含义:`services.Difference(oldServices).Union(oldServices.Difference(services))` 即两个集合的对称差,也就是只属于新、旧Services集合其中之一的那些Services。
// deletePod handles a pod-deleted event by enqueuing the services the pod
// used to be a member of. obj is either a *v1.Pod or a
// cache.DeletedFinalStateUnknown tombstone wrapping one; anything else is
// reported through utilruntime.HandleError and ignored.
func (e *EndpointController) deletePod(obj interface{}) {
	switch deleted := obj.(type) {
	case *v1.Pod:
		// Enqueuing the pod's former services is exactly what addPod does
		// for a newly added pod, so reuse it.
		e.addPod(deleted)

	case cache.DeletedFinalStateUnknown:
		// The pod was deleted but its final state is unrecorded; recover
		// the pod from the tombstone.
		pod, isPod := deleted.Obj.(*v1.Pod)
		if !isPod {
			utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Pod: %#v", obj))
			return
		}
		glog.V(4).Infof("Enqueuing services of deleted pod %s/%s having final state unrecorded", pod.Namespace, pod.Name)
		e.addPod(pod)

	default:
		utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
	}
}
里面有几个struct,挺容易混淆的,简单用图表示下,方便比对:
通过对Endpoints Controller的源码分析,我们了解了其中很多细节,比如对Service和Pod事件处理逻辑、对孤立Pod的处理方法、Pod Labels变更带来的影响等等,这对我们通过Watch Endpoints去写自己的Ingress组件对接公司内部的路由组件时是有帮助的。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。