Location>code7788 >text

Custom Resource Support: K8s Device Plugin from Principle to Implementation

Popularity:540 ℃/2024-12-13 22:35:40

In this article, we analyze how the device-plugin mechanism works in k8s, and implement a simple device-plugin to deepen our understanding.


1. Background

By default, a Pod in k8s can only request two resources, CPU and Memory, as shown below:

resources:
  requests:
    memory: "1024Mi"
    cpu: "100m"
  limits:
    memory: "2048Mi"
    cpu: "200m"

As AI heats up and more business Pods need to claim GPU resources.GPU Environment Setup Guide: How to Use GPUs in Bare Metal, Docker, K8s, and More We analyzed how to use the GPU in a k8s environment by relying on theDevice Plugin mechanism, which allows k8s to sense GPU resources on a node and use them as if they were native CPU and Memory resources.

In fact, in the early days, K8s also provided a feature called/nvidia-gpu The k8s community was under a lot of pressure because they had to modify the k8s kernel code every time they added a resource to support NVIDIA GPUs. So in version 1.8, we introduced thedevice plugin mechanism to access other resources through the plug-in form, device manufacturers only need to develop the corresponding xxx-device-plugin to access the resources to k8s.

ps: similarly there is the introduction ofCSI Make the storage plugin separate from the Kubernetes internal (in-tree) codebase to a separate, pluggable external component (out-of-tree), and theCRICNI Wait, the Device Plugin can be considered as one of them.

Device Plugin has two meanings, which are distinguished in the following according to their semantics:

  • First it can represent the Device Plugin framework in k8s.
  • The second can also represent a manufacturer-specific implementation, such as NVIDIA/k8s-device-plugin, which is a Device Plugin implementation for accessing NVIDIA GPU resources

2. Principles

The working principle of Device Plugin is not really complicated, it can be divided intoPlugin Registration respond in singingkubelet call pluginTwo parts.

  • Plugin registration: DevicePlugin will register with the Kubelet on the node when it starts up, so that the Kubelet will be aware of the plugin's existence.
  • kubelet calls the plugin: after registration, when a Pod applies for a resource, the kubelet calls the plugin API to realize the specific function.

As shown in the diagram on the k8s official website:

Kubelet section

To provide this functionality, Kubelet has added a newRegistration gRPC service:

service Registration {
	rpc Register(RegisterRequest) returns (Empty) {}
}

The device plugin can call this interface to register with Kubelet, which requires three parameters:

  • The name of the unix socket corresponding to the device plugin.: The kubelet then finds the corresponding unix socket by name and makes a call to the plugin.

  • device plugin tuning API version: Used to distinguish between different versions of the plugin

  • ResourceName provided by the device plugin: When it encounters a resource request that cannot be processed (other than CPU and Memory), Kubelet will match the corresponding plugin based on the name of the requested resource.

    • The ResourceName needs to be in accordance with thevendor-domain/resourcetype format, for example/gpu

device plugin section

For device management, the device plugin needs to implement the following interfaces:

  • GetDevicePluginOptions: This interface is used to get information about the device plug-in, and you can specify some device plug-in configuration options in the response it returns, which can be seen as the plug-in's metadata

  • ListAndWatch: This interface is used to list available devices and to continuously monitor these devices for status changes.

  • GetPreferredAllocation: Provide allocation preference information to the device plugin so that the device plugin can make better choices during allocation

  • Allocate: This interface is used to request the allocation of a specified number of device resources from a device plug-in.
  • PreStartContainer: This interface is called before the container starts and is used to configure the device resources used by the container.

only ifListAndWatch respond in singingAllocate Two interfaces are required, everything else is optional.

workflow

Generally all Device Plugin implementations end up running as Pods in a k8s cluster and are deployed as DaemonSet because they need to manage all the nodes.

The first step after the device plugin is started is to register with Kubelet to let it know that a new device has been accessed.

