首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >HAMi源码解析——device-plugin

HAMi源码解析——device-plugin

原创
作者头像
DifficultWork
修改2025-07-09 14:53:21
修改2025-07-09 14:53:21
5750
举报
文章被收录于专栏:阶梯计划阶梯计划

1 背景

在介绍 HAMi 的 device-plugin 前,先了解一下 K8s 中的 Pod 是怎么使用 GPU 的。

1.1 K8s 的 device-plugin

K8s 缺省只为 Pod 提供申请 CPU 和内存两种资源,如果涉及其他设备资源,则需要 K8s 的 device-plugin 机制,通过插件的形式来接入其他设备资源(这种插件机制类似 K8s 的 CNI/CRI/CNI)。

device-plugin 需要具备两个基本的能力:1、向节点的kubelet注册插件;2、提供必要的接口能力(如:ListAndWatch 和 Allocate,分别用于列出可用的设备并持续监视这些设备的状态变化和分配指定数量的设备资源)

1.2 NVIDIA GPU Operator

NVIDIA GPU Operator 是为了简化在 K8s 环境使用 GPU 的过程。下图是 的各组件介绍:

HAMi 的 device-plugin 就是替代上面的 NVIDIA Device Plugin 管理 GPU 资源的分配。

2 源码分析

2.1 device-plugin 代码结构

代码语言:bash
复制
HAMi
├── cmd
│   ├── device-plugin
│       ├── nvidia
│           ├── main.go
│           ├── plugin-manager.go
│           ├── vgpucfg.go
│           ├── watchers.go
├── pkg
│   ├── device-plugin
│       ├── nvidiadevice
│           ├── nvinternal
│               ├── cdi
│               │   ├── api_mock.go
│               │   ├── api.go
│               │   ├── cdi.go
│               │   ├── factory.go
│               │   ├── null.go
│               │   ├── options.go
│               ├── info
│               │   ├── version.go
│               ├── mig
│               │   ├── mig.go
│               ├── plugin
│               │   ├── manager
│               │   │   ├── api.go
│               │   │   ├── factory.go
│               │   │   ├── null.go
│               │   │   ├── nvml.go
│               │   │   ├── options.go
│               │   │   ├── tegra.go
│               │   ├── api.go
│               │   ├── register_test.go
│               │   ├── register.go
│               │   ├── server_test.go
│               │   ├── server.go
│               │   ├── ulti_test.go
│               │   ├── ulti.go
│               ├── rm
│                   ├── allocate.go
│                   ├── device_map_test.go
│                   ├── device_map.go
│                   ├── health_test.go
│                   ├── health.go
│                   ├── helper.go
│                   ├── nvml_devices_test.go
│                   ├── nvml_devices.go
│                   ├── nvml_manager.go
│                   ├── rm.go
│                   ├── tegra_devices.go
│                   ├── tegra_manager.go
│                   ├── wsl_devices.go

2.2 device-plugin 的启动流程

与其他 Go 的后台进程一样,device-plugin 的入口文件 main.go 在 cmd/device-plugin/nvidia 下面,主要实现在 pkg/device-plugin/nvidiadevice 下面。

device-plugin 的启动流程核心是做两件事情:

  • 向 kubelet 注册
  • 启动 grpc 服务,提供 ListAndWatch 和 Allocate 等接口能力

下图是一个启动调用链:

启动时会接收一些关键的启动参数:

代码语言:go
复制
// cmd\device-plugin\nvidia\main.go
func main() {
	var configFile string

	c := cli.NewApp()
	c.Name = "NVIDIA Device Plugin"
	c.Usage = "NVIDIA device plugin for Kubernetes"
	c.Action = func(ctx *cli.Context) error {
		flagutil.PrintCliFlags(ctx)
		return start(ctx, c.Flags)
	}
	
    ...
    // 这里追加的启动参数
	c.Flags = append(c.Flags, addFlags()...)
	err := c.Run(os.Args)
	if err != nil {
		klog.Error(err)
		os.Exit(1)
	}
}

