首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >HAMi源码解析——scheduler

HAMi源码解析——scheduler

原创
作者头像
DifficultWork
发布2025-07-04 15:57:54
发布2025-07-04 15:57:54
3510
举报
文章被收录于专栏:阶梯计划阶梯计划

1 背景

HAMi 的 scheduler 比较复杂,这里会用比较长的篇幅介绍。在介绍 HAMi 的 scheduler 前会提及一些其他的内容以便更好理解 scheduler 的逻辑。

1.1 K8s 的自定义调度逻辑

K8s 的调度就是给 Pod 的 spec.nodeName 进行赋值,待调度 Pod 的这个值是空的。

K8s 有如下几种自定义调度逻辑的实现:

  • 使用插件化的调度框架(Scheduling Framework):K8s有 scheduler-plugin 模板,其原理就是向现有的调度器中添加了一组插件化的 API,该 API 在保持调度程序“核心”简单且易于维护的同时,使得大部分的调度功能以插件的形式存在。
  • 原有调度器扩展(Scheduler Extender):将一个实现对应接口的 http 服务作为外置调度器使用。通过配置 KubeSchedulerConfiguration 原 scheduler 会以 http 调用方式和外置调度器交互,实现在不改动原有调度器基础上增加自定义逻辑。HAMi 的 scheduler 就是基于此实现的。
  • 其他方案:通过自定义 webhook 直接修改未调度 Pod 的 spec.nodeName 理论上也可以实现自定义调度。

1.2 Scheduler Extender

Scheduler Extender 需要外置调度器实现一系列 http 接口:

接口名

路径

方法

功能

输入

输出

必选/可选

Filter

/filter

POST

过滤掉不符合条件的节点

包含 Pod 和节点列表的 JSON

过滤后的节点列表和错误信息

必选

Prioritize

/prioritize

POST

为节点打分

包含 Pod 和节点列表的 JSON

每个节点的分数和错误信息

可选

Bind

/bind

POST

将 Pod 绑定到指定节点

包含 Pod 和节点名的 JSON

操作结果和错误信息

可选

Preempt

/preempt

POST

抢占逻辑

包含 Pod 和候选节点列表的 JSON

抢占结果和错误信息

可选

HAMi 的 scheduler 只实现了 Filter 和 Bind。

接口定义参见 K8s Extender

