HAMi 的 scheduler 比较复杂,这里会用比较长的篇幅介绍。在介绍 HAMi 的 scheduler 前会提及一些其他的内容以便更好理解 scheduler 的逻辑。
K8s 的调度就是给 Pod 的 spec.nodeName 进行赋值,待调度 Pod 的这个值是空的。
K8s 有如下几种自定义调度逻辑的实现:
Scheduler Extender 需要外置调度器实现一系列 http 接口:
接口名 | 路径 | 方法 | 功能 | 输入 | 输出 | 必选/可选 |
|---|---|---|---|---|---|---|
Filter | /filter | POST | 过滤掉不符合条件的节点 | 包含 Pod 和节点列表的 JSON | 过滤后的节点列表和错误信息 | 必选 |
Prioritize | /prioritize | POST | 为节点打分 | 包含 Pod 和节点列表的 JSON | 每个节点的分数和错误信息 | 可选 |
Bind | /bind | POST | 将 Pod 绑定到指定节点 | 包含 Pod 和节点名的 JSON | 操作结果和错误信息 | 可选 |
Preempt | /preempt | POST | 抢占逻辑 | 包含 Pod 和候选节点列表的 JSON | 抢占结果和错误信息 | 可选 |
HAMi 的 scheduler 只实现了 Filter 和 Bind。
接口定义参见 K8s Extender