To be able to call Kubelet's Register interface, the Device Plugin Pod mounts a file (unix socket) on the host machine into the container and initiates a call from the file to register it.

Once the cluster has been deployed, the Kubelet starts.

  • 1) Kubelet starts the Registration gRPC service (), which provides the Register interface.

  • 2) After the device-plugin is started, it registers with Kubelet by calling the Register interface. The registration information includes the device plugin's unix socket, API Version, and ResourceName.

  • 3) After successful registration, Kubelet calls ListAndWatch to the device plugin via the device-plugin's unix socket to get the resources on the current node.

  • 4) Kubelet updates the api-server with the node status to record the resources discovered in the previous step.

    • this timekubelet get node -oyaml You will be able to see that the Node object's Capacity has additional resources.

    (5) The user creates a Pod and requests the resource. After scheduling is complete, the kubelet on the corresponding node calls the Allocate interface of the device plugin to allocate the resource.

Roughly as follows:

k8s-device-plugin-timeline


[Kubernetes Series]Continuously updated, search the public number [Explore Cloud Native]Subscribe to read more articles.


3. Realization

Source Code:/lixd/i-device-plugin

The device plugin implementation is roughly divided into three parts:

  • 1) Initiate registration with Kubelet at startup
    • Note that kubelet restarts are monitored, typically using thefsnotify Similarly, the library monitors for re-creation events. If it is recreated, it is assumed that the kubelet was restarted, and you need to re-register the
  • 2) gRPC Server: mainly implements theListAndWatch respond in singingAllocateTwo methods

Implementing gRPC Server

For simplicity, only theListAndWatch respond in singingAllocate These are two must-have methods.

For those of you who are not familiar with gRPC, check this out -->gRPC Tutorial Series

ListAndWatch

This is a gRPC Stream method that establishes a long connection to continuously send information about the device to the Kubelet.

// ListAndWatch returns a stream of List of Devices
// Whenever a Device state change or a Device disappears, ListAndWatch
// returns the new list
func (c *GopherDevicePlugin) ListAndWatch(_ *, srv pluginapi.DevicePlugin_ListAndWatchServer) error {
	devs := ()
	("find devices:%s", String(devs))

	err := (&{Devices: devs})
	if err != nil {
		return (err, "send device failed")
	}

	("waiting for device update")
	for range  {
		devs = ()
		("device update,new device list:%s", String(devs))
		_ = (&{Devices: devs})
	}
	return nil
}

Part of the code for the discovery device is as follows:

func (d *DeviceMonitor) List() error {
	err := (, func(path string, info , err error) error {
		if () {
			("%s is dir,skip", path)
			return nil
		}

		[()] = &{
			ID:     (),
			Health: ,
		}
		return nil
	})

	return (err, "walk [%s] failed", )
}

It's simple to traverse to see/etc/gophers All files in the directory, each file is treated as a device.

Then a Goroutine is started to monitor the changes in the device, i.e./etc/gophers Notifications are sent to Kubelet via chan when files in the directory change, sending the latest device information to Kubelet.

func (d *DeviceMonitor) Watch() error {
	("watching devices")

	w, err := ()
	if err != nil {
		return (err, "new watcher failed")
	}
	defer ()

	errChan := make(chan error)
	go func() {
		defer func() {
			if r := recover(); r != nil {
				errChan <- ("device watcher panic:%v", r)
			}
		}()
		for {
			select {
			case event, ok := <-:
				if !ok {
					continue
				}
				("fsnotify device event: %s %s", , ())

				if  ==  {
					dev := ()
					[dev] = &{
						ID:     dev,
						Health: ,
					}
					 <- struct{}{}
					("find new device [%s]", dev)
				} else if & ==  {
					dev := ()
					delete(, dev)
					 <- struct{}{}
					("device [%s] removed", dev)
				}

			case err, ok := <-:
				if !ok {
					continue
				}
				("fsnotify watch device failed:%v", err)
			}
		}
	}()

	err = ()
	if err != nil {
		return ("watch device error:%v", err)
	}

	return <-errChan
}