2 HAMi webhook、scheduler、device-plugin 间的关系和工作流程 {#section2}

3 HAMi scheduler 源码分析

3.1 代码结构

代码语言:bash
复制
HAMi
├── cmd
│   ├── scheduler
│       ├── main.go
│       ├── metrics.go
├── pkg
│   ├── device
│   │   ├── ascend
│   │   │   ├── device_test.go
│   │   │   ├── device.go
│   │   │   ├── vnpu.go
│   │   ├── cambricon
│   │   │   ├── device_test.go
│   │   │   ├── device.go
│   │   ├── hygon
│   │   │   ├── device_test.go
│   │   │   ├── device.go
│   │   ├── iluvatar
│   │   │   ├── device_test.go
│   │   │   ├── device.go
│   │   ├── metax
│   │   │   ├── device_test.go
│   │   │   ├── device.go
│   │   ├── mthreads
│   │   │   ├── device_test.go
│   │   │   ├── device.go
│   │   ├── nvidia
│   │   │   ├── device_test.go
│   │   │   ├── device.go
│   │   ├── device_test.go
│   │   ├── device.go
│   ├── scheduler
│       ├── config
│       │   ├── config.go
│       ├── policy
│       │   ├── constant.go
│       │   ├── gpu_policy_test.go
│       │   ├── gpu_policy.go
│       │   ├── node_policy_test.go
│       │   ├── node_policy.go
│       ├── routes
│       │   ├── route.go
│       ├── event_test.go
│       ├── event.go
│       ├── nodes_test.go
│       ├── nodes.go
│       ├── pod_test.go
│       ├── pods.go
│       ├── scheduler_test.go
│       ├── scheduler.go
│       ├── score_test.go
│       ├── score.go
│       ├── webhook_test.go
│       ├── webhook.go

本篇除了介绍以上代码外还会有用到 charts/hami 下的部署文件。

3.2 scheduler 启动流程

scheduler 的启动流程相对比较简单直观:

代码语言:go
复制
// cmd\scheduler\main.go
var (
	sher        *scheduler.Scheduler
	tlsKeyFile  string
	tlsCertFile string
	rootCmd     = &cobra.Command{
		Use:   "scheduler",
		Short: "kubernetes vgpu scheduler",
		Run: func(cmd *cobra.Command, args []string) {
			flag.PrintPFlags(cmd.Flags())
			start()
		},
	}
)

func start() {
    // 初始化 device,主要用于 webhook
	device.InitDevices()

    // 启动 scheduler
	sher = scheduler.NewScheduler()
	sher.Start()
	defer sher.Stop()

    // 不断获取 device plugin 添加到 Node 对象上的 Annotations,用来解析 GPU 信息
	go sher.RegisterFromNodeAnnotations()
	go initMetrics(config.MetricsBindAddress)

    // 启动 http 服务
	router := httprouter.New()
	router.POST("/filter", routes.PredicateRoute(sher)) // 用于 Scheduler Extender
	router.POST("/bind", routes.Bind(sher)) // 用于 Scheduler Extender
	router.POST("/webhook", routes.WebHookRoute()) // 用于 Webhook
	router.GET("/healthz", routes.HealthzRoute()) // 用于健康检查
	klog.Info("listen on ", config.HTTPBind)
	if len(tlsCertFile) == 0 || len(tlsKeyFile) == 0 {
		if err := http.ListenAndServe(config.HTTPBind, router); err != nil {
			klog.Fatal("Listen and Serve error, ", err)
		}
	} else {
		if err := http.ListenAndServeTLS(config.HTTPBind, tlsCertFile, tlsKeyFile, router); err != nil {
			klog.Fatal("Listen and Serve error, ", err)
		}
	}
}

func main() {
    // 使用 cobra 框架启动进程
	if err := rootCmd.Execute(); err != nil {
		klog.Fatal(err)
	}
}

3.3 webhook 实现

这里的 webhook 是一个 Mutating Webhook,也是服务于 scheduler的。其核心功能是:根据 Pod Resource 字段中的 ResourceName 判断该 Pod 是否使用了 HAMi vGPU,如果是则修改 Pod 的 SchedulerName 为 hami-scheduler,由 hami-scheduler 进行调度,否则不做处理。

注:Mutating Webhook 和 Validating Webhook 区别:

特性

Mutating Webhook

Validating Webhook

主要目的

修改对象

验证对象

执行顺序

先执行

后执行

可修改请求

典型应用

对象增强

对象验证

更多 webhook 知识还请自行了解。

3.3.1 MutatingWebhookConfiguration 对象

这个对象是为了让 webhook 生效,这里可以展开 HAMi 的部署配置文件:

代码语言:yaml
复制
# charts\hami\templates\scheduler\webhook.yaml
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
  name: {{ include "hami-vgpu.scheduler.webhook" . }}
webhooks:
  - admissionReviewVersions:
    - v1beta1
    clientConfig:
      {{- if .Values.scheduler.admissionWebhook.customURL.enabled }}
      url: https://{{ .Values.scheduler.admissionWebhook.customURL.host}}:{{.Values.scheduler.admissionWebhook.customURL.port}}{{.Values.scheduler.admissionWebhook.customURL.path}}
      {{- else }}
      service:
        name: {{ include "hami-vgpu.scheduler" . }}
        namespace: {{ .Release.Namespace }}
        path: /webhook
        port: {{ .Values.scheduler.service.httpPort }}
      {{- end }}
    failurePolicy: {{ .Values.scheduler.admissionWebhook.failurePolicy }}
    matchPolicy: Equivalent
    name: vgpu.hami.io
    namespaceSelector:   # 如果 namespace 带上了hami.io/webhook=ignore 标签不走该 webhook 逻辑
      matchExpressions:
      - key: hami.io/webhook
        operator: NotIn
        values:
        - ignore
      {{- if .Values.scheduler.admissionWebhook.whitelistNamespaces }}
      - key: kubernetes.io/metadata.name
        operator: NotIn
        values:
        {{- toYaml .Values.scheduler.admissionWebhook.whitelistNamespaces | nindent 10 }}
      {{- end }}
    objectSelector:  # 如果资源对象带上了hami.io/webhook=ignore 标签不走该 webhook 逻辑
      matchExpressions:
      - key: hami.io/webhook
        operator: NotIn
        values:
        - ignore
    reinvocationPolicy: {{ .Values.scheduler.admissionWebhook.reinvocationPolicy }}
    rules:
      - apiGroups:
          - ""
        apiVersions:
          - v1
        operations:
          - CREATE     # 这里表示关注的是 Pod 创建的事件
        resources:
          - pods
        scope: '*'
    sideEffects: None
    timeoutSeconds: 10

当满足条件的 Pod 被创建时,会触发 K8s 的 api-server 调用这个 webhook 接口。

3.3.2 代码实现

webhook 使用了 sigs.k8s.io/controller-runtime/pkg/webhook/admission 的 webhook 框架,需要实现 admission.Handler 这个接口,接口请求会调用实现的 Handle 方法:

代码语言:go
复制
// pkg\scheduler\webhook.go
func (h *webhook) Handle(_ context.Context, req admission.Request) admission.Response {
	pod := &corev1.Pod{}
	err := h.decoder.Decode(req, pod)
	if err != nil {
		klog.Errorf("Failed to decode request: %v", err)
		return admission.Errored(http.StatusBadRequest, err)
	}
	if len(pod.Spec.Containers) == 0 {
		klog.Warningf(template+" - Denying admission as pod has no containers", req.Namespace, req.Name, req.UID)
		return admission.Denied("pod has no containers")
	}
	klog.Infof(template, req.Namespace, req.Name, req.UID)
	hasResource := false
	for idx, ctr := range pod.Spec.Containers {
		c := &pod.Spec.Containers[idx]
		if ctr.SecurityContext != nil {
            // 对于特权模式的 Pod,直接忽略
			if ctr.SecurityContext.Privileged != nil && *ctr.SecurityContext.Privileged {
				klog.Warningf(template+" - Denying admission as container %s is privileged", req.Namespace, req.Name, req.UID, c.Name)
				continue
			}
		}
		for _, val := range device.GetDevices() { // 这里的 device 信息就是在启动时初始化的
            // 判断 Pod Resource 中是否有申请 HAMi 支持的 vGPU 资源,这个 MutateAdmission 方法,不同的显卡实现有差异,这里不一一展开看了
			found, err := val.MutateAdmission(c, pod)
			if err != nil {
				klog.Errorf("validating pod failed:%s", err.Error())
				return admission.Errored(http.StatusInternalServerError, err)
			}
			hasResource = hasResource || found
		}
	}

	if !hasResource {
		klog.Infof(template+" - Allowing admission for pod: no resource found", req.Namespace, req.Name, req.UID)
		//return admission.Allowed("no resource found")
	} else if len(config.SchedulerName) > 0 {
        // 如果 Pod Resource 申请了 HAMi 支持的 vGPU 资源,则使用 hami-scheduler 进行调度,这里修改了 Pod 的 spec.schedulerName
		pod.Spec.SchedulerName = config.SchedulerName
		if pod.Spec.NodeName != "" { // 如果 Pod 的 nodeName 不为空,不能用 hami-scheduler 进行调度
			klog.Infof(template+" - Pod already has node assigned", req.Namespace, req.Name, req.UID)
			return admission.Denied("pod has node assigned")
		}
	}
	marshaledPod, err := json.Marshal(pod)
	if err != nil {
		klog.Errorf(template+" - Failed to marshal pod, error: %v", req.Namespace, req.Name, req.UID, err)
		return admission.Errored(http.StatusInternalServerError, err)
	}
	return admission.PatchResponseFromRaw(req.Object.Raw, marshaledPod)
}

3.4 scheduler 实现

HAMi 的 scheduler 包含两个部分,scheduler 的 Pod 里面有两个容器:其中一个容器是使用 kube-scheduler 的镜像拉起的,另一个是 HAMi 实现的 Scheduler Extender 的 http 服务,通过 hami-scheduler 的部署文件可以看出来(由于文件比较长,这里会略过部分):

代码语言:yaml
复制
# charts\hami\templates\scheduler\deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ include "hami-vgpu.scheduler" . }}
  ...
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/component: hami-scheduler
      {{- include "hami-vgpu.selectorLabels" . | nindent 6 }}
  template:
    metadata:
      ...
    spec:
      ...
      containers:
      {{- if .Values.scheduler.kubeScheduler.enabled }}
        - name: kube-scheduler    # 这里用了 kube-scheduler
          image: "{{ .Values.scheduler.kubeScheduler.image }}:{{ include "resolvedKubeSchedulerTag" . }}"
          imagePullPolicy: {{ .Values.scheduler.kubeScheduler.imagePullPolicy | quote }}
          command:
            。。。
          resources:
          {{- toYaml .Values.scheduler.kubeScheduler.resources | nindent 12 }}
          volumeMounts:
            - name: scheduler-config
              mountPath: /config
        {{- end }}
          ...
        - name: vgpu-scheduler-extender # 这个是 HAMi 自己实现的 extender
          image: {{ .Values.scheduler.extender.image }}:{{ .Values.version }}
          imagePullPolicy: {{ .Values.scheduler.extender.imagePullPolicy | quote }}
          env:
          ...

而 KubeSchedulerConfiguration 的配置文件如下:

代码语言:yaml
复制
# charts\hami\templates\scheduler\configmapnew.yaml
{{- if .Values.scheduler.kubeScheduler.enabled }}
apiVersion: v1
kind: ConfigMap
metadata:
  name: {{ include "hami-vgpu.scheduler" . }}-newversion
  labels:
    app.kubernetes.io/component: hami-scheduler
    {{- include "hami-vgpu.labels" . | nindent 4 }}
data:
  config.yaml: |
    {{- if gt (regexReplaceAll "[^0-9]" .Capabilities.KubeVersion.Minor "" | int) 25}}
    apiVersion: kubescheduler.config.k8s.io/v1
    {{- else }}
    apiVersion: kubescheduler.config.k8s.io/v1beta2
    {{- end }}
    kind: KubeSchedulerConfiguration
    leaderElection:
      leaderElect: false
    profiles:
    - schedulerName: {{ .Values.schedulerName }}
    extenders:    # 这里配置了 extender
    - urlPrefix: "https://127.0.0.1:443" # 指定相同网络命名空间下的地址,即同一个 Pod 内
      filterVerb: filter # filter 接口,指示了调度器会调用这个扩展器服务来过滤节点
      bindVerb: bind # bind 接口,调度器扩展器可以执行绑定操作
      nodeCacheCapable: true
      weight: 1
      httpTimeout: 30s
      enableHTTPS: true
      tlsConfig:
        insecure: true
      managedResources: # 这部分指定这个扩展调度器 hami-cheduler 管理的资源,只有 Pod Resource 中申请了 managedResources 中指定的资源时,Scheduler 才会请求我们配置的 Extender
      - name: {{ .Values.resourceName }}
        ignoredByScheduler: true   # 这里的 true 表示当前调度器忽略这个资源,由 extender 进行调度
      - name: {{ .Values.resourceMem }}
        ignoredByScheduler: true
      - name: {{ .Values.resourceCores }}
        ignoredByScheduler: true
      - name: {{ .Values.resourceMemPercentage }}
        ignoredByScheduler: true
      - name: {{ .Values.resourcePriority }}
        ignoredByScheduler: true
      - name: {{ .Values.mluResourceName }}
        ignoredByScheduler: true
      - name: {{ .Values.dcuResourceName }}
        ignoredByScheduler: true
      - name: {{ .Values.dcuResourceMem }}
        ignoredByScheduler: true
      - name: {{ .Values.dcuResourceCores }}
        ignoredByScheduler: true
      - name: {{ .Values.iluvatarResourceName }}
        ignoredByScheduler: true
      {{- if .Values.devices.ascend.enabled }}
      {{- range .Values.devices.ascend.customresources }}
      - name: {{ . }}
        ignoredByScheduler: true
      {{- end }}
      {{- end }}
      {{- if .Values.devices.mthreads.enabled }}
      {{- range .Values.devices.mthreads.customresources }}
      - name: {{ . }}
        ignoredByScheduler: true
      {{- end }}
      {{- end }}
{{- end }}

3.4.1 filter 实现

代码语言:go
复制
// pkg\scheduler\scheduler.go
func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFilterResult, error) {
	klog.InfoS("Starting schedule filter process", "pod", args.Pod.Name, "uuid", args.Pod.UID, "namespace", args.Pod.Namespace)
	nums := k8sutil.Resourcereqs(args.Pod)
	total := 0
	for _, n := range nums {
		for _, k := range n {
			total += int(k.Nums)
		}
	}
    // 这里是统计到 Pod 申请的特殊资源为0,所以返回全部 Node 都可以调度 
	if total == 0 {
		klog.V(1).InfoS("Pod does not request any resources",
			"pod", args.Pod.Name)
		s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("does not request any resource"))
		return &extenderv1.ExtenderFilterResult{
			NodeNames:   args.NodeNames,
			FailedNodes: nil,
			Error:       "",
		}, nil
	}
	annos := args.Pod.Annotations
	s.delPod(args.Pod)
    // 返回所有 Node 的内存使用情况
	nodeUsage, failedNodes, err := s.getNodesUsage(args.NodeNames, args.Pod)
	if err != nil {
		s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
		return nil, err
	}
	if len(failedNodes) != 0 {
		klog.V(5).InfoS("Nodes failed during usage retrieval",
			"nodes", failedNodes)
	}
    // 给可用的 Node 进行打分,具体打分逻辑则是根据每个节点上的已经使用的 GPU Core、GPU Memory 资源和总的GPU Core、GPU Memory 的比值,根据权重归一化处理后得到最终的得分。除了计算得分还要判断节点剩余资源是否能满足当前 Pod,如果不满足则直接忽略掉,所以这里的 nodeScores 可能为空
	nodeScores, err := s.calcScore(nodeUsage, nums, annos, args.Pod, failedNodes)
	if err != nil {
		err := fmt.Errorf("calcScore failed %v for pod %v", err, args.Pod.Name)
		s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
		return nil, err
	}
    // 如果打分后的 node 为空返回失败
	if len((*nodeScores).NodeList) == 0 {
		klog.V(4).InfoS("No available nodes meet the required scores",
			"pod", args.Pod.Name)
		s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("no available node, all node scores do not meet"))
		return &extenderv1.ExtenderFilterResult{
			FailedNodes: failedNodes,
		}, nil
	}
	klog.V(4).Infoln("nodeScores_len=", len((*nodeScores).NodeList))
    // 根据打分进行升序排序
	sort.Sort(nodeScores)
    // 直接选择最高分的 node,因为这里的 extender 没有实现 Prioritize,这里的 filter 集成了 Filter 和 Prioritize
	m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1]
	klog.InfoS("Scheduling pod to node",
		"podNamespace", args.Pod.Namespace,
		"podName", args.Pod.Name,
		"nodeID", m.NodeID,
		"devices", m.Devices)
	annotations := make(map[string]string)
	annotations[util.AssignedNodeAnnotations] = m.NodeID
	annotations[util.AssignedTimeAnnotations] = strconv.FormatInt(time.Now().Unix(), 10)

	for _, val := range device.GetDevices() {
		val.PatchAnnotations(&annotations, m.Devices)
	}

	//InRequestDevices := util.EncodePodDevices(util.InRequestDevices, m.devices)
	//supportDevices := util.EncodePodDevices(util.SupportDevices, m.devices)
	//maps.Copy(annotations, InRequestDevices)
	//maps.Copy(annotations, supportDevices)
	s.addPod(args.Pod, m.NodeID, m.Devices)
    // 更新 Pod 的 annotation
	err = util.PatchPodAnnotations(args.Pod, annotations)
	if err != nil {
		s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
		s.delPod(args.Pod)
		return nil, err
	}
	s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringSucceed, []string{m.NodeID}, nil)
    // 返回结果
	res := extenderv1.ExtenderFilterResult{NodeNames: &[]string{m.NodeID}}
	return &res, nil
}