// cmd\device-plugin\nvidia\vgpucfg.go
func addFlags() []cli.Flag {
	addition := []cli.Flag{
		&cli.StringFlag{
			Name:    "node-name",
			Value:   os.Getenv(util.NodeNameEnvName),
			Usage:   "node name",
			EnvVars: []string{"NodeName"},
		},
        // GPU 的分割数:即每张 GPU 可最大分配的任务。如果配置为 N,则每个 GPU 上最多可以同时存在 N 个任务
		&cli.UintFlag{
			Name:    "device-split-count",
			Value:   2,
			Usage:   "the number for NVIDIA device split",
			EnvVars: []string{"DEVICE_SPLIT_COUNT"},
		},
        // 表示 GPU memory 的超分比例,默认 1.0,大于 1.0 则表示启用虚拟显存(实验功能)
		&cli.Float64Flag{
			Name:    "device-memory-scaling",
			Value:   1.0,
			Usage:   "the ratio for NVIDIA device memory scaling",
			EnvVars: []string{"DEVICE_MEMORY_SCALING"},
		},
        // 表示 GPU core 的超分比例,默认 1.0
		&cli.Float64Flag{
			Name:    "device-cores-scaling",
			Value:   1.0,
			Usage:   "the ratio for NVIDIA device cores scaling",
			EnvVars: []string{"DEVICE_CORES_SCALING"},
		},
        // 是否关闭 GPU Core Limit,默认 false
		&cli.BoolFlag{
			Name:    "disable-core-limit",
			Value:   false,
			Usage:   "If set, the core utilization limit will be ignored",
			EnvVars: []string{"DISABLE_CORE_LIMIT"},
		},
        // 资源名称,默认是 nvidia.com/gpu,和 nvidia device-plugin 原生的重复了,可以改掉
		&cli.StringFlag{
			Name:  "resource-name",
			Value: "nvidia.com/gpu",
			Usage: "the name of field for number GPU visible in container",
		},
	}
	return addition
}

2.3 Register 实现

Register 负责向 kubelet 注册插件信息:

代码语言:go
复制
// pkg\device-plugin\nvidiadevice\nvinternal\plugin\server.go
// Register registers the device plugin for the given resourceName with Kubelet.
func (plugin *NvidiaDevicePlugin) Register() error {
	conn, err := plugin.dial(kubeletdevicepluginv1beta1.KubeletSocket, 5*time.Second)
	if err != nil {
		return err
	}
	defer conn.Close()

	client := kubeletdevicepluginv1beta1.NewRegistrationClient(conn)
	reqt := &kubeletdevicepluginv1beta1.RegisterRequest{
        // device plugin 的版本
		Version:      kubeletdevicepluginv1beta1.Version,
        // device plugin 的访问地址,kubelet通过这个地址访问插件
		Endpoint:     path.Base(plugin.socket),
        // 资源名称,创建 Pod 申请 vGPU 时通过这个名称查找到该插件进行资源申请
		ResourceName: string(plugin.rm.Resource()),
		Options: &kubeletdevicepluginv1beta1.DevicePluginOptions{
			GetPreferredAllocationAvailable: false,
		},
	}

	_, err = client.Register(context.Background(), reqt)
	if err != nil {
		return err
	}
	return nil
}

2.3.1 WatchAndRegister

WatchAndRegister 是将 Node 上的 GPU 信息以 annotations 的形式添加到 Node。

代码语言:go
复制
// pkg\device-plugin\nvidiadevice\nvinternal\plugin\register.go
func (plugin *NvidiaDevicePlugin) WatchAndRegister() {
	klog.Info("Starting WatchAndRegister")
	errorSleepInterval := time.Second * 5
	successSleepInterval := time.Second * 30
	for {
        // 将 Node 上的 GPU 信息以 annotations 的形式添加到 Node
		err := plugin.RegistrInAnnotation()
		if err != nil {
			klog.Errorf("Failed to register annotation: %v", err)
			klog.Infof("Retrying in %v seconds...", errorSleepInterval)
			time.Sleep(errorSleepInterval)
		} else {
			klog.Infof("Successfully registered annotation. Next check in %v seconds...", successSleepInterval)
			time.Sleep(successSleepInterval)
		}
	}
}