Allocate

Allocate is to tell the kubelet how to allocate the device to the container, which is relatively simple to accomplish by adding an environment variable to the corresponding container, Gopher=$deviceId.

// Allocate is called during container creation so that the Device
// Plugin can run device specific operations and instruct Kubelet
// of the steps to make the Device available in the container
func (c *GopherDevicePlugin) Allocate(_ , reqs *) (*, error) {
	ret := &{}
	for _, req := range  {
		("[Allocate] received request: %v", (, ","))
		resp := {
			Envs: map[string]string{
				"Gopher": (, ","),
			},
		}
		 = append(, &resp)
	}
	return ret, nil
}

Take a quick look at how NVIDIA's device plugin implements Allocate.

// Allocate which return list of devices.
func (plugin *NvidiaDevicePlugin) Allocate(ctx , reqs *) (*, error) {
	responses := {}
	for _, req := range  {
		if err := (); err != nil {
			return nil, ("invalid allocation request for %q: %w", (), err)
		}
		response, err := ()
		if err != nil {
			return nil, ("failed to get allocate response: %v", err)
		}
		 = append(, response)
	}

	return &responses, nil
}

The core is really this method:

// updateResponseForDeviceListEnvvar sets the environment variable for the requested devices.
func (plugin *NvidiaDevicePlugin) updateResponseForDeviceListEnvvar(response *, deviceIDs ...string) {
	[] = (deviceIDs, ",")
}

Adds an environment variable to the container, value is the device id, the specific deviceID provides two measurements, it may be the number or uuid

const (
	DeviceIDStrategyUUID  = "uuid"
	DeviceIDStrategyIndex = "index"
)

key is a variable that is initialized as follows.

	plugin := NvidiaDevicePlugin{
		deviceListEnvvar:     "NVIDIA_VISIBLE_DEVICES",
		socket:               pluginPath + ".sock",
	  // ...
	}

In other words, the NVIDIA device plugin implementation of Allocate essentially adds environment variables to the container, for example:

NVIDIA_VISIBLE_DEVICES="0,1"

in an articleGPU Environment Build Guide: Accelerating Kubernetes GPU Environment Builds with GPU Operator The NVIDIA Container Toolit Installer is used by GPU Operator to install the NVIDIA Container Toolit as mentioned in the NVIDIA Container Toolit Installer.

The NVIDIA Container Toolit is designed to add support for GPUs, including recognizing the NVIDIA_VISIBLE_DEVICES environment variable, and then mounting the corresponding device into the container.

In addition to this it will mount the device into the container:

func (plugin *NvidiaDevicePlugin) apiDeviceSpecs(devRoot string, ids []string) []* {
	optional := map[string]bool{
		"/dev/nvidiactl":        true,
		"/dev/nvidia-uvm":       true,
		"/dev/nvidia-uvm-tools": true,
		"/dev/nvidia-modeset":   true,
	}

	paths := (ids)

	var specs []*
	for _, p := range paths {
		if optional[p] {
			if _, err := (p); err != nil {
				continue
			}
		}
		spec := &{
			ContainerPath: p,
			HostPath:      (devRoot, p),
			Permissions:   "rw",
		}
		specs = append(specs, spec)
	}

	return specs
}

Core for:

		spec := &{
			ContainerPath: p,
			HostPath:      (devRoot, p),
			Permissions:   "rw",
		}

Here you specify the path of the device on the host and the path of the device after it is mounted to the container, so you can mount the device based on this information.

Other methods

Several other methods are non-mandatory, so just make an empty implementation.

// GetDevicePluginOptions returns options to be communicated with Device
// Manager
func (c *GopherDevicePlugin) GetDevicePluginOptions(_ , _ *) (*, error) {
	return &{PreStartRequired: true}, nil
}