3.4.2 bind 实现

bind 就是根据前面 filter 的结果,将 Pod 和 Node 进行绑定:

代码语言:go
复制
// pkg\scheduler\scheduler.go
func (s *Scheduler) Bind(args extenderv1.ExtenderBindingArgs) (*extenderv1.ExtenderBindingResult, error) {
	klog.InfoS("Attempting to bind pod to node", "pod", args.PodName, "namespace", args.PodNamespace, "node", args.Node)
	var res *extenderv1.ExtenderBindingResult

	binding := &corev1.Binding{
		ObjectMeta: metav1.ObjectMeta{Name: args.PodName, UID: args.PodUID},
		Target:     corev1.ObjectReference{Kind: "Node", Name: args.Node},
	}
	current, err := s.kubeClient.CoreV1().Pods(args.PodNamespace).Get(context.Background(), args.PodName, metav1.GetOptions{})
	if err != nil {
		klog.ErrorS(err, "Failed to get pod", "pod", args.PodName, "namespace", args.PodNamespace)
		return &extenderv1.ExtenderBindingResult{Error: err.Error()}, err
	}
	klog.InfoS("Trying to get the target node for pod", "pod", args.PodName, "namespace", args.PodNamespace, "node", args.Node)
	node, err := s.kubeClient.CoreV1().Nodes().Get(context.Background(), args.Node, metav1.GetOptions{})
	if err != nil {
		klog.ErrorS(err, "Failed to get node", "node", args.Node)
		s.recordScheduleBindingResultEvent(current, EventReasonBindingFailed, []string{}, fmt.Errorf("failed to get node %s", args.Node))
		res = &extenderv1.ExtenderBindingResult{Error: err.Error()}
		return res, nil
	}

	tmppatch := map[string]string{
		util.DeviceBindPhase:     "allocating",
		util.BindTimeAnnotations: strconv.FormatInt(time.Now().Unix(), 10),
	}

	for _, val := range device.GetDevices() {
		err = val.LockNode(node, current)
		if err != nil {
			klog.ErrorS(err, "Failed to lock node", "node", args.Node, "device", val)
			goto ReleaseNodeLocks
		}
	}

	err = util.PatchPodAnnotations(current, tmppatch)
	if err != nil {
		klog.ErrorS(err, "Failed to patch pod annotations", "pod", klog.KObj(current))
		return &extenderv1.ExtenderBindingResult{Error: err.Error()}, err
	}
    // 在这里调用 apiserver 的接口创建一个 binding 对象,将 Pod 调度到指定 Node
	err = s.kubeClient.CoreV1().Pods(args.PodNamespace).Bind(context.Background(), binding, metav1.CreateOptions{})
	if err != nil {
		klog.ErrorS(err, "Failed to bind pod", "pod", args.PodName, "namespace", args.PodNamespace, "node", args.Node)
		goto ReleaseNodeLocks
	}

	s.recordScheduleBindingResultEvent(current, EventReasonBindingSucceed, []string{args.Node}, nil)
	klog.InfoS("Successfully bound pod to node", "pod", args.PodName, "namespace", args.PodNamespace, "node", args.Node)
	return &extenderv1.ExtenderBindingResult{Error: ""}, nil
// 绑定失败的话需要释放锁定的 Node 上的资源
ReleaseNodeLocks:
	klog.InfoS("Release node locks", "node", args.Node)
	for _, val := range device.GetDevices() {
		val.ReleaseNodeLock(node, current)
	}
	s.recordScheduleBindingResultEvent(current, EventReasonBindingFailed, []string{}, err)
	return &extenderv1.ExtenderBindingResult{Error: err.Error()}, nil
}

