This article explains when the Device Manager is created in the kubelet, and then walks through how it starts, registers device plugins, and allocates devices to containers.
Like the Volume Manager and the QoS Container Manager, the Device Manager is one of the many managers run by the kubelet. It is created in NewContainerManager when the kubelet starts.
pkg/kubelet/cm/container_manager_linux.go:197
func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, devicePluginEnabled bool, recorder record.EventRecorder) (ContainerManager, error) {
	...
	glog.Infof("Creating device plugin manager: %t", devicePluginEnabled)
	if devicePluginEnabled {
		cm.deviceManager, err = devicemanager.NewManagerImpl()
	} else {
		cm.deviceManager, err = devicemanager.NewManagerStub()
	}
	if err != nil {
		return nil, err
	}
	...
}
Before going further, it helps to look at the Device Manager struct:
// ManagerImpl is the structure in charge of managing Device Plugins.
type ManagerImpl struct {
	socketname string
	socketdir  string

	endpoints map[string]endpoint // Key is ResourceName
	mutex     sync.Mutex

	server *grpc.Server

	// activePods is a method for listing active pods on the node
	// so the amount of pluginResources requested by existing pods
	// could be counted when updating allocated devices
	activePods ActivePodsFunc

	// sourcesReady provides the readiness of kubelet configuration sources such as apiserver update readiness.
	// We use it to determine when we can purge inactive pods from checkpointed state.
	sourcesReady config.SourcesReady

	// callback is used for updating devices' states in one time call.
	// e.g. a new device is advertised, two old devices are deleted and a running device fails.
	callback monitorCallback

	// healthyDevices contains all of the registered healthy resourceNames and their exported device IDs.
	healthyDevices map[string]sets.String

	// unhealthyDevices contains all of the unhealthy devices and their exported device IDs.
	unhealthyDevices map[string]sets.String

	// allocatedDevices contains allocated deviceIds, keyed by resourceName.
	allocatedDevices map[string]sets.String

	// podDevices contains pod to allocated device mapping.
	podDevices podDevices
	store      utilstore.Store
	pluginOpts map[string]*pluginapi.DevicePluginOptions
}
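The core fields are:
socketname: the name of the socket the kubelet exposes, i.e. kubelet.sock.
socketdir: the directory that holds the device plugins' sockets, /var/lib/kubelet/device-plugins/.
endpoints: a map keyed by Resource Name whose values implement the endpoint interface (run, stop, allocate, preStartContainer, getDevices, callback, isStopped, stopGracePeriodExpired). Each endpoint corresponds to one registered device plugin; it handles gRPC communication with that plugin and caches the device states the plugin reports.
server: the gRPC server that exposes the Register service.
activePods: returns all active pods on the node, i.e. pods that are not in a terminated state. In the kubelet's initializeRuntimeDependentModules, the following function is registered as the activePods func: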
// GetActivePods returns non-terminal pods
func (kl *Kubelet) GetActivePods() []*v1.Pod {
	allPods := kl.podManager.GetPods()
	activePods := kl.filterOutTerminatedPods(allPods)
	return activePods
}
callback: the function the kubelet invokes when the ListAndWatch gRPC stream from a device plugin reports a change in device state: a new device added, an old device removed, or a device's state changed. Driving the callback from ListAndWatch is what enables automatic device discovery and hot-plugging.
type monitorCallback func(resourceName string, added, updated, deleted []pluginapi.Device)
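healthyDevices: a map keyed by Resource Name whose values are the IDs of the healthy devices for that resource.
unhealthyDevices: a map keyed by Resource Name whose values are the IDs of the unhealthy devices for that resource.
allocatedDevices: a map keyed by Resource Name whose values are the IDs of the devices that have already been allocated.
podDevices: records the devices allocated to every container in every pod.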
// ContainerAllocateResponse holds the allocation information for the devices of a container,
// including the environment variables, mounts, device specs, and annotations to inject.
type ContainerAllocateResponse struct {
	Envs        map[string]string
	Mounts      []*Mount
	Devices     []*DeviceSpec
	Annotations map[string]string
}

// deviceAllocateInfo
type deviceAllocateInfo struct {
	deviceIds sets.String
	allocResp *pluginapi.ContainerAllocateResponse
}

type resourceAllocateInfo map[string]deviceAllocateInfo // Keyed by resourceName.
type containerDevices map[string]resourceAllocateInfo   // Keyed by containerName.
type podDevices map[string]containerDevices             // Keyed by podUID.
store: file-backed storage for checkpointData (/var/lib/kubelet/device-plugins/kubelet_internal_checkpoint). It stores the devices allocated to each pod (PodDeviceEntries) as well as the registered Resource Names and their device IDs.
type checkpointData struct {
	PodDeviceEntries  []podDevicesCheckpointEntry
	RegisteredDevices map[string][]string // keyed by Resource Name; values are device IDs
}

type podDevicesCheckpointEntry struct {
	PodUID        string
	ContainerName string
	ResourceName  string
	DeviceIDs     []string
	AllocResp     []byte
}
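pluginOpts: a map keyed by Resource Name whose values are DevicePluginOptions. It currently has a single field, PreStartRequired bool, which indicates whether the device plugin's PreStartContainer interface must be called before a container starts. In nvidia-k8s-plugin, PreStartContainer is an empty implementation.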
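The nested podDevices mapping from the field list (pod UID, then container name, then resource name) can be illustrated with a small standalone sketch. This is not the kubelet's code: the leaf value is simplified to a plain slice of device IDs, and the insert and allocated helpers are hypothetical names chosen for this example.

```go
package main

import (
	"fmt"
	"sort"
)

// The three nested maps mirror the shape of podDevices described above.
// The leaf is simplified to a slice of device IDs (an assumption of this sketch).
type resourceDevices map[string][]string         // keyed by resource name
type containerDevices map[string]resourceDevices // keyed by container name
type podDevices map[string]containerDevices      // keyed by pod UID

// insert records that a container in a pod was given the listed devices.
func (p podDevices) insert(podUID, container, resource string, ids []string) {
	if p[podUID] == nil {
		p[podUID] = containerDevices{}
	}
	if p[podUID][container] == nil {
		p[podUID][container] = resourceDevices{}
	}
	p[podUID][container][resource] = ids
}

// allocated flattens the mapping into sorted device IDs per resource name.
func (p podDevices) allocated() map[string][]string {
	out := map[string][]string{}
	for _, containers := range p {
		for _, resources := range containers {
			for resource, ids := range resources {
				out[resource] = append(out[resource], ids...)
			}
		}
	}
	for resource := range out {
		sort.Strings(out[resource])
	}
	return out
}

func main() {
	pods := podDevices{}
	pods.insert("pod-a", "main", "example.com/gpu", []string{"gpu1"})
	pods.insert("pod-b", "main", "example.com/gpu", []string{"gpu0"})
	fmt.Println(pods.allocated())
}
```

Flattening the nested map per resource name is one way to rebuild an allocatedDevices-style view from the per-pod records.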
Next, let's look at NewManagerImpl, which actually creates the Device Manager.
pkg/kubelet/cm/devicemanager/manager.go:97
// NewManagerImpl creates a new manager.
func NewManagerImpl() (*ManagerImpl, error) {
	// Communicates with device plugins through /var/lib/kubelet/device-plugins/kubelet.sock.
	return newManagerImpl(pluginapi.KubeletSocket)
}

func newManagerImpl(socketPath string) (*ManagerImpl, error) {
	glog.V(2).Infof("Creating Device Plugin manager at %s", socketPath)

	if socketPath == "" || !filepath.IsAbs(socketPath) {
		return nil, fmt.Errorf(errBadSocket+" %v", socketPath)
	}

	dir, file := filepath.Split(socketPath)
	manager := &ManagerImpl{
		endpoints:        make(map[string]endpoint),
		socketname:       file,
		socketdir:        dir,
		healthyDevices:   make(map[string]sets.String),
		unhealthyDevices: make(map[string]sets.String),
		allocatedDevices: make(map[string]sets.String),
		pluginOpts:       make(map[string]*pluginapi.DevicePluginOptions),
		podDevices:       make(podDevices),
	}
	manager.callback = manager.genericDeviceUpdateCallback

	// The following structs are populated with real implementations in manager.Start()
	// Before that, initializes them to perform no-op operations.
	manager.activePods = func() []*v1.Pod { return []*v1.Pod{} }
	manager.sourcesReady = &sourcesReadyStub{}
	var err error
	// Creates the file-store-backed key-value file kubelet_internal_checkpoint under
	// /var/lib/kubelet/device-plugins/, used as the checkpoint for the kubelet's device plugins.
	manager.store, err = utilstore.NewFileStore(dir, utilfs.DefaultFs{})
	if err != nil {
		return nil, fmt.Errorf("failed to initialize device plugin checkpointing store: %+v", err)
	}

	return manager, nil
}
The kubelet Device Manager communicates with device plugins through /var/lib/kubelet/device-plugins/kubelet.sock.
The callback is set to genericDeviceUpdateCallback, which handles add, delete, and update events for devices.
A file-store-backed key-value file, kubelet_internal_checkpoint, is created under /var/lib/kubelet/device-plugins/ to serve as the checkpoint for the kubelet's device plugins.
When a device add/delete/update event is observed, it is written to the kubelet_internal_checkpoint file.
When a device plugin has been stopped for longer than the grace period (hard-coded to 5 minutes and not configurable), its devices are removed from the checkpoint. Until then, the Device Manager keeps caching the endpoint and its devices.
After devices are allocated to a container, PodDevices is also written to the checkpoint.
Let's look at genericDeviceUpdateCallback, the callback implementation, to see how the Device Manager handles device add/delete/update messages.
pkg/kubelet/cm/devicemanager/manager.go:134
func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, added, updated, deleted []pluginapi.Device) {
	kept := append(updated, added...)
	m.mutex.Lock()
	if _, ok := m.healthyDevices[resourceName]; !ok {
		m.healthyDevices[resourceName] = sets.NewString()
	}
	if _, ok := m.unhealthyDevices[resourceName]; !ok {
		m.unhealthyDevices[resourceName] = sets.NewString()
	}
	for _, dev := range kept {
		if dev.Health == pluginapi.Healthy {
			m.healthyDevices[resourceName].Insert(dev.ID)
			m.unhealthyDevices[resourceName].Delete(dev.ID)
		} else {
			m.unhealthyDevices[resourceName].Insert(dev.ID)
			m.healthyDevices[resourceName].Delete(dev.ID)
		}
	}
	for _, dev := range deleted {
		m.healthyDevices[resourceName].Delete(dev.ID)
		m.unhealthyDevices[resourceName].Delete(dev.ID)
	}
	m.mutex.Unlock()
	m.writeCheckpoint()
}
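If a device received in the callback is Healthy, its device ID is inserted into healthyDevices in ManagerImpl and removed from unhealthyDevices.
If a device received in the callback is Unhealthy, its device ID is inserted into unhealthyDevices in ManagerImpl and removed from healthyDevices.
Devices that the device plugin reports as deleted are removed from both healthyDevices and unhealthyDevices.
The data in ManagerImpl is then written to the checkpoint file.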
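To make the bookkeeping above concrete, here is a minimal standalone sketch of the same healthy/unhealthy set updates using only the standard library. The device and tracker types are hypothetical simplifications (the kubelet uses pluginapi.Device and sets.String), and locking and checkpointing are left out.

```go
package main

import (
	"fmt"
	"sort"
)

// device is a hypothetical, simplified stand-in for pluginapi.Device.
type device struct {
	ID      string
	Healthy bool
}

// deviceSets maps a resource name to a set of device IDs.
type deviceSets map[string]map[string]struct{}

// tracker is a hypothetical stand-in for the healthy/unhealthy caches.
type tracker struct {
	healthy   deviceSets
	unhealthy deviceSets
}

func newTracker() *tracker {
	return &tracker{healthy: deviceSets{}, unhealthy: deviceSets{}}
}

// update applies one batch of added, updated, and deleted devices for a resource.
func (tr *tracker) update(resource string, added, updated, deleted []device) {
	if tr.healthy[resource] == nil {
		tr.healthy[resource] = map[string]struct{}{}
	}
	if tr.unhealthy[resource] == nil {
		tr.unhealthy[resource] = map[string]struct{}{}
	}
	// Added and updated devices are handled the same way.
	kept := append(append([]device{}, updated...), added...)
	for _, d := range kept {
		if d.Healthy {
			tr.healthy[resource][d.ID] = struct{}{}
			delete(tr.unhealthy[resource], d.ID)
		} else {
			tr.unhealthy[resource][d.ID] = struct{}{}
			delete(tr.healthy[resource], d.ID)
		}
	}
	// Deleted devices disappear from both sets.
	for _, d := range deleted {
		delete(tr.healthy[resource], d.ID)
		delete(tr.unhealthy[resource], d.ID)
	}
}

// sortedIDs returns the IDs of a set in sorted order for stable output.
func sortedIDs(set map[string]struct{}) []string {
	ids := make([]string, 0, len(set))
	for id := range set {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	return ids
}

func main() {
	tr := newTracker()
	res := "example.com/gpu"
	tr.update(res, []device{{"gpu0", true}, {"gpu1", true}}, nil, nil)
	tr.update(res, nil, []device{{"gpu1", false}}, []device{{"gpu0", true}})
	fmt.Println(sortedIDs(tr.healthy[res]), sortedIDs(tr.unhealthy[res]))
}
```

After the second batch, gpu0 has been deleted and gpu1 has moved to the unhealthy set, so no healthy device remains for the resource.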
So far we have covered how the Device Manager is created, along with its checkpoint and callback. Next, we analyze the Device Manager's Start flow.
The Device Manager is started when containerManagerImpl's Start is called.
pkg/kubelet/cm/container_manager_linux.go:527
func (cm *containerManagerImpl) Start(node *v1.Node, activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, runtimeService internalapi.RuntimeService) error {
	...
	// Starts device manager.
	if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady); err != nil {
		return err
	}
	return nil
}
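The first argument to deviceManager.Start is a function that returns the node's active (non-terminated) pods.
SourcesReady tracks the pod sources configured for the kubelet. These sources are:
file: static pods created from static files.
http: pod information fetched over an HTTP endpoint.
api: pod information fetched from the Kubernetes API server; this is Kubernetes' default mechanism.
*: all of the source types above.
ManagerImpl.Start starts the Device Manager and exposes its gRPC service.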
pkg/kubelet/cm/devicemanager/manager.go:204
// Start starts the Device Plugin Manager and starts initialization of
// podDevices and allocatedDevices information from checkpoint-ed state and
// starts device plugin registration service.
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
	m.activePods = activePods
	m.sourcesReady = sourcesReady

	// Loads in allocatedDevices information from disk.
	err := m.readCheckpoint()
	...

	socketPath := filepath.Join(m.socketdir, m.socketname)
	os.MkdirAll(m.socketdir, 0755)

	// Removes all stale sockets in m.socketdir. Device plugins can monitor
	// this and use it as a signal to re-register with the new Kubelet.
	if err := m.removeContents(m.socketdir); err != nil {
		glog.Errorf("Fail to clean up stale contents under %s: %+v", m.socketdir, err)
	}

	s, err := net.Listen("unix", socketPath)
	if err != nil {
		glog.Errorf(errListenSocket+" %+v", err)
		return err
	}

	m.server = grpc.NewServer([]grpc.ServerOption{}...)

	pluginapi.RegisterRegistrationServer(m.server, m)
	go m.server.Serve(s)

	glog.V(2).Infof("Serving device plugin registration server on %q", socketPath)

	return nil
}
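First, the checkpoint file is read to restore the following ManagerImpl data:
podDevices;
allocatedDevices;
healthyDevices;
unhealthyDevices;
endpoints. Note that the stop time of each restored endpoint is set to the current time, which means that after a kubelet restart these resources are only considered available once the device plugin has re-registered.
Next, everything under /var/lib/kubelet/device-plugins/ is removed except the checkpoint file. In other words, all socket files are deleted, including the kubelet's own kubelet.sock and the sockets of every previously registered device plugin. Device plugins watch for kubelet.sock being deleted; when that happens, they re-register with the kubelet.
Finally, kubelet.sock is created and the gRPC server is started. At present only the Register service is registered, which device plugins call to register themselves.
Let's look at Register, the only gRPC interface the kubelet Device Manager exposes.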
pkg/kubelet/cm/devicemanager/manager.go:289
// Register registers a device plugin.
func (m *ManagerImpl) Register(ctx context.Context, r *pluginapi.RegisterRequest) (*pluginapi.Empty, error) {
	glog.Infof("Got registration request from device plugin with resource name %q", r.ResourceName)
	metrics.DevicePluginRegistrationCount.WithLabelValues(r.ResourceName).Inc()
	var versionCompatible bool
	for _, v := range pluginapi.SupportedVersions {
		if r.Version == v {
			versionCompatible = true
			break
		}
	}
	if !versionCompatible {
		errorString := fmt.Sprintf(errUnsupportedVersion, r.Version, pluginapi.SupportedVersions)
		glog.Infof("Bad registration request from device plugin with resource name %q: %v", r.ResourceName, errorString)
		return &pluginapi.Empty{}, fmt.Errorf(errorString)
	}

	if !v1helper.IsExtendedResourceName(v1.ResourceName(r.ResourceName)) {
		errorString := fmt.Sprintf(errInvalidResourceName, r.ResourceName)
		glog.Infof("Bad registration request from device plugin: %v", errorString)
		return &pluginapi.Empty{}, fmt.Errorf(errorString)
	}

	// TODO: for now, always accepts newest device plugin. Later may consider to
	// add some policies here, e.g., verify whether an old device plugin with the
	// same resource name is still alive to determine whether we want to accept
	// the new registration.
	go m.addEndpoint(r)

	return &pluginapi.Empty{}, nil
}
The registration request is sent by the device plugin to the kubelet. RegisterRequest is defined as:
type RegisterRequest struct {
	Version      string // the device plugin API version for Kubernetes 1.10 is v1beta1
	Endpoint     string // the socket name of the device plugin
	ResourceName string
	Options      *DevicePluginOptions
}
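Register checks whether the Resource Name follows the Extended Resource rules:
The Resource Name must not belong to kubernetes.io; it needs its own domain, such as nvidia.com.
The Resource Name must not start with the requests. prefix.
The corresponding resource value must be an integer.
It then calls addEndpoint to register the plugin.
As the Register method shows, the actual registration logic lives in addEndpoint.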
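The two naming rules listed above can be approximated with a short check. This is a simplified sketch, not the real v1helper.IsExtendedResourceName, which performs additional validation; looksLikeExtendedResource is a hypothetical name, and the integer-value rule is not covered because it applies to quantities rather than names.

```go
package main

import (
	"fmt"
	"strings"
)

// looksLikeExtendedResource applies only the naming rules listed in the text:
// the name needs its own domain, must not live under kubernetes.io, and must
// not start with the "requests." prefix.
func looksLikeExtendedResource(name string) bool {
	if !strings.Contains(name, "/") {
		return false // no domain part, e.g. "cpu"
	}
	if strings.Contains(name, "kubernetes.io/") {
		return false // reserved for native resources
	}
	if strings.HasPrefix(name, "requests.") {
		return false // would clash with quota resource names
	}
	return true
}

func main() {
	for _, name := range []string{"nvidia.com/gpu", "cpu", "kubernetes.io/foo", "requests.nvidia.com/gpu"} {
		fmt.Printf("%-26s %v\n", name, looksLikeExtendedResource(name))
	}
}
```

Only nvidia.com/gpu passes in this example; the other three names each violate one of the rules.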
pkg/kubelet/cm/devicemanager/manager.go:332
func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
	existingDevs := make(map[string]pluginapi.Device)
	m.mutex.Lock()
	old, ok := m.endpoints[r.ResourceName]
	if ok && old != nil {
		// Pass devices of previous endpoint into re-registered one,
		// to avoid potential orphaned devices upon re-registration
		devices := make(map[string]pluginapi.Device)
		for _, device := range old.getDevices() {
			devices[device.ID] = device
		}
		existingDevs = devices
	}
	m.mutex.Unlock()

	socketPath := filepath.Join(m.socketdir, r.Endpoint)
	e, err := newEndpointImpl(socketPath, r.ResourceName, existingDevs, m.callback)
	if err != nil {
		glog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
		return
	}
	m.mutex.Lock()
	if r.Options != nil {
		m.pluginOpts[r.ResourceName] = r.Options
	}
	// Check for potential re-registration during the initialization of new endpoint,
	// and skip updating if re-registration happens.
	// TODO: simplify the part once we have a better way to handle registered devices
	ext := m.endpoints[r.ResourceName]
	if ext != old {
		glog.Warningf("Some other endpoint %v is added while endpoint %v is initialized", ext, e)
		m.mutex.Unlock()
		e.stop()
		return
	}
	// Associates the newly created endpoint with the corresponding resource name.
	// Stops existing endpoint if there is any.
	m.endpoints[r.ResourceName] = e
	glog.V(2).Infof("Registered endpoint %v", e)
	m.mutex.Unlock()

	if old != nil {
		old.stop()
	}

	go func() {
		e.run()
		e.stop()
		m.mutex.Lock()
		if old, ok := m.endpoints[r.ResourceName]; ok && old == e {
			m.markResourceUnhealthy(r.ResourceName)
		}
		glog.V(2).Infof("Unregistered endpoint %v", e)
		m.mutex.Unlock()
	}()
}
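First, it checks whether this device plugin has registered before; if so, it fetches the devices already cached for it.
Then it tries to dial the device plugin's socket. A failed dial means the device plugin did not start properly. If the dial succeeds, the endpoint is re-initialized from the cached devices. endpointImpl is defined as: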
type endpointImpl struct {
	client     pluginapi.DevicePluginClient
	clientConn *grpc.ClientConn

	socketPath   string
	resourceName string
	stopTime     time.Time

	devices map[string]pluginapi.Device
	mutex   sync.Mutex

	cb monitorCallback
}
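To guard against the device plugin re-registering while the endpoint is being initialized, the manager looks up the cached endpoint for this device plugin again once initialization completes and compares it with the endpoint object seen before initialization:
If they are not the same object, a re-registration happened during initialization. The new endpoint's stop method is invoked, which closes the gRPC connection and sets the endpoint's stopTime to the current time, and the Register flow ends in failure.
Otherwise the flow continues.
If the device plugin had registered before, then before the new endpoint's run() is started, the old endpoint's stop is called to close its gRPC connection and set its stopTime to the current time.
A goroutine is then started to execute the endpoint's run(). Inside run:
The device plugin's ListAndWatch gRPC interface is called, and the ListAndWatch stream is read continuously over a long-lived connection.
The devices received from the stream are compared with the devices cached in the endpoint to work out which devices need to be added, deleted, or updated.
The endpoint's callback (the genericDeviceUpdateCallback method registered by ManagerImpl) is then called to update the Device Manager's cache and write it to the checkpoint file.
This continues until the gRPC connection to the device plugin returns an errListAndWatch error. The loop that reads the stream then exits, and the endpoint's stop is called to close the gRPC connection and set the endpoint's stopTime to the current time.
After stop is invoked, all devices of that device plugin are marked unhealthy: healthyDevices is emptied and every previously healthy device is added to unhealthyDevices. At that point the plugin is no longer considered registered.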
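The comparison step inside run (stream contents versus cached devices) can be sketched as a small diff function. This is a hedged illustration rather than the kubelet's endpoint code: device is a simplified stand-in for pluginapi.Device, and diffDevices is a hypothetical helper that only treats a change in health as an update.

```go
package main

import (
	"fmt"
	"sort"
)

// device is a hypothetical, simplified stand-in for pluginapi.Device.
type device struct {
	ID      string
	Healthy bool
}

// diffDevices compares the cached devices with the latest list received from
// the stream and returns what was added, updated, and deleted.
func diffDevices(cached map[string]device, latest []device) (added, updated, deleted []device) {
	seen := make(map[string]bool, len(latest))
	for _, d := range latest {
		seen[d.ID] = true
		old, ok := cached[d.ID]
		switch {
		case !ok:
			added = append(added, d)
		case old.Healthy != d.Healthy:
			updated = append(updated, d)
		}
	}
	for id, old := range cached {
		if !seen[id] {
			deleted = append(deleted, old)
		}
	}
	// Sort deletions by ID so the result does not depend on map iteration order.
	sort.Slice(deleted, func(i, j int) bool { return deleted[i].ID < deleted[j].ID })
	return added, updated, deleted
}

func main() {
	cached := map[string]device{
		"gpu0": {ID: "gpu0", Healthy: true},
		"gpu1": {ID: "gpu1", Healthy: true},
	}
	latest := []device{{ID: "gpu1", Healthy: false}, {ID: "gpu2", Healthy: true}}
	added, updated, deleted := diffDevices(cached, latest)
	fmt.Println(added, updated, deleted)
}
```

The three slices map directly onto the added, updated, and deleted arguments of the monitorCallback type shown earlier.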
In NewMainKubelet, the kubelet registers a series of Pod Admit Handlers. Whenever a pod is about to be created, these handlers are called first. Among them, klet.containerManager.UpdatePluginResources is how the kubelet Device Manager allocates devices to the pod.
pkg/kubelet/kubelet.go:893
func NewMainKubelet( ... ) (*Kubelet, error) {
	...
	klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources))
	...
}

pkg/kubelet/cm/container_manager_linux.go:618
func (cm *containerManagerImpl) UpdatePluginResources(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
	return cm.deviceManager.Allocate(node, attrs)
}
Before creating a pod, the kubelet invokes the Device Manager's Allocate method to request devices for each container in the pod. The kubelet forwards the request to the Allocate method of the corresponding endpoint, which passes it on to the device plugin.
pkg/kubelet/cm/devicemanager/manager.go:259
func (m *ManagerImpl) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
	pod := attrs.Pod
	devicesToReuse := make(map[string]sets.String)
	// TODO: Reuse devices between init containers and regular containers.
	for _, container := range pod.Spec.InitContainers {
		if err := m.allocateContainerResources(pod, &container, devicesToReuse); err != nil {
			return err
		}
		m.podDevices.addContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)
	}
	for _, container := range pod.Spec.Containers {
		if err := m.allocateContainerResources(pod, &container, devicesToReuse); err != nil {
			return err
		}
		m.podDevices.removeContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)
	}

	m.mutex.Lock()
	defer m.mutex.Unlock()

	// quick return if no pluginResources requested
	if _, podRequireDevicePluginResource := m.podDevices[string(pod.UID)]; !podRequireDevicePluginResource {
		return nil
	}

	m.sanitizeNodeAllocatable(node)
	return nil
}
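allocateContainerResources is called to allocate devices for the pod's init containers, and the PodDevices cache in ManagerImpl is updated.
allocateContainerResources is called to allocate devices for the pod's regular containers, and the PodDevices cache in ManagerImpl is updated.
sanitizeNodeAllocatable is called to update the node's Allocatable Resource for the corresponding Resource Name in the scheduler cache.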
pkg/kubelet/cm/devicemanager/manager.go:608
func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container, devicesToReuse map[string]sets.String) error {
	podUID := string(pod.UID)
	contName := container.Name
	allocatedDevicesUpdated := false
	// Extended resources are not allowed to be overcommitted.
	// Since device plugin advertises extended resources,
	// therefore Requests must be equal to Limits and iterating
	// over the Limits should be sufficient.
	for k, v := range container.Resources.Limits {
		resource := string(k)
		needed := int(v.Value())
		glog.V(3).Infof("needs %d %s", needed, resource)
		if !m.isDevicePluginResource(resource) {
			continue
		}
		// Updates allocatedDevices to garbage collect any stranded resources
		// before doing the device plugin allocation.
		if !allocatedDevicesUpdated {
			m.updateAllocatedDevices(m.activePods())
			allocatedDevicesUpdated = true
		}
		allocDevices, err := m.devicesToAllocate(podUID, contName, resource, needed, devicesToReuse[resource])
		if err != nil {
			return err
		}
		if allocDevices == nil || len(allocDevices) <= 0 {
			continue
		}

		startRPCTime := time.Now()
		m.mutex.Lock()
		e, ok := m.endpoints[resource]
		m.mutex.Unlock()
		if !ok {
			m.mutex.Lock()
			m.allocatedDevices = m.podDevices.devices()
			m.mutex.Unlock()
			return fmt.Errorf("Unknown Device Plugin %s", resource)
		}

		devs := allocDevices.UnsortedList()
		glog.V(3).Infof("Making allocation request for devices %v for device plugin %s", devs, resource)
		resp, err := e.allocate(devs)
		metrics.DevicePluginAllocationLatency.WithLabelValues(resource).Observe(metrics.SinceInMicroseconds(startRPCTime))
		if err != nil {
			m.mutex.Lock()
			m.allocatedDevices = m.podDevices.devices()
			m.mutex.Unlock()
			return err
		}

		// Update internal cached podDevices state.
		m.mutex.Lock()
		m.podDevices.insert(podUID, contName, resource, allocDevices, resp.ContainerResponses[0])
		m.mutex.Unlock()
	}

	// Checkpoints device to container allocation information.
	return m.writeCheckpoint()
}
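Resources provided by device plugins are Kubernetes Extended Resources, which cannot be overcommitted: requests must equal limits, so at the resource level their QoS is effectively Guaranteed.
Before allocating devices to a pod, the manager always checks the currently active pods against the pods in the podDevices cache and removes the devices of terminated pods from podDevices. This acts as garbage collection for devices.
The required number of devices is picked at random from healthyDevices for the pod, and allocatedDevices is updated at the same time; otherwise one device could be allocated to multiple pods.
Once the devices are chosen, the endpoint's Allocate method is called (which in turn calls the device plugin's Allocate gRPC service), and the device plugin returns a ContainerAllocateResponse (including the environment variables, mounts, and annotations to inject).
The podDevices cache is updated, and the data cached in ManagerImpl is written to the checkpoint file.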
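The selection step described above (choose healthy devices that are not already allocated, then mark them as allocated) might look like the following sketch. It is not the kubelet's devicesToAllocate: pickDevices is a hypothetical helper, and it sorts the candidates only to make the example deterministic, whereas the text describes the real choice as effectively random.

```go
package main

import (
	"fmt"
	"sort"
)

// pickDevices returns `needed` device IDs that are healthy and not yet
// allocated, and records them in the allocated set so they cannot be handed
// out twice.
func pickDevices(healthy, allocated map[string]struct{}, needed int) ([]string, error) {
	if needed <= 0 {
		return nil, nil
	}
	var available []string
	for id := range healthy {
		if _, used := allocated[id]; !used {
			available = append(available, id)
		}
	}
	if len(available) < needed {
		return nil, fmt.Errorf("requested %d devices, only %d available", needed, len(available))
	}
	// Sorted only for a stable example; this ordering is an assumption of the sketch.
	sort.Strings(available)
	picked := available[:needed]
	for _, id := range picked {
		allocated[id] = struct{}{}
	}
	return picked, nil
}

func main() {
	healthy := map[string]struct{}{"gpu0": {}, "gpu1": {}, "gpu2": {}}
	allocated := map[string]struct{}{"gpu0": {}}
	picked, err := pickDevices(healthy, allocated, 2)
	fmt.Println(picked, err)
	// A second request now fails because every healthy device is allocated.
	_, err = pickDevices(healthy, allocated, 1)
	fmt.Println(err)
}
```

Updating the allocated set in the same call is what prevents one device from being handed to two pods.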
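Question: are the devices allocated to an init container released once the init container finishes? Not at present. Before Allocate, only the devices of terminated pods are reclaimed; devices held by init containers are not. This would be fairly easy to improve: add reclamation logic for init container devices to updateAllocatedDevices in the code above.
So in the current version it is best not to use devices in init containers, although that scenario is rare.
When the kubelet updates the node status, it calls GetCapacity to refresh the resource information for device plugins.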
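pkg/kubelet/kubelet_node_status.go:599
func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
	...
	devicePluginCapacity, devicePluginAllocatable, removedDevicePlugins = kl.containerManager.GetDevicePluginResourceCapacity()
	...
}

pkg/kubelet/cm/container_manager_linux.go:881
func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {
	return cm.deviceManager.GetCapacity()
}
The implementation of GetCapacity is shown below. The logic is simple:
For each resource in healthyDevices, it checks whether the device plugin has been removed from the endpoints cache or has been stopped for more than 5 minutes. If either is true, the resource is deleted from the endpoints and healthyDevices caches.
For each resource in unhealthyDevices, it checks whether the device plugin has been removed from the endpoints cache or has been stopped for more than 5 minutes. If either is true, the resource is deleted from the endpoints and unhealthyDevices caches.
If the caches changed, they are written to the checkpoint file.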
pkg/kubelet/cm/devicemanager/manager.go:414
func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) {
	needsUpdateCheckpoint := false
	var capacity = v1.ResourceList{}
	var allocatable = v1.ResourceList{}
	deletedResources := sets.NewString()
	m.mutex.Lock()
	for resourceName, devices := range m.healthyDevices {
		e, ok := m.endpoints[resourceName]
		if (ok && e.stopGracePeriodExpired()) || !ok {
			if !ok {
				glog.Errorf("unexpected: healthyDevices and endpoints are out of sync")
			}
			delete(m.endpoints, resourceName)
			delete(m.healthyDevices, resourceName)
			deletedResources.Insert(resourceName)
			needsUpdateCheckpoint = true
		} else {
			capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
			allocatable[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
		}
	}
	for resourceName, devices := range m.unhealthyDevices {
		e, ok := m.endpoints[resourceName]
		if (ok && e.stopGracePeriodExpired()) || !ok {
			if !ok {
				glog.Errorf("unexpected: unhealthyDevices and endpoints are out of sync")
			}
			delete(m.endpoints, resourceName)
			delete(m.unhealthyDevices, resourceName)
			deletedResources.Insert(resourceName)
			needsUpdateCheckpoint = true
		} else {
			capacityCount := capacity[v1.ResourceName(resourceName)]
			unhealthyCount := *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
			capacityCount.Add(unhealthyCount)
			capacity[v1.ResourceName(resourceName)] = capacityCount
		}
	}
	m.mutex.Unlock()
	if needsUpdateCheckpoint {
		m.writeCheckpoint()
	}
	return capacity, allocatable, deletedResources.UnsortedList()
}
GetCapacity updates the following NodeStatus data:
registered device plugin resource Capacity
registered device plugin resource Allocatable
previously registered resources that are no longer active
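The arithmetic in GetCapacity reduces to two rules: allocatable counts only healthy devices, while capacity counts healthy plus unhealthy devices. A minimal sketch of that arithmetic, using plain integer counts instead of resource.Quantity and a hypothetical summarize helper, with the endpoint expiry handling left out:

```go
package main

import "fmt"

// summarize derives per-resource capacity and allocatable counts from the
// number of healthy and unhealthy devices for each resource name.
func summarize(healthy, unhealthy map[string]int) (capacity, allocatable map[string]int) {
	capacity = map[string]int{}
	allocatable = map[string]int{}
	for name, n := range healthy {
		capacity[name] = n
		allocatable[name] = n
	}
	for name, n := range unhealthy {
		// Unhealthy devices still count toward capacity but are never allocatable.
		capacity[name] += n
	}
	return capacity, allocatable
}

func main() {
	capacity, allocatable := summarize(
		map[string]int{"example.com/gpu": 3},
		map[string]int{"example.com/gpu": 1},
	)
	fmt.Println(capacity["example.com/gpu"], allocatable["example.com/gpu"])
}
```

With three healthy devices and one unhealthy device, the node would report a capacity of 4 and an allocatable value of 3 for the resource.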
In the container manager's GetResources, the Device Manager's GetDeviceRunContainerOptions is called and the returned options are added to kubecontainer.RunContainerOptions. RunContainerOptions includes Envs, Mounts, Devices, PortMappings, Annotations, and other information.
pkg/kubelet/cm/container_manager_linux.go:601
// TODO: move the GetResources logic to PodContainerManager.
func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) {
	opts := &kubecontainer.RunContainerOptions{}
	// Allocate should already be called during predicateAdmitHandler.Admit(),
	// just try to fetch device runtime information from cached state here
	devOpts, err := cm.deviceManager.GetDeviceRunContainerOptions(pod, container)
	if err != nil {
		return nil, err
	} else if devOpts == nil {
		return opts, nil
	}
	opts.Devices = append(opts.Devices, devOpts.Devices...)
	opts.Mounts = append(opts.Mounts, devOpts.Mounts...)
	opts.Envs = append(opts.Envs, devOpts.Envs...)
	opts.Annotations = append(opts.Annotations, devOpts.Annotations...)
	return opts, nil
}
The Device Manager's GetDeviceRunContainerOptions decides whether to call the device plugin's PreStartContainer gRPC service based on whether PreStartRequired in pluginOpts is true.
Note: if a device plugin sets PreStartRequired to true, keep in mind that the kubelet Device Manager's call to the plugin's PreStartContainer interface has a 30s timeout, so the PreStartContainer logic must finish and return within 30s.
pkg/kubelet/cm/devicemanager/manager.go:688
// GetDeviceRunContainerOptions checks whether we have cached containerDevices
// for the passed-in <pod, container> and returns its DeviceRunContainerOptions
// for the found one. An empty struct is returned in case no cached state is found.
func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error) {
	podUID := string(pod.UID)
	contName := container.Name
	for k := range container.Resources.Limits {
		resource := string(k)
		if !m.isDevicePluginResource(resource) {
			continue
		}
		err := m.callPreStartContainerIfNeeded(podUID, contName, resource)
		if err != nil {
			return nil, err
		}
	}
	m.mutex.Lock()
	defer m.mutex.Unlock()
	return m.podDevices.deviceRunContainerOptions(string(pod.UID), container.Name), nil
}
deviceRunContainerOptions then assembles the container's Envs, mount points, device files, and Annotations.
That concludes this look at when the Device Manager is created and how it starts, registers device plugins, and allocates devices.