// GetPreferredAllocation returns a preferred set of devices to allocate
// from a list of available ones. The resulting preferred allocation is not
// guaranteed to be the allocation ultimately performed by the
// devicemanager. It is only designed to help the devicemanager make a more
// informed allocation decision when possible.
func (c *GopherDevicePlugin) GetPreferredAllocation(_ , _ *) (*, error) {
	return &{}, nil
}

// PreStartContainer is called, if indicated by Device Plugin during registeration phase,
// before each container start. Device plugin can run device specific operations
// such as reseting the device before making devices available to the container
func (c *GopherDevicePlugin) PreStartContainer(_ , _ *) (*, error) {
	return &{}, nil
}

Registering with Kubelet

Registering is as simple as calling the RegisterRequest method provided by the deviceplugin.

// Register registers the device plugin for the given resourceName with Kubelet.
func (c *GopherDevicePlugin) Register() error {
	conn, err := connect(, )
	if err != nil {
		return (err, "connect to %s failed", )
	}
	defer ()

	client := (conn)
	reqt := &{
		Version:      ,
		Endpoint:     (),
		ResourceName: ,
	}

	_, err = ((), reqt)
	if err != nil {
		return (err, "register to kubelet failed")
	}
	return nil
}

Monitoring Status

The fsnotify library is used to monitor the status of the files and determine if the kubelet is restarting by the changes in the files. When the kubelet is restarted, the device plugin needs to be restarted as well, and then registered with the new .

// WatchKubelet restart device plugin when kubelet restarted
func WatchKubelet(stop chan<- struct{}) error {
	watcher, err := ()
	if err != nil {
		return (err, "Unable to create fsnotify watcher")
	}
	defer ()

	go func() {
		// Start listening for events.
		for {
			select {
			case event, ok := <-:
				if !ok {
					continue
				}
				("fsnotify events: %s %v", , ())
				if  ==  &&  ==  {
					("inotify:  created, restarting.")
					stop <- struct{}{}
				}
			case err, ok := <-:
				if !ok {
					continue
				}
				("fsnotify failed restarting,detail:%v", err)
			}
		}
	}()

	// watch 
	err = ()
	if err != nil {
		return (err, "Unable to add path %s to watcher", )
	}
	return nil
}

Why you need to re-register

Since Kubelet uses a map to store registered plugins, they will be lost every time Kubelet is restarted, so we have to monitor the Kubelet restart state and re-register them when we implement the device plugin.

Kubelet Register Methods The realization is as follows:

// /pkg/kubelet/cm/devicemanager/plugin/v1beta1/#L143-L165
func (s *server) Register(ctx , r *) (*, error) {
	("Got registration request from device plugin with resource", "resourceName", )
	().Inc()

	if !() {
		err := (errUnsupportedVersion, , )
		("Bad registration request from device plugin with resource", "resourceName", , "err", err)
		return &{}, err
	}

	if !(()) {
		err := (errInvalidResourceName, )
		("Bad registration request from device plugin", "err", err)
		return &{}, err
	}

	if err := (, (, )); err != nil {
		("Error connecting to device plugin client", "err", err)
		return &{}, err
	}

	return &{}, nil
}

The core is in the connectClient method:

func (s *server) connectClient(name string, socketPath string) error {
	c := NewPluginClient(name, socketPath, )

	(name, c)
	if err := (); err != nil {
		(name)
		(err, "Failed to connect to new client", "resource", name)
		return err
	}

	go func() {
		(name, c)
	}()

	return nil
}

How do you save the client?

func (s *server) registerClient(name string, c Client) {
	()
	defer ()

	[name] = c
	(2).InfoS("Registered client", "name", name)
}

Definitions are as follows:

type server struct {
	socketName string
	socketDir string
	mutex
	wg
	grpc *
	rhandler RegistrationHandler
	chandler ClientHandler
	clients map[string]Client // utilization map stockpile,and for persistence
}