3.4.3 动态获取资源信息和使用情况

从第2章节的图中,可以看出 hami-scheduler 会动态的从 kube-apiserver 获取 Node、Pod 的 annotation 并从中解析出 GPU 资源和使用情况,可以通过 scheduler 的 Start 方法看出:

代码语言:go
复制
// pkg\scheduler\scheduler.go
func (s *Scheduler) Start() {
	klog.InfoS("Starting HAMi scheduler components")
	s.kubeClient = client.GetClient()
    // informers 提供了一种高效、可靠的方式来监听和响应 K8s 集群资源变化
	informerFactory := informers.NewSharedInformerFactoryWithOptions(s.kubeClient, time.Hour*1)
	s.podLister = informerFactory.Core().V1().Pods().Lister()
	s.nodeLister = informerFactory.Core().V1().Nodes().Lister()

	informer := informerFactory.Core().V1().Pods().Informer()
    // 处理 Pod 的 Add/Del/Update 事件
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    s.onAddPod,
		UpdateFunc: s.onUpdatePod,
		DeleteFunc: s.onDelPod,
	})
    // 处理 Node 的 Add/Del/Update 事件
	informerFactory.Core().V1().Nodes().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    s.onAddNode,
		UpdateFunc: s.onUpdateNode,
		DeleteFunc: s.onDelNode,
	})
	informerFactory.Start(s.stopCh)
	informerFactory.WaitForCacheSync(s.stopCh)
	s.addAllEventHandlers()
}