// pkg\device-plugin\nvidiadevice\nvinternal\plugin\register.go
func (plugin *NvidiaDevicePlugin) RegistrInAnnotation() error {
    // 获取 Node 上的 GPU 信息,组装成 api.DeviceInfo 对象
	devices := plugin.getAPIDevices()
	klog.InfoS("start working on the devices", "devices", devices)
	annos := make(map[string]string)
	node, err := util.GetNode(util.NodeName)
	if err != nil {
		klog.Errorln("get node error", err.Error())
		return err
	}
	encodeddevices := util.EncodeNodeDevices(*devices)
	annos[nvidia.HandshakeAnnos] = "Reported " + time.Now().String()
	annos[nvidia.RegisterAnnos] = encodeddevices
	klog.Infof("patch node with the following annos %v", fmt.Sprintf("%v", annos))
    // 调用 kube-apiserver 接口更新 Node 对象的 Annoations 把 Device 信息存起来(后续 HAMi-Scheduler 在进行调度时就会用到这边上报的 annotations 作为调度依据的一部分)
	err = util.PatchNodeAnnotations(node, annos)

	if err != nil {
		klog.Errorln("patch node error", err.Error())
	}
	return err
}

2.4 ListAndWatch 实现

ListAndWatch 用于感知 Node 上的设备并上报给 kubelet,该grpc接口是一个服务端流接口:

代码语言:go
复制
// pkg\device-plugin\nvidiadevice\nvinternal\plugin\server.go
// ListAndWatch lists devices and update that list according to the health status
func (plugin *NvidiaDevicePlugin) ListAndWatch(e *kubeletdevicepluginv1beta1.Empty, s kubeletdevicepluginv1beta1.DevicePlugin_ListAndWatchServer) error {
    // 首次上报
	s.Send(&kubeletdevicepluginv1beta1.ListAndWatchResponse{Devices: plugin.apiDevices()})

	for {
		select {
		case <-plugin.stop:
			return nil
        // 当有设备出现 unhealthy 状态,就重新上报一次给 kubelet
		case d := <-plugin.health:
			// FIXME: there is no way to recover from the Unhealthy state.
			d.Health = kubeletdevicepluginv1beta1.Unhealthy
			klog.Infof("'%s' device marked unhealthy: %s", plugin.rm.Resource(), d.ID)
			s.Send(&kubeletdevicepluginv1beta1.ListAndWatchResponse{Devices: plugin.apiDevices()})
		}
	}
}

上面的 plugin.apiDevices() 是核心:

代码语言:go
复制
// pkg\device-plugin\nvidiadevice\nvinternal\plugin\server.go
func (plugin *NvidiaDevicePlugin) apiDevices() []*kubeletdevicepluginv1beta1.Device {
	return plugin.rm.Devices().GetPluginDevices(plugin.schedulerConfig.DeviceSplitCount)
}

// pkg\device-plugin\nvidiadevice\nvinternal\rm\devices.go
// GetPluginDevices returns the plugin Devices from all devices in the Devices
func (ds Devices) GetPluginDevices(count uint) []*kubeletdevicepluginv1beta1.Device {
	var res []*kubeletdevicepluginv1beta1.Device

	if !strings.Contains(ds.GetIDs()[0], "MIG") {
		for _, dev := range ds {
            // 根据 DeviceSplitCount 进行 Device 复制
			for i := uint(0); i < count; i++ {
				id := fmt.Sprintf("%v-%v", dev.ID, i)
				res = append(res, &kubeletdevicepluginv1beta1.Device{
					ID:       id,
					Health:   dev.Health,
					Topology: nil,
				})
			}
		}
	} else {
		for _, d := range ds {
			res = append(res, &d.Device)
		}

	}

	return res
}

2.5 Allocate 实现

device-plugin 的 Allocate 实现包含两部分:

  • HAMi 的自定义逻辑:主要是根据 Pod Resource 中的申请资源数量设置对应的环境变量,以及挂载 libvgpu.so 以替换 Pod 中的原生驱动(参考)
  • NVIDIA 的原生逻辑:则是设置 NVIDIA_VISIBLE_DEVICES 这个环境变量,然后由 NVIDIA Container Toolkit 对该容器分配 GPU。因为 HAMi 并没有为容器分配 GPU 的能力, 因此需要依赖 NVIDIA 的原生逻辑。

这样 Pod 中有环境变量, NVIDIA Container Toolkit 就会为其分配 GPU,通过 HAMi 自定义逻辑中替换 libvgpu.so 和添加部分环境变量,从而实现了对 GPU 的限制。

2.5.1 HAMi 自定义逻辑