The main method is divided into three parts:

  • 1) Start the gRPC service
  • 2) Register with Kubelet
  • 3) Monitoring Status
func main() {
	("device plugin starting")
	dp := device_plugin.NewGopherDevicePlugin()
	go ()

	// register when device plugin start
	if err := (); err != nil {
		("register to kubelet failed: %v", err)
	}

	// watch ,when kubelet restart,exit device plugin,then will restart by DaemonSet
	stop := make(chan struct{})
	err := (stop)
	if err != nil {
		("start to kubelet failed: %v", err)
	}

	<-stop
	("kubelet restart,exiting")
}

4. Testing

deployments

The first step is to deploy i-device-plugin, which is usually done using DaemonSet, the full yaml is as follows:

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: i-device-plugin
  namespace: kube-system
  labels:
    app: i-device-plugin
spec:
  selector:
    matchLabels:
      app: i-device-plugin
  template:
    metadata:
      labels:
        app: i-device-plugin
    spec:
      containers:
        - name: i-device-plugin
          image: /lixd96/i-device-plugin:latest
          imagePullPolicy: IfNotPresent
          resources:
            limits:
              cpu: "1"
              memory: "512Mi"
            requests:
              cpu: "0.1"
              memory: "128Mi"
          volumeMounts:
            - name: device-plugin
              mountPath: /var/lib/kubelet/device-plugins
            - name: gophers
              mountPath: /etc/gophers
      volumes:
        - name: device-plugin
          hostPath:
            path: /var/lib/kubelet/device-plugins
        - name: gophers
          hostPath:
            path: /etc/gophers

Mount the two used directories in the Pod as hostPath:

  • /var/lib/kubelet/device-plugins: request initiates the call and writes the sock file for the device-plugin gRPC service to this directory for kubelet to call.
  • /etc/gophers: In this demo, the files in the /etc/gophers directory are treated as devices, so they need to be mounted in the Pod.

Make sure i-device-plugin is started.

[root@test ~]# kubectl -n kube-system get po
i-device-plugin-vnw6z            1/1     Running   0          17s

initialization

In this demo, the files in the /etc/gophers directory are treated as devices, so we just need to go to the /etc/gophers directory and create the files to simulate having a new device accessed.

mkdir /etc/gophers

touch /etc/gophers/g1

Viewing the device plugin log

[root@test ~]# kubectl -n kube-system logs -f i-device-plugin-vnw6z
I0719 13:52:24.674737       1 :10] device plugin starting
I0719 13:52:24.675440       1 device_monitor.go:33] /etc/gophers is dir,skip
I0719 13:52:24.675679       1 device_monitor.go:49] watching devices
I0719 13:52:24.682141       1 :22] find devices []
I0719 13:52:24.682315       1 :29] waiting for device update
I0719 13:53:09.369381       1 device_monitor.go:70] fsnotify device event: /etc/gophers/g1 CREATE
I0719 13:53:09.370394       1 device_monitor.go:79] find new device [g1]
I0719 13:53:09.370445       1 device_monitor.go:70] fsnotify device event: /etc/gophers/g1 CHMOD
I0719 13:53:09.370659       1 :32] device update,new device list [g1]

As you can see, the added devices have been sensed.

Not surprisingly, you can see the new resources on node now!

[root@test gophers]# k get node n1 -oyaml|grep  capacity -A 7
  capacity:
    cpu: "4"
    ephemeral-storage: 20960236Ki
    hugepages-1Gi: "0"
    hugepages-2Mi: "0"
    /gopher: "1"
    memory: 8154984Ki
    pods: "110"

Sure enough, node capacity has a new/gopher: "1"

Creating a Test Pod

Next, create a Pod to claim the resource and try it out

apiVersion: v1
kind: Pod
metadata:
  name: gopher-pod