HAMi
├── cmd
│ ├── scheduler
│ ├── main.go
│ ├── metrics.go
├── pkg
│ ├── device
│ │ ├── ascend
│ │ │ ├── device_test.go
│ │ │ ├── device.go
│ │ │ ├── vnpu.go
│ │ ├── cambricon
│ │ │ ├── device_test.go
│ │ │ ├── device.go
│ │ ├── hygon
│ │ │ ├── device_test.go
│ │ │ ├── device.go
│ │ ├── iluvatar
│ │ │ ├── device_test.go
│ │ │ ├── device.go
│ │ ├── metax
│ │ │ ├── device_test.go
│ │ │ ├── device.go
│ │ ├── mthreads
│ │ │ ├── device_test.go
│ │ │ ├── device.go
│ │ ├── nvidia
│ │ │ ├── device_test.go
│ │ │ ├── device.go
│ │ ├── device_test.go
│ │ ├── device.go
│ ├── scheduler
│ ├── config
│ │ ├── config.go
│ ├── policy
│ │ ├── constant.go
│ │ ├── gpu_policy_test.go
│ │ ├── gpu_policy.go
│ │ ├── node_policy_test.go
│ │ ├── node_policy.go
│ ├── routes
│ │ ├── route.go
│ ├── event_test.go
│ ├── event.go
│ ├── nodes_test.go
│ ├── nodes.go
│ ├── pod_test.go
│ ├── pods.go
│ ├── scheduler_test.go
│ ├── scheduler.go
│ ├── score_test.go
│ ├── score.go
│ ├── webhook_test.go
│ ├── webhook.go本篇除了介绍以上代码外还会有用到 charts/hami 下的部署文件。
scheduler 的启动流程相对比较简单直观:
// cmd\scheduler\main.go
var (
sher *scheduler.Scheduler
tlsKeyFile string
tlsCertFile string
rootCmd = &cobra.Command{
Use: "scheduler",
Short: "kubernetes vgpu scheduler",
Run: func(cmd *cobra.Command, args []string) {
flag.PrintPFlags(cmd.Flags())
start()
},
}
)
func start() {
// 初始化 device,主要用于 webhook
device.InitDevices()
// 启动 scheduler
sher = scheduler.NewScheduler()
sher.Start()
defer sher.Stop()
// 不断获取 device plugin 添加到 Node 对象上的 Annotations,用来解析 GPU 信息
go sher.RegisterFromNodeAnnotations()
go initMetrics(config.MetricsBindAddress)
// 启动 http 服务
router := httprouter.New()
router.POST("/filter", routes.PredicateRoute(sher)) // 用于 Scheduler Extender
router.POST("/bind", routes.Bind(sher)) // 用于 Scheduler Extender
router.POST("/webhook", routes.WebHookRoute()) // 用于 Webhook
router.GET("/healthz", routes.HealthzRoute()) // 用于健康检查
klog.Info("listen on ", config.HTTPBind)
if len(tlsCertFile) == 0 || len(tlsKeyFile) == 0 {
if err := http.ListenAndServe(config.HTTPBind, router); err != nil {
klog.Fatal("Listen and Serve error, ", err)
}
} else {
if err := http.ListenAndServeTLS(config.HTTPBind, tlsCertFile, tlsKeyFile, router); err != nil {
klog.Fatal("Listen and Serve error, ", err)
}
}
}
func main() {
// 使用 cobra 框架启动进程
if err := rootCmd.Execute(); err != nil {
klog.Fatal(err)
}
}这里的 webhook 是一个 Mutating Webhook,也是服务于 scheduler的。其核心功能是:根据 Pod Resource 字段中的 ResourceName 判断该 Pod 是否使用了 HAMi vGPU,如果是则修改 Pod 的 SchedulerName 为 hami-scheduler,由 hami-scheduler 进行调度,否则不做处理。
注:Mutating Webhook 和 Validating Webhook 区别:
特性 | Mutating Webhook | Validating Webhook |
|---|---|---|
主要目的 | 修改对象 | 验证对象 |
执行顺序 | 先执行 | 后执行 |
可修改请求 | 是 | 否 |
典型应用 | 对象增强 | 对象验证 |
更多 webhook 知识还请自行了解。
这个对象是为了让 webhook 生效,这里可以展开 HAMi 的部署配置文件:
# charts\hami\templates\scheduler\webhook.yaml
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
name: {{ include "hami-vgpu.scheduler.webhook" . }}
webhooks:
- admissionReviewVersions:
- v1beta1
clientConfig:
{{- if .Values.scheduler.admissionWebhook.customURL.enabled }}
url: https://{{ .Values.scheduler.admissionWebhook.customURL.host}}:{{.Values.scheduler.admissionWebhook.customURL.port}}{{.Values.scheduler.admissionWebhook.customURL.path}}
{{- else }}
service:
name: {{ include "hami-vgpu.scheduler" . }}
namespace: {{ .Release.Namespace }}
path: /webhook
port: {{ .Values.scheduler.service.httpPort }}
{{- end }}
failurePolicy: {{ .Values.scheduler.admissionWebhook.failurePolicy }}
matchPolicy: Equivalent
name: vgpu.hami.io
namespaceSelector: # 如果 namespace 带上了hami.io/webhook=ignore 标签不走该 webhook 逻辑
matchExpressions:
- key: hami.io/webhook
operator: NotIn
values:
- ignore
{{- if .Values.scheduler.admissionWebhook.whitelistNamespaces }}
- key: kubernetes.io/metadata.name
operator: NotIn
values:
{{- toYaml .Values.scheduler.admissionWebhook.whitelistNamespaces | nindent 10 }}
{{- end }}
objectSelector: # 如果资源对象带上了hami.io/webhook=ignore 标签不走该 webhook 逻辑
matchExpressions:
- key: hami.io/webhook
operator: NotIn
values:
- ignore
reinvocationPolicy: {{ .Values.scheduler.admissionWebhook.reinvocationPolicy }}
rules:
- apiGroups:
- ""
apiVersions:
- v1
operations:
- CREATE # 这里表示关注的是 Pod 创建的事件
resources:
- pods
scope: '*'
sideEffects: None
timeoutSeconds: 10当满足条件的 Pod 被创建时,会触发 K8s 的 api-server 调用这个 webhook 接口。
webhook 使用了 sigs.k8s.io/controller-runtime/pkg/webhook/admission 的 webhook 框架,需要实现 admission.Handler 这个接口,接口请求会调用实现的 Handle 方法:
// pkg\scheduler\webhook.go
func (h *webhook) Handle(_ context.Context, req admission.Request) admission.Response {
pod := &corev1.Pod{}
err := h.decoder.Decode(req, pod)
if err != nil {
klog.Errorf("Failed to decode request: %v", err)
return admission.Errored(http.StatusBadRequest, err)
}
if len(pod.Spec.Containers) == 0 {
klog.Warningf(template+" - Denying admission as pod has no containers", req.Namespace, req.Name, req.UID)
return admission.Denied("pod has no containers")
}
klog.Infof(template, req.Namespace, req.Name, req.UID)
hasResource := false
for idx, ctr := range pod.Spec.Containers {
c := &pod.Spec.Containers[idx]
if ctr.SecurityContext != nil {
// 对于特权模式的 Pod,直接忽略
if ctr.SecurityContext.Privileged != nil && *ctr.SecurityContext.Privileged {
klog.Warningf(template+" - Denying admission as container %s is privileged", req.Namespace, req.Name, req.UID, c.Name)
continue
}
}
for _, val := range device.GetDevices() { // 这里的 device 信息就是在启动时初始化的
// 判断 Pod Resource 中是否有申请 HAMi 支持的 vGPU 资源,这个 MutateAdmission 方法,不同的显卡实现有差异,这里不一一展开看了
found, err := val.MutateAdmission(c, pod)
if err != nil {
klog.Errorf("validating pod failed:%s", err.Error())
return admission.Errored(http.StatusInternalServerError, err)
}
hasResource = hasResource || found
}
}
if !hasResource {
klog.Infof(template+" - Allowing admission for pod: no resource found", req.Namespace, req.Name, req.UID)
//return admission.Allowed("no resource found")
} else if len(config.SchedulerName) > 0 {
// 如果 Pod Resource 申请了 HAMi 支持的 vGPU 资源,则使用 hami-scheduler 进行调度,这里修改了 Pod 的 spec.schedulerName
pod.Spec.SchedulerName = config.SchedulerName
if pod.Spec.NodeName != "" { // 如果 Pod 的 nodeName 不为空,不能用 hami-scheduler 进行调度
klog.Infof(template+" - Pod already has node assigned", req.Namespace, req.Name, req.UID)
return admission.Denied("pod has node assigned")
}
}
marshaledPod, err := json.Marshal(pod)
if err != nil {
klog.Errorf(template+" - Failed to marshal pod, error: %v", req.Namespace, req.Name, req.UID, err)
return admission.Errored(http.StatusInternalServerError, err)
}
return admission.PatchResponseFromRaw(req.Object.Raw, marshaledPod)
}HAMi 的 scheduler 包含两个部分,scheduler 的 Pod 里面有两个容器:其中一个容器是使用 kube-scheduler 的镜像拉起的,另一个是 HAMi 实现的 Scheduler Extender 的 http 服务,通过 hami-scheduler 的部署文件可以看出来(由于文件比较长,这里会略过部分):
# charts\hami\templates\scheduler\deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ include "hami-vgpu.scheduler" . }}
...
spec:
replicas: 1
selector:
matchLabels:
app.kubernetes.io/component: hami-scheduler
{{- include "hami-vgpu.selectorLabels" . | nindent 6 }}
template:
metadata:
...
spec:
...
containers:
{{- if .Values.scheduler.kubeScheduler.enabled }}
- name: kube-scheduler # 这里用了 kube-scheduler
image: "{{ .Values.scheduler.kubeScheduler.image }}:{{ include "resolvedKubeSchedulerTag" . }}"
imagePullPolicy: {{ .Values.scheduler.kubeScheduler.imagePullPolicy | quote }}
command:
。。。
resources:
{{- toYaml .Values.scheduler.kubeScheduler.resources | nindent 12 }}
volumeMounts:
- name: scheduler-config
mountPath: /config
{{- end }}
...
- name: vgpu-scheduler-extender # 这个是 HAMi 自己实现的 extender
image: {{ .Values.scheduler.extender.image }}:{{ .Values.version }}
imagePullPolicy: {{ .Values.scheduler.extender.imagePullPolicy | quote }}
env:
...而 KubeSchedulerConfiguration 的配置文件如下:
# charts\hami\templates\scheduler\configmapnew.yaml
{{- if .Values.scheduler.kubeScheduler.enabled }}
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ include "hami-vgpu.scheduler" . }}-newversion
labels:
app.kubernetes.io/component: hami-scheduler
{{- include "hami-vgpu.labels" . | nindent 4 }}
data:
config.yaml: |
{{- if gt (regexReplaceAll "[^0-9]" .Capabilities.KubeVersion.Minor "" | int) 25}}
apiVersion: kubescheduler.config.k8s.io/v1
{{- else }}
apiVersion: kubescheduler.config.k8s.io/v1beta2
{{- end }}
kind: KubeSchedulerConfiguration
leaderElection:
leaderElect: false
profiles:
- schedulerName: {{ .Values.schedulerName }}
extenders: # 这里配置了 extender
- urlPrefix: "https://127.0.0.1:443" # 指定相同网络命名空间下的地址,即同一个 Pod 内
filterVerb: filter # filter 接口,指示了调度器会调用这个扩展器服务来过滤节点
bindVerb: bind # bind 接口,调度器扩展器可以执行绑定操作
nodeCacheCapable: true
weight: 1
httpTimeout: 30s
enableHTTPS: true
tlsConfig:
insecure: true
managedResources: # 这部分指定这个扩展调度器 hami-cheduler 管理的资源,只有 Pod Resource 中申请了 managedResources 中指定的资源时,Scheduler 才会请求我们配置的 Extender
- name: {{ .Values.resourceName }}
ignoredByScheduler: true # 这里的 true 表示当前调度器忽略这个资源,由 extender 进行调度
- name: {{ .Values.resourceMem }}
ignoredByScheduler: true
- name: {{ .Values.resourceCores }}
ignoredByScheduler: true
- name: {{ .Values.resourceMemPercentage }}
ignoredByScheduler: true
- name: {{ .Values.resourcePriority }}
ignoredByScheduler: true
- name: {{ .Values.mluResourceName }}
ignoredByScheduler: true
- name: {{ .Values.dcuResourceName }}
ignoredByScheduler: true
- name: {{ .Values.dcuResourceMem }}
ignoredByScheduler: true
- name: {{ .Values.dcuResourceCores }}
ignoredByScheduler: true
- name: {{ .Values.iluvatarResourceName }}
ignoredByScheduler: true
{{- if .Values.devices.ascend.enabled }}
{{- range .Values.devices.ascend.customresources }}
- name: {{ . }}
ignoredByScheduler: true
{{- end }}
{{- end }}
{{- if .Values.devices.mthreads.enabled }}
{{- range .Values.devices.mthreads.customresources }}
- name: {{ . }}
ignoredByScheduler: true
{{- end }}
{{- end }}
{{- end }}// pkg\scheduler\scheduler.go
func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFilterResult, error) {
klog.InfoS("Starting schedule filter process", "pod", args.Pod.Name, "uuid", args.Pod.UID, "namespace", args.Pod.Namespace)
nums := k8sutil.Resourcereqs(args.Pod)
total := 0
for _, n := range nums {
for _, k := range n {
total += int(k.Nums)
}
}
// 这里是统计到 Pod 申请的特殊资源为0,所以返回全部 Node 都可以调度
if total == 0 {
klog.V(1).InfoS("Pod does not request any resources",
"pod", args.Pod.Name)
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("does not request any resource"))
return &extenderv1.ExtenderFilterResult{
NodeNames: args.NodeNames,
FailedNodes: nil,
Error: "",
}, nil
}
annos := args.Pod.Annotations
s.delPod(args.Pod)
// 返回所有 Node 的内存使用情况
nodeUsage, failedNodes, err := s.getNodesUsage(args.NodeNames, args.Pod)
if err != nil {
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
return nil, err
}
if len(failedNodes) != 0 {
klog.V(5).InfoS("Nodes failed during usage retrieval",
"nodes", failedNodes)
}
// 给可用的 Node 进行打分,具体打分逻辑则是根据每个节点上的已经使用的 GPU Core、GPU Memory 资源和总的GPU Core、GPU Memory 的比值,根据权重归一化处理后得到最终的得分。除了计算得分还要判断节点剩余资源是否能满足当前 Pod,如果不满足则直接忽略掉,所以这里的 nodeScores 可能为空
nodeScores, err := s.calcScore(nodeUsage, nums, annos, args.Pod, failedNodes)
if err != nil {
err := fmt.Errorf("calcScore failed %v for pod %v", err, args.Pod.Name)
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
return nil, err
}
// 如果打分后的 node 为空返回失败
if len((*nodeScores).NodeList) == 0 {
klog.V(4).InfoS("No available nodes meet the required scores",
"pod", args.Pod.Name)
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("no available node, all node scores do not meet"))
return &extenderv1.ExtenderFilterResult{
FailedNodes: failedNodes,
}, nil
}
klog.V(4).Infoln("nodeScores_len=", len((*nodeScores).NodeList))
// 根据打分进行升序排序
sort.Sort(nodeScores)
// 直接选择最高分的 node,因为这里的 extender 没有实现 Prioritize,这里的 filter 集成了 Filter 和 Prioritize
m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1]
klog.InfoS("Scheduling pod to node",
"podNamespace", args.Pod.Namespace,
"podName", args.Pod.Name,
"nodeID", m.NodeID,
"devices", m.Devices)
annotations := make(map[string]string)
annotations[util.AssignedNodeAnnotations] = m.NodeID
annotations[util.AssignedTimeAnnotations] = strconv.FormatInt(time.Now().Unix(), 10)
for _, val := range device.GetDevices() {
val.PatchAnnotations(&annotations, m.Devices)
}
//InRequestDevices := util.EncodePodDevices(util.InRequestDevices, m.devices)
//supportDevices := util.EncodePodDevices(util.SupportDevices, m.devices)
//maps.Copy(annotations, InRequestDevices)
//maps.Copy(annotations, supportDevices)
s.addPod(args.Pod, m.NodeID, m.Devices)
// 更新 Pod 的 annotation
err = util.PatchPodAnnotations(args.Pod, annotations)
if err != nil {
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
s.delPod(args.Pod)
return nil, err
}
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringSucceed, []string{m.NodeID}, nil)
// 返回结果
res := extenderv1.ExtenderFilterResult{NodeNames: &[]string{m.NodeID}}
return &res, nil
}bind 就是根据前面 filter 的结果,将 Pod 和 Node 进行绑定:
// pkg\scheduler\scheduler.go
func (s *Scheduler) Bind(args extenderv1.ExtenderBindingArgs) (*extenderv1.ExtenderBindingResult, error) {
klog.InfoS("Attempting to bind pod to node", "pod", args.PodName, "namespace", args.PodNamespace, "node", args.Node)
var res *extenderv1.ExtenderBindingResult
binding := &corev1.Binding{
ObjectMeta: metav1.ObjectMeta{Name: args.PodName, UID: args.PodUID},
Target: corev1.ObjectReference{Kind: "Node", Name: args.Node},
}
current, err := s.kubeClient.CoreV1().Pods(args.PodNamespace).Get(context.Background(), args.PodName, metav1.GetOptions{})
if err != nil {
klog.ErrorS(err, "Failed to get pod", "pod", args.PodName, "namespace", args.PodNamespace)
return &extenderv1.ExtenderBindingResult{Error: err.Error()}, err
}
klog.InfoS("Trying to get the target node for pod", "pod", args.PodName, "namespace", args.PodNamespace, "node", args.Node)
node, err := s.kubeClient.CoreV1().Nodes().Get(context.Background(), args.Node, metav1.GetOptions{})
if err != nil {
klog.ErrorS(err, "Failed to get node", "node", args.Node)
s.recordScheduleBindingResultEvent(current, EventReasonBindingFailed, []string{}, fmt.Errorf("failed to get node %s", args.Node))
res = &extenderv1.ExtenderBindingResult{Error: err.Error()}
return res, nil
}
tmppatch := map[string]string{
util.DeviceBindPhase: "allocating",
util.BindTimeAnnotations: strconv.FormatInt(time.Now().Unix(), 10),
}
for _, val := range device.GetDevices() {
err = val.LockNode(node, current)
if err != nil {
klog.ErrorS(err, "Failed to lock node", "node", args.Node, "device", val)
goto ReleaseNodeLocks
}
}
err = util.PatchPodAnnotations(current, tmppatch)
if err != nil {
klog.ErrorS(err, "Failed to patch pod annotations", "pod", klog.KObj(current))
return &extenderv1.ExtenderBindingResult{Error: err.Error()}, err
}
// 在这里调用 apiserver 的接口创建一个 binding 对象,将 Pod 调度到指定 Node
err = s.kubeClient.CoreV1().Pods(args.PodNamespace).Bind(context.Background(), binding, metav1.CreateOptions{})
if err != nil {
klog.ErrorS(err, "Failed to bind pod", "pod", args.PodName, "namespace", args.PodNamespace, "node", args.Node)
goto ReleaseNodeLocks
}
s.recordScheduleBindingResultEvent(current, EventReasonBindingSucceed, []string{args.Node}, nil)
klog.InfoS("Successfully bound pod to node", "pod", args.PodName, "namespace", args.PodNamespace, "node", args.Node)
return &extenderv1.ExtenderBindingResult{Error: ""}, nil
// 绑定失败的话需要释放锁定的 Node 上的资源
ReleaseNodeLocks:
klog.InfoS("Release node locks", "node", args.Node)
for _, val := range device.GetDevices() {
val.ReleaseNodeLock(node, current)
}
s.recordScheduleBindingResultEvent(current, EventReasonBindingFailed, []string{}, err)
return &extenderv1.ExtenderBindingResult{Error: err.Error()}, nil
}从第2章节的图中,可以看出 hami-scheduler 会动态的从 kube-apiserver 获取 Node、Pod 的 annotation 并从中解析出 GPU 资源和使用情况,可以通过 scheduler 的 Start 方法看出:
// pkg\scheduler\scheduler.go
func (s *Scheduler) Start() {
klog.InfoS("Starting HAMi scheduler components")
s.kubeClient = client.GetClient()
// informers 提供了一种高效、可靠的方式来监听和响应 K8s 集群资源变化
informerFactory := informers.NewSharedInformerFactoryWithOptions(s.kubeClient, time.Hour*1)
s.podLister = informerFactory.Core().V1().Pods().Lister()
s.nodeLister = informerFactory.Core().V1().Nodes().Lister()
informer := informerFactory.Core().V1().Pods().Informer()
// 处理 Pod 的 Add/Del/Update 事件
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: s.onAddPod,
UpdateFunc: s.onUpdatePod,
DeleteFunc: s.onDelPod,
})
// 处理 Node 的 Add/Del/Update 事件
informerFactory.Core().V1().Nodes().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: s.onAddNode,
UpdateFunc: s.onUpdateNode,
DeleteFunc: s.onDelNode,
})
informerFactory.Start(s.stopCh)
informerFactory.WaitForCacheSync(s.stopCh)
s.addAllEventHandlers()
}Node 变化处理:
// pkg\scheduler\scheduler.go
// 这里都是给 nodeNotify channel 发送一个消息,会触发 RegisterFromNodeAnnotations 的循环处理
func (s *Scheduler) onUpdateNode(_, newObj interface{}) {
s.nodeNotify <- struct{}{}
}
func (s *Scheduler) onDelNode(obj interface{}) {
s.nodeNotify <- struct{}{}
}
func (s *Scheduler) onAddNode(obj interface{}) {
s.nodeNotify <- struct{}{}
}Pod 变化处理:
// pkg\scheduler\scheduler.go
func (s *Scheduler) onAddPod(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if !ok {
klog.ErrorS(fmt.Errorf("invalid pod object"), "Failed to process pod addition")
return
}
klog.V(5).InfoS("Pod added", "pod", pod.Name, "namespace", pod.Namespace)
// 对于没有 hami.io/vgpu-node annoations 的 Pod,不予处理
nodeID, ok := pod.Annotations[util.AssignedNodeAnnotations]
if !ok {
return
}
// 待销毁的 Pod 也不处理
if k8sutil.IsPodInTerminatedState(pod) {
s.delPod(pod)
return
}
podDev, _ := util.DecodePodDevices(util.SupportDevices, pod.Annotations)
// 记录 Pod 占用的资源信息
s.addPod(pod, nodeID, podDev)
}
func (s *Scheduler) onUpdatePod(_, newObj interface{}) {
s.onAddPod(newObj)
}
func (s *Scheduler) onDelPod(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if !ok {
klog.Errorf("unknown add object type")
return
}
_, ok = pod.Annotations[util.AssignedNodeAnnotations]
if !ok {
return
}
// 移除 Pod 占用的资源
s.delPod(pod)
}原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。