代码语言:go
复制
// pkg\device-plugin\nvidiadevice\nvinternal\plugin\server.go
// Allocate which return list of devices.
func (plugin *NvidiaDevicePlugin) Allocate(ctx context.Context, reqs *kubeletdevicepluginv1beta1.AllocateRequest) (*kubeletdevicepluginv1beta1.AllocateResponse, error) {
	...
	for idx, req := range reqs.ContainerRequests {
		// If the devices being allocated are replicas, then (conditionally)
		// error out if more than one resource is being allocated.

		if strings.Contains(req.DevicesIDs[0], "MIG") {
            // 这里会有调用 NVIDIA 原生逻辑
            ...
        } else {
            // 这里会有调用 NVIDIA 原生逻辑
            ...
            // 非 MIG 模式
			if plugin.operatingMode != "mig" {
                // 添加一个 CUDA_DEVICE_MEMORY_LIMIT_$Index 的环境变量,对 GPU 内存进行限制
				for i, dev := range devreq {
					limitKey := fmt.Sprintf("CUDA_DEVICE_MEMORY_LIMIT_%v", i)
					response.Envs[limitKey] = fmt.Sprintf("%vm", dev.Usedmem)
				}
                // 配置限制 GPU Core 的环境变量
				response.Envs["CUDA_DEVICE_SM_LIMIT"] = fmt.Sprint(devreq[0].Usedcores)
                // 该环境变量设置 share_region mmap 文件在容器中的位置
				response.Envs["CUDA_DEVICE_MEMORY_SHARED_CACHE"] = fmt.Sprintf("%s/vgpu/%v.cache", hostHookPath, uuid.New().String())
                // >1表示 GPU 内存超分
				if plugin.schedulerConfig.DeviceMemoryScaling > 1 {
					response.Envs["CUDA_OVERSUBSCRIBE"] = "true"
				}
                // 是否关闭算力限制
				if plugin.schedulerConfig.DisableCoreLimit {
					response.Envs[util.CoreLimitSwitch] = "disable"
				}
                // 这里就实现了 libvgpu.so 库的替换
				cacheFileHostDirectory := fmt.Sprintf("%s/vgpu/containers/%s_%s", hostHookPath, current.UID, currentCtr.Name)
				os.RemoveAll(cacheFileHostDirectory)

				os.MkdirAll(cacheFileHostDirectory, 0777)
				os.Chmod(cacheFileHostDirectory, 0777)
				os.MkdirAll("/tmp/vgpulock", 0777)
				os.Chmod("/tmp/vgpulock", 0777)
				response.Mounts = append(response.Mounts,
					&kubeletdevicepluginv1beta1.Mount{ContainerPath: fmt.Sprintf("%s/vgpu/libvgpu.so", hostHookPath),
						HostPath: GetLibPath(),
						ReadOnly: true},
					&kubeletdevicepluginv1beta1.Mount{ContainerPath: fmt.Sprintf("%s/vgpu", hostHookPath),
						HostPath: cacheFileHostDirectory,
						ReadOnly: false},
					&kubeletdevicepluginv1beta1.Mount{ContainerPath: "/tmp/vgpulock",
						HostPath: "/tmp/vgpulock",
						ReadOnly: false},
				)
				found := false
				for _, val := range currentCtr.Env {
                    // 没有指定 CUDA_DISABLE_CONTROL=true 时进行动态库替换
					if strings.Compare(val.Name, "CUDA_DISABLE_CONTROL") == 0 {
						// if env existed but is set to false or can not be parsed, ignore
						t, _ := strconv.ParseBool(val.Value)
						if !t {
							continue
						}
						// only env existed and set to true, we mark it "found"
						found = true
						break
					}
				}
				if !found {
					response.Mounts = append(response.Mounts, &kubeletdevicepluginv1beta1.Mount{ContainerPath: "/etc/ld.so.preload",
						HostPath: hostHookPath + "/vgpu/ld.so.preload",
						ReadOnly: true},
					)
				}
                ...
			}
			responses.ContainerResponses = append(responses.ContainerResponses, response)
		}
	}
	klog.Infoln("Allocate Response", responses.ContainerResponses)
	device.PodAllocationTrySuccess(nodename, nvidia.NvidiaGPUDevice, NodeLockNvidia, current)
	return &responses, nil
}

2.5.2 NVIDIA 原生逻辑

