
在介绍 HAMi 的 device-plugin 前,先了解一下 K8s 中的 Pod 是怎么使用 GPU 的。
K8s 缺省只为 Pod 提供申请 CPU 和内存两种资源,如果涉及其他设备资源,则需要 K8s 的 device-plugin 机制,通过插件的形式来接入其他设备资源(这种插件机制类似 K8s 的 CNI/CRI/CNI)。
device-plugin 需要具备两个基本的能力:1、向节点的kubelet注册插件;2、提供必要的接口能力(如:ListAndWatch 和 Allocate,分别用于列出可用的设备并持续监视这些设备的状态变化和分配指定数量的设备资源)
NVIDIA GPU Operator 是为了简化在 K8s 环境使用 GPU 的过程。下图是 的各组件介绍:

HAMi 的 device-plugin 就是替代上面的 NVIDIA Device Plugin 管理 GPU 资源的分配。
HAMi
├── cmd
│ ├── device-plugin
│ ├── nvidia
│ ├── main.go
│ ├── plugin-manager.go
│ ├── vgpucfg.go
│ ├── watchers.go
├── pkg
│ ├── device-plugin
│ ├── nvidiadevice
│ ├── nvinternal
│ ├── cdi
│ │ ├── api_mock.go
│ │ ├── api.go
│ │ ├── cdi.go
│ │ ├── factory.go
│ │ ├── null.go
│ │ ├── options.go
│ ├── info
│ │ ├── version.go
│ ├── mig
│ │ ├── mig.go
│ ├── plugin
│ │ ├── manager
│ │ │ ├── api.go
│ │ │ ├── factory.go
│ │ │ ├── null.go
│ │ │ ├── nvml.go
│ │ │ ├── options.go
│ │ │ ├── tegra.go
│ │ ├── api.go
│ │ ├── register_test.go
│ │ ├── register.go
│ │ ├── server_test.go
│ │ ├── server.go
│ │ ├── ulti_test.go
│ │ ├── ulti.go
│ ├── rm
│ ├── allocate.go
│ ├── device_map_test.go
│ ├── device_map.go
│ ├── health_test.go
│ ├── health.go
│ ├── helper.go
│ ├── nvml_devices_test.go
│ ├── nvml_devices.go
│ ├── nvml_manager.go
│ ├── rm.go
│ ├── tegra_devices.go
│ ├── tegra_manager.go
│ ├── wsl_devices.go与其他 Go 的后台进程一样,device-plugin 的入口文件 main.go 在 cmd/device-plugin/nvidia 下面,主要实现在 pkg/device-plugin/nvidiadevice 下面。
device-plugin 的启动流程核心是做两件事情:
下图是一个启动调用链:

启动时会接收一些关键的启动参数:
// cmd\device-plugin\nvidia\main.go
func main() {
var configFile string
c := cli.NewApp()
c.Name = "NVIDIA Device Plugin"
c.Usage = "NVIDIA device plugin for Kubernetes"
c.Action = func(ctx *cli.Context) error {
flagutil.PrintCliFlags(ctx)
return start(ctx, c.Flags)
}
...
// 这里追加的启动参数
c.Flags = append(c.Flags, addFlags()...)
err := c.Run(os.Args)
if err != nil {
klog.Error(err)
os.Exit(1)
}
}
// cmd\device-plugin\nvidia\vgpucfg.go
func addFlags() []cli.Flag {
addition := []cli.Flag{
&cli.StringFlag{
Name: "node-name",
Value: os.Getenv(util.NodeNameEnvName),
Usage: "node name",
EnvVars: []string{"NodeName"},
},
// GPU 的分割数:即每张 GPU 可最大分配的任务。如果配置为 N,则每个 GPU 上最多可以同时存在 N 个任务
&cli.UintFlag{
Name: "device-split-count",
Value: 2,
Usage: "the number for NVIDIA device split",
EnvVars: []string{"DEVICE_SPLIT_COUNT"},
},
// 表示 GPU memory 的超分比例,默认 1.0,大于 1.0 则表示启用虚拟显存(实验功能)
&cli.Float64Flag{
Name: "device-memory-scaling",
Value: 1.0,
Usage: "the ratio for NVIDIA device memory scaling",
EnvVars: []string{"DEVICE_MEMORY_SCALING"},
},
// 表示 GPU core 的超分比例,默认 1.0
&cli.Float64Flag{
Name: "device-cores-scaling",
Value: 1.0,
Usage: "the ratio for NVIDIA device cores scaling",
EnvVars: []string{"DEVICE_CORES_SCALING"},
},
// 是否关闭 GPU Core Limit,默认 false
&cli.BoolFlag{
Name: "disable-core-limit",
Value: false,
Usage: "If set, the core utilization limit will be ignored",
EnvVars: []string{"DISABLE_CORE_LIMIT"},
},
// 资源名称,默认是 nvidia.com/gpu,和 nvidia device-plugin 原生的重复了,可以改掉
&cli.StringFlag{
Name: "resource-name",
Value: "nvidia.com/gpu",
Usage: "the name of field for number GPU visible in container",
},
}
return addition
}Register 负责向 kubelet 注册插件信息:
// pkg\device-plugin\nvidiadevice\nvinternal\plugin\server.go
// Register registers the device plugin for the given resourceName with Kubelet.
func (plugin *NvidiaDevicePlugin) Register() error {
conn, err := plugin.dial(kubeletdevicepluginv1beta1.KubeletSocket, 5*time.Second)
if err != nil {
return err
}
defer conn.Close()
client := kubeletdevicepluginv1beta1.NewRegistrationClient(conn)
reqt := &kubeletdevicepluginv1beta1.RegisterRequest{
// device plugin 的版本
Version: kubeletdevicepluginv1beta1.Version,
// device plugin 的访问地址,kubelet通过这个地址访问插件
Endpoint: path.Base(plugin.socket),
// 资源名称,创建 Pod 申请 vGPU 时通过这个名称查找到该插件进行资源申请
ResourceName: string(plugin.rm.Resource()),
Options: &kubeletdevicepluginv1beta1.DevicePluginOptions{
GetPreferredAllocationAvailable: false,
},
}
_, err = client.Register(context.Background(), reqt)
if err != nil {
return err
}
return nil
}WatchAndRegister 是将 Node 上的 GPU 信息以 annotations 的形式添加到 Node。
// pkg\device-plugin\nvidiadevice\nvinternal\plugin\register.go
func (plugin *NvidiaDevicePlugin) WatchAndRegister() {
klog.Info("Starting WatchAndRegister")
errorSleepInterval := time.Second * 5
successSleepInterval := time.Second * 30
for {
// 将 Node 上的 GPU 信息以 annotations 的形式添加到 Node
err := plugin.RegistrInAnnotation()
if err != nil {
klog.Errorf("Failed to register annotation: %v", err)
klog.Infof("Retrying in %v seconds...", errorSleepInterval)
time.Sleep(errorSleepInterval)
} else {
klog.Infof("Successfully registered annotation. Next check in %v seconds...", successSleepInterval)
time.Sleep(successSleepInterval)
}
}
}
// pkg\device-plugin\nvidiadevice\nvinternal\plugin\register.go
func (plugin *NvidiaDevicePlugin) RegistrInAnnotation() error {
// 获取 Node 上的 GPU 信息,组装成 api.DeviceInfo 对象
devices := plugin.getAPIDevices()
klog.InfoS("start working on the devices", "devices", devices)
annos := make(map[string]string)
node, err := util.GetNode(util.NodeName)
if err != nil {
klog.Errorln("get node error", err.Error())
return err
}
encodeddevices := util.EncodeNodeDevices(*devices)
annos[nvidia.HandshakeAnnos] = "Reported " + time.Now().String()
annos[nvidia.RegisterAnnos] = encodeddevices
klog.Infof("patch node with the following annos %v", fmt.Sprintf("%v", annos))
// 调用 kube-apiserver 接口更新 Node 对象的 Annoations 把 Device 信息存起来(后续 HAMi-Scheduler 在进行调度时就会用到这边上报的 annotations 作为调度依据的一部分)
err = util.PatchNodeAnnotations(node, annos)
if err != nil {
klog.Errorln("patch node error", err.Error())
}
return err
}ListAndWatch 用于感知 Node 上的设备并上报给 kubelet,该grpc接口是一个服务端流接口:
// pkg\device-plugin\nvidiadevice\nvinternal\plugin\server.go
// ListAndWatch lists devices and update that list according to the health status
func (plugin *NvidiaDevicePlugin) ListAndWatch(e *kubeletdevicepluginv1beta1.Empty, s kubeletdevicepluginv1beta1.DevicePlugin_ListAndWatchServer) error {
// 首次上报
s.Send(&kubeletdevicepluginv1beta1.ListAndWatchResponse{Devices: plugin.apiDevices()})
for {
select {
case <-plugin.stop:
return nil
// 当有设备出现 unhealthy 状态,就重新上报一次给 kubelet
case d := <-plugin.health:
// FIXME: there is no way to recover from the Unhealthy state.
d.Health = kubeletdevicepluginv1beta1.Unhealthy
klog.Infof("'%s' device marked unhealthy: %s", plugin.rm.Resource(), d.ID)
s.Send(&kubeletdevicepluginv1beta1.ListAndWatchResponse{Devices: plugin.apiDevices()})
}
}
}上面的 plugin.apiDevices() 是核心:
// pkg\device-plugin\nvidiadevice\nvinternal\plugin\server.go
func (plugin *NvidiaDevicePlugin) apiDevices() []*kubeletdevicepluginv1beta1.Device {
return plugin.rm.Devices().GetPluginDevices(plugin.schedulerConfig.DeviceSplitCount)
}
// pkg\device-plugin\nvidiadevice\nvinternal\rm\devices.go
// GetPluginDevices returns the plugin Devices from all devices in the Devices
func (ds Devices) GetPluginDevices(count uint) []*kubeletdevicepluginv1beta1.Device {
var res []*kubeletdevicepluginv1beta1.Device
if !strings.Contains(ds.GetIDs()[0], "MIG") {
for _, dev := range ds {
// 根据 DeviceSplitCount 进行 Device 复制
for i := uint(0); i < count; i++ {
id := fmt.Sprintf("%v-%v", dev.ID, i)
res = append(res, &kubeletdevicepluginv1beta1.Device{
ID: id,
Health: dev.Health,
Topology: nil,
})
}
}
} else {
for _, d := range ds {
res = append(res, &d.Device)
}
}
return res
}device-plugin 的 Allocate 实现包含两部分:
这样 Pod 中有环境变量, NVIDIA Container Toolkit 就会为其分配 GPU,通过 HAMi 自定义逻辑中替换 libvgpu.so 和添加部分环境变量,从而实现了对 GPU 的限制。
// pkg\device-plugin\nvidiadevice\nvinternal\plugin\server.go
// Allocate which return list of devices.
func (plugin *NvidiaDevicePlugin) Allocate(ctx context.Context, reqs *kubeletdevicepluginv1beta1.AllocateRequest) (*kubeletdevicepluginv1beta1.AllocateResponse, error) {
...
for idx, req := range reqs.ContainerRequests {
// If the devices being allocated are replicas, then (conditionally)
// error out if more than one resource is being allocated.
if strings.Contains(req.DevicesIDs[0], "MIG") {
// 这里会有调用 NVIDIA 原生逻辑
...
} else {
// 这里会有调用 NVIDIA 原生逻辑
...
// 非 MIG 模式
if plugin.operatingMode != "mig" {
// 添加一个 CUDA_DEVICE_MEMORY_LIMIT_$Index 的环境变量,对 GPU 内存进行限制
for i, dev := range devreq {
limitKey := fmt.Sprintf("CUDA_DEVICE_MEMORY_LIMIT_%v", i)
response.Envs[limitKey] = fmt.Sprintf("%vm", dev.Usedmem)
}
// 配置限制 GPU Core 的环境变量
response.Envs["CUDA_DEVICE_SM_LIMIT"] = fmt.Sprint(devreq[0].Usedcores)
// 该环境变量设置 share_region mmap 文件在容器中的位置
response.Envs["CUDA_DEVICE_MEMORY_SHARED_CACHE"] = fmt.Sprintf("%s/vgpu/%v.cache", hostHookPath, uuid.New().String())
// >1表示 GPU 内存超分
if plugin.schedulerConfig.DeviceMemoryScaling > 1 {
response.Envs["CUDA_OVERSUBSCRIBE"] = "true"
}
// 是否关闭算力限制
if plugin.schedulerConfig.DisableCoreLimit {
response.Envs[util.CoreLimitSwitch] = "disable"
}
// 这里就实现了 libvgpu.so 库的替换
cacheFileHostDirectory := fmt.Sprintf("%s/vgpu/containers/%s_%s", hostHookPath, current.UID, currentCtr.Name)
os.RemoveAll(cacheFileHostDirectory)
os.MkdirAll(cacheFileHostDirectory, 0777)
os.Chmod(cacheFileHostDirectory, 0777)
os.MkdirAll("/tmp/vgpulock", 0777)
os.Chmod("/tmp/vgpulock", 0777)
response.Mounts = append(response.Mounts,
&kubeletdevicepluginv1beta1.Mount{ContainerPath: fmt.Sprintf("%s/vgpu/libvgpu.so", hostHookPath),
HostPath: GetLibPath(),
ReadOnly: true},
&kubeletdevicepluginv1beta1.Mount{ContainerPath: fmt.Sprintf("%s/vgpu", hostHookPath),
HostPath: cacheFileHostDirectory,
ReadOnly: false},
&kubeletdevicepluginv1beta1.Mount{ContainerPath: "/tmp/vgpulock",
HostPath: "/tmp/vgpulock",
ReadOnly: false},
)
found := false
for _, val := range currentCtr.Env {
// 没有指定 CUDA_DISABLE_CONTROL=true 时进行动态库替换
if strings.Compare(val.Name, "CUDA_DISABLE_CONTROL") == 0 {
// if env existed but is set to false or can not be parsed, ignore
t, _ := strconv.ParseBool(val.Value)
if !t {
continue
}
// only env existed and set to true, we mark it "found"
found = true
break
}
}
if !found {
response.Mounts = append(response.Mounts, &kubeletdevicepluginv1beta1.Mount{ContainerPath: "/etc/ld.so.preload",
HostPath: hostHookPath + "/vgpu/ld.so.preload",
ReadOnly: true},
)
}
...
}
responses.ContainerResponses = append(responses.ContainerResponses, response)
}
}
klog.Infoln("Allocate Response", responses.ContainerResponses)
device.PodAllocationTrySuccess(nodename, nvidia.NvidiaGPUDevice, NodeLockNvidia, current)
return &responses, nil
}// pkg\device-plugin\nvidiadevice\nvinternal\plugin\server.go
func (plugin *NvidiaDevicePlugin) getAllocateResponse(requestIds []string) (*kubeletdevicepluginv1beta1.ContainerAllocateResponse, error) {
deviceIDs := plugin.deviceIDsFromAnnotatedDeviceIDs(requestIds)
responseID := uuid.New().String()
response, err := plugin.getAllocateResponseForCDI(responseID, deviceIDs)
if err != nil {
return nil, fmt.Errorf("failed to get allocate response for CDI: %v", err)
}
// 核心:添加一个环境变量 NVIDIA_VISIBLE_DEVICES,该环境变量设置后 nvidia-container-toolkit 会为有这个环境变量的容器分配 GPU
response.Envs = plugin.apiEnvs(plugin.deviceListEnvvar, deviceIDs)
//if plugin.deviceListStrategies.Includes(spec.DeviceListStrategyVolumeMounts) || plugin.deviceListStrategies.Includes(spec.DeviceListStrategyEnvvar) {
// response.Envs = plugin.apiEnvs(plugin.deviceListEnvvar, deviceIDs)
//}
/*
if plugin.deviceListStrategies.Includes(spec.DeviceListStrategyVolumeMounts) {
response.Envs = plugin.apiEnvs(plugin.deviceListEnvvar, []string{deviceListAsVolumeMountsContainerPathRoot})
response.Mounts = plugin.apiMounts(deviceIDs)
}*/
if *plugin.config.Flags.Plugin.PassDeviceSpecs {
response.Devices = plugin.apiDeviceSpecs(*plugin.config.Flags.NvidiaDriverRoot, requestIds)
}
if *plugin.config.Flags.GDSEnabled {
response.Envs["NVIDIA_GDS"] = "enabled"
}
if *plugin.config.Flags.MOFEDEnabled {
response.Envs["NVIDIA_MOFED"] = "enabled"
}
return &response, nil
}上面环境变量 NVIDIA_VISIBLE_DEVICES 是在 NewNvidiaDevicePlugin 时添加的:
// pkg\device-plugin\nvidiadevice\nvinternal\plugin\server.go
// NewNvidiaDevicePlugin returns an initialized NvidiaDevicePlugin
func NewNvidiaDevicePlugin(config *nvidia.DeviceConfig, resourceManager rm.ResourceManager, cdiHandler cdi.Interface, cdiEnabled bool, sConfig *device.Config, mode string) *NvidiaDevicePlugin {
_, name := resourceManager.Resource().Split()
deviceListStrategies, _ := spec.NewDeviceListStrategies(*config.Flags.Plugin.DeviceListStrategy)
klog.Infoln("reading config=", config, "resourceName", config.ResourceName, "configfile=", *ConfigFile, "sconfig=", sConfig)
// Initialize devices with configuration
if err := device.InitDevicesWithConfig(sConfig); err != nil {
klog.Fatalf("failed to initialize devices: %v", err)
}
return &NvidiaDevicePlugin{
rm: resourceManager,
config: config,
deviceListEnvvar: "NVIDIA_VISIBLE_DEVICES",
deviceListStrategies: deviceListStrategies,
socket: kubeletdevicepluginv1beta1.DevicePluginPath + "nvidia-" + name + ".sock",
cdiHandler: cdiHandler,
cdiEnabled: cdiEnabled,
cdiAnnotationPrefix: *config.Flags.Plugin.CDIAnnotationPrefix,
schedulerConfig: sConfig.NvidiaConfig,
operatingMode: mode,
migCurrent: nvidia.MigPartedSpec{},
// These will be reinitialized every
// time the plugin server is restarted.
server: nil,
health: nil,
stop: nil,
}
}原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。