spec:
  containers:
  - name: gopher-container
    image: busybox
    command: ["sh", "-c", "echo Hello, Kubernetes! && sleep 3600"]
    resources:
      requests:
        /gopher: "1"
      limits:
        /gopher: "1"

Pod started successfully.

[root@test ~]# kubectl get po
NAME         READY   STATUS    RESTARTS   AGE
gopher-pod   1/1     Running   0          27s

I added the Gopher=xxx environment variable to allocate the device before, so let's see if it allocates correctly.

[root@test ~]# kubectl exec -it gopher-pod -- env|grep Gopher
Gopher=g1

ok, the environment variable exists and you can see that the device assigned to the Pod is g1.

Additional equipment

Use the same yaml to create a Pod with a different name.

[root@test ~]# k get po
NAME          READY   STATUS    RESTARTS   AGE
gopher-pod    1/1     Running   0          3m9s
gopher-pod2   0/1     Pending   0          2s

Since there is only one gopher resource, the second Pod is pending.

Events:
  Type     Reason            Age   From               Message
  ----     ------            ----  ----               -------
  Warning  FailedScheduling  7s    default-scheduler  0/1 nodes are available: 1 Insufficient /gopher. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..

When creating a device

touch /etc/gophers/g2

The device plugin immediately senses the device change, and the related logs are shown below:

I0719 14:01:00.308599       1 device_monitor.go:70] fsnotify device event: /etc/gophers/g2 CREATE
I0719 14:01:00.308986       1 device_monitor.go:79] find new device [g2]
I0719 14:01:00.309017       1 device_monitor.go:70] fsnotify device event: /etc/gophers/g2 CHMOD
I0719 14:01:00.309141       1 :32] device update,new device list [g2,g1]

The number of resources on node is also updated to 2

[root@argo-1 ~]# k get node argo-1 -oyaml|grep  capacity -A 7
  capacity:
    cpu: "4"
    ephemeral-storage: 20960236Ki
    hugepages-1Gi: "0"
    hugepages-2Mi: "0"
    /gopher: "2"
    memory: 8154984Ki
    pods: "110"

And then pod2 will start up normally.

[root@test ~]# kubectl get po
NAME          READY   STATUS    RESTARTS   AGE
gopher-pod    1/1     Running   0          4m31s
gopher-pod2   1/1     Running   0          84s

Delete Device

Then delete the g2 device

rm -rf /etc/gophers/g2

The device plugin is also recognized, and the related logs

I0719 14:03:55.904983       1 device_monitor.go:70] fsnotify device event: /etc/gophers/g2 REMOVE
I0719 14:03:55.905203       1 device_monitor.go:84] device [g2] removed
I0719 14:03:55.905267       1 :32] device update,new device list [g1]

To see if the number of resources on the Node has been updated

[root@test ~]# k get node argo-1 -oyaml|grep  capacity -A 7
  capacity:
    cpu: "4"
    ephemeral-storage: 20960236Ki
    hugepages-1Gi: "0"
    hugepages-2Mi: "0"
    /gopher: "1"
    memory: 8154984Ki
    pods: "110"

The corresponding resource has also become 1, and everything is fine.

5. Summary

This article analyzes the workings of the Device Plugin mechanism in k8s and implements a simplei-device-pluginto further deepen understanding.

The working principle of Device Plugin is not really complicated, it can be divided intoPlugin Registration respond in singingkubelet call pluginTwo parts:

  • Plugin registration: DevicePlugin will register with the Kubelet on the node when it starts up, so that the Kubelet will be aware of the plugin's existence.
  • kubelet calls the plugin: after registration, when a Pod applies for a resource, the kubelet calls the plugin API to realize the specific function.


[Kubernetes Series]Continuously updated, search the public number [Explore Cloud Native]Subscribe to read more articles.


6. Reference

k8s documentation: device-plugins

/NVIDIA/k8s-device-plugin

Kubernetes development knowledge-device-plugin implementation

Kubelet Register source code