代码语言:go
复制
// pkg\device-plugin\nvidiadevice\nvinternal\plugin\server.go
func (plugin *NvidiaDevicePlugin) getAllocateResponse(requestIds []string) (*kubeletdevicepluginv1beta1.ContainerAllocateResponse, error) {
	deviceIDs := plugin.deviceIDsFromAnnotatedDeviceIDs(requestIds)

	responseID := uuid.New().String()
	response, err := plugin.getAllocateResponseForCDI(responseID, deviceIDs)
	if err != nil {
		return nil, fmt.Errorf("failed to get allocate response for CDI: %v", err)
	}

    // 核心:添加一个环境变量 NVIDIA_VISIBLE_DEVICES,该环境变量设置后 nvidia-container-toolkit 会为有这个环境变量的容器分配 GPU
	response.Envs = plugin.apiEnvs(plugin.deviceListEnvvar, deviceIDs)
	//if plugin.deviceListStrategies.Includes(spec.DeviceListStrategyVolumeMounts) || plugin.deviceListStrategies.Includes(spec.DeviceListStrategyEnvvar) {
	//	response.Envs = plugin.apiEnvs(plugin.deviceListEnvvar, deviceIDs)
	//}
	/*
		if plugin.deviceListStrategies.Includes(spec.DeviceListStrategyVolumeMounts) {
			response.Envs = plugin.apiEnvs(plugin.deviceListEnvvar, []string{deviceListAsVolumeMountsContainerPathRoot})
			response.Mounts = plugin.apiMounts(deviceIDs)
		}*/
	if *plugin.config.Flags.Plugin.PassDeviceSpecs {
		response.Devices = plugin.apiDeviceSpecs(*plugin.config.Flags.NvidiaDriverRoot, requestIds)
	}
	if *plugin.config.Flags.GDSEnabled {
		response.Envs["NVIDIA_GDS"] = "enabled"
	}
	if *plugin.config.Flags.MOFEDEnabled {
		response.Envs["NVIDIA_MOFED"] = "enabled"
	}

	return &response, nil
}

上面环境变量 NVIDIA_VISIBLE_DEVICES 是在 NewNvidiaDevicePlugin 时添加的:

代码语言:go
复制
// pkg\device-plugin\nvidiadevice\nvinternal\plugin\server.go
// NewNvidiaDevicePlugin returns an initialized NvidiaDevicePlugin
func NewNvidiaDevicePlugin(config *nvidia.DeviceConfig, resourceManager rm.ResourceManager, cdiHandler cdi.Interface, cdiEnabled bool, sConfig *device.Config, mode string) *NvidiaDevicePlugin {
	_, name := resourceManager.Resource().Split()

	deviceListStrategies, _ := spec.NewDeviceListStrategies(*config.Flags.Plugin.DeviceListStrategy)

	klog.Infoln("reading config=", config, "resourceName", config.ResourceName, "configfile=", *ConfigFile, "sconfig=", sConfig)

	// Initialize devices with configuration
	if err := device.InitDevicesWithConfig(sConfig); err != nil {
		klog.Fatalf("failed to initialize devices: %v", err)
	}
	return &NvidiaDevicePlugin{
		rm:                   resourceManager,
		config:               config,
		deviceListEnvvar:     "NVIDIA_VISIBLE_DEVICES",
		deviceListStrategies: deviceListStrategies,
		socket:               kubeletdevicepluginv1beta1.DevicePluginPath + "nvidia-" + name + ".sock",
		cdiHandler:           cdiHandler,
		cdiEnabled:           cdiEnabled,
		cdiAnnotationPrefix:  *config.Flags.Plugin.CDIAnnotationPrefix,
		schedulerConfig:      sConfig.NvidiaConfig,
		operatingMode:        mode,
		migCurrent:           nvidia.MigPartedSpec{},

		// These will be reinitialized every
		// time the plugin server is restarted.
		server: nil,
		health: nil,
		stop:   nil,
	}
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 背景
    • 1.1 K8s 的 device-plugin
    • 1.2 NVIDIA GPU Operator
  • 2 源码分析
    • 2.1 device-plugin 代码结构
    • 2.2 device-plugin 的启动流程
    • 2.3 Register 实现
      • 2.3.1 WatchAndRegister
    • 2.4 ListAndWatch 实现
    • 2.5 Allocate 实现
      • 2.5.1 HAMi 自定义逻辑
      • 2.5.2 NVIDIA 原生逻辑
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档