Node 变化处理:

代码语言:go
复制
// pkg\scheduler\scheduler.go
// 这里都是给 nodeNotify channel 发送一个消息,会触发 RegisterFromNodeAnnotations 的循环处理
func (s *Scheduler) onUpdateNode(_, newObj interface{}) {
	s.nodeNotify <- struct{}{}
}

func (s *Scheduler) onDelNode(obj interface{}) {
	s.nodeNotify <- struct{}{}
}

func (s *Scheduler) onAddNode(obj interface{}) {
	s.nodeNotify <- struct{}{}
}

Pod 变化处理:

代码语言:go
复制
// pkg\scheduler\scheduler.go
func (s *Scheduler) onAddPod(obj interface{}) {
	pod, ok := obj.(*corev1.Pod)
	if !ok {
		klog.ErrorS(fmt.Errorf("invalid pod object"), "Failed to process pod addition")
		return
	}
	klog.V(5).InfoS("Pod added", "pod", pod.Name, "namespace", pod.Namespace)
    // 对于没有 hami.io/vgpu-node annoations 的 Pod,不予处理
	nodeID, ok := pod.Annotations[util.AssignedNodeAnnotations]
	if !ok {
		return
	}
    // 待销毁的 Pod 也不处理
	if k8sutil.IsPodInTerminatedState(pod) {
		s.delPod(pod)
		return
	}
	podDev, _ := util.DecodePodDevices(util.SupportDevices, pod.Annotations)
    // 记录 Pod 占用的资源信息
	s.addPod(pod, nodeID, podDev)
}

func (s *Scheduler) onUpdatePod(_, newObj interface{}) {
	s.onAddPod(newObj)
}

func (s *Scheduler) onDelPod(obj interface{}) {
	pod, ok := obj.(*corev1.Pod)
	if !ok {
		klog.Errorf("unknown add object type")
		return
	}
	_, ok = pod.Annotations[util.AssignedNodeAnnotations]
	if !ok {
		return
	}
    // 移除 Pod 占用的资源
	s.delPod(pod)
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 背景
    • 1.1 K8s 的自定义调度逻辑
    • 1.2 Scheduler Extender
  • 2 HAMi webhook、scheduler、device-plugin 间的关系和工作流程 {#section2}
  • 3 HAMi scheduler 源码分析
    • 3.1 代码结构
    • 3.2 scheduler 启动流程
    • 3.3 webhook 实现
      • 3.3.1 MutatingWebhookConfiguration 对象
      • 3.3.2 代码实现
    • 3.4 scheduler 实现
      • 3.4.1 filter 实现
      • 3.4.2 bind 实现
      • 3.4.3 动态获取资源信息和使用情况
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档