您好,登錄后才能下訂單哦!
本篇內容主要講解“如何實例化一個Taint Manager”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“如何實例化一個Taint Manager”吧!
PodInformer添加Event Handler時,通過調用taintManager.PodUpdated(oldPod *v1.Pod, newPod *v1.Pod)
往tc.podUpdateQueue添加updateItem。
NodeInformer添加Event Handler時,通過調用taintManager.NodeUpdated(oldNode *v1.Node, newNode *v1.Node)
往tc.nodeUpdateQueue添加updateItem。
當創建NodeController時,如果runTaintManager為true(通過kube-controller-manager的--enable-taint-manager中指定,默認為true),則會通過NewNoExecuteTaintManager
來實例化一個Taint Manager。
pkg/controller/node/nodecontroller.go:195 func NewNodeController(..) (*NodeController, error) { ... podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { ... if nc.taintManager != nil { nc.taintManager.PodUpdated(nil, pod) } }, ... } ... } else { nodeEventHandlerFuncs = cache.ResourceEventHandlerFuncs{ AddFunc: func(originalObj interface{}) { ... if nc.taintManager != nil { nc.taintManager.NodeUpdated(nil, node) } }, ... } } ... if nc.runTaintManager { nc.taintManager = NewNoExecuteTaintManager(kubeClient) } ... return nc, nil }
因此,創建NodeController時已經配置了監聽pod和node的事件,并會將相關數據發送到tc.podUpdateQueue和tc.nodeUpdateQueue,然后由Taint Manager從中取出數據進行處理。在此之前,我們先來看看NewNoExecuteTaintManager是如何實例化一個Taint Manager的。
pkg/controller/node/taint_controller.go:152 func NewNoExecuteTaintManager(c clientset.Interface) *NoExecuteTaintManager { ... tm := &NoExecuteTaintManager{ client: c, recorder: recorder, // taintedNodes記錄每個Node對應的Taint信息。 taintedNodes: make(map[string][]v1.Taint), // nodeUpdateQueue中取出的updateItem會發送到nodeUpdateChannel,Tait Manager從該Channel中取出對應的node update info。 nodeUpdateChannel: make(chan *nodeUpdateItem, nodeUpdateChannelSize), // podUpdateQueue中取出的updateItem會發送到podUpdateChannel,Tait Manager從該Channel中取出對應的pod update info。 podUpdateChannel: make(chan *podUpdateItem, podUpdateChannelSize), // Node Controller監聽到的node update info會發送到nodeUpdateQueue。 nodeUpdateQueue: workqueue.New(), // Node Controller監聽到的pod update info會發送到podUpdateQueue。 podUpdateQueue: workqueue.New(), } // CreateWorkerQueue creates a new TimedWorkerQueue for workers that will execute deletePodHandler. tm.taintEvictionQueue = CreateWorkerQueue(deletePodHandler(c, tm.emitPodDeletionEvent)) return tm }
相關的代碼分析見里面的代碼注釋。需要強調的是,我們在這里給tm.taintEvictionQueue注冊了函數deletePodHandler
,用來通過Taint Eviction時刪除pod時調用。Taint Manager Run的時候會通過tc.taintEvictionQueue.AddWork()
時創建Worker來執行deletePodHandler
。
func deletePodHandler(c clientset.Interface, emitEventFunc func(types.NamespacedName)) func(args *WorkArgs) error { return func(args *WorkArgs) error { ns := args.NamespacedName.Namespace name := args.NamespacedName.Name glog.V(0).Infof("NoExecuteTaintManager is deleting Pod: %v", args.NamespacedName.String()) if emitEventFunc != nil { emitEventFunc(args.NamespacedName) } var err error // 按照失敗重試5次,每次間隔10s的重試機制,調用apiserver的api刪除對應的Pod。 for i := 0; i < retries; i++ { err = c.Core().Pods(ns).Delete(name, &metav1.DeleteOptions{}) if err == nil { break } time.Sleep(10 * time.Millisecond) } return err } }
在Kubernetes Node Controller源碼分析之執行篇中提到,在Node Controller Run的時候,如果runTaintManager為true,則會調用nc.taintManager.Run啟動Taint Manager loop。
pkg/controller/node/nodecontroller.go:550 func (nc *NodeController) Run() { go func() { ... if nc.runTaintManager { go nc.taintManager.Run(wait.NeverStop) } ... }() }
接下來,我們來看Taint Manager的Run方法。Node Controller啟動的Taint Manager實例其實就是NoExecuteTaintManager,其對應的Run方法代碼如下。
pkg/controller/node/taint_controller.go:179 // Run starts NoExecuteTaintManager which will run in loop until `stopCh` is closed. func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) { glog.V(0).Infof("Starting NoExecuteTaintManager") // Functions that are responsible for taking work items out of the workqueues and putting them into channels. // 從tc.nodeUpdateQueue中獲取updateItem,并發送到tc.nodeUpdateChannel。 go func(stopCh <-chan struct{}) { for { item, shutdown := tc.nodeUpdateQueue.Get() if shutdown { break } nodeUpdate := item.(*nodeUpdateItem) select { case <-stopCh: break case tc.nodeUpdateChannel <- nodeUpdate: } } }(stopCh) // 從tc.podUpdateQueue中獲取updateItem,并發送到tc.podUpdateChannel。 go func(stopCh <-chan struct{}) { for { item, shutdown := tc.podUpdateQueue.Get() if shutdown { break } podUpdate := item.(*podUpdateItem) select { case <-stopCh: break case tc.podUpdateChannel <- podUpdate: } } }(stopCh) // When processing events we want to prioritize Node updates over Pod updates, // as NodeUpdates that interest NoExecuteTaintManager should be handled as soon as possible - // we don't want user (or system) to wait until PodUpdate queue is drained before it can // start evicting Pods from tainted Nodes. for { select { case <-stopCh: break // 從tc.nodeUpdateChannel獲取nodeUpdate數據,然后invoke tc.handleNodeUpdate進行處理。 case nodeUpdate := <-tc.nodeUpdateChannel: tc.handleNodeUpdate(nodeUpdate) // 從tc.podUpdateChannel獲取podUpdate數據,在invoke tc.handlePodUpdate進行處理之前,先確保tc.nodeUpdateQueue中的數據已經被處理完。 case podUpdate := <-tc.podUpdateChannel: // If we found a Pod update we need to empty Node queue first. priority: for { select { case nodeUpdate := <-tc.nodeUpdateChannel: tc.handleNodeUpdate(nodeUpdate) default: break priority } } // After Node queue is emptied we process podUpdate. tc.handlePodUpdate(podUpdate) } } }
可見, Run方法中分別從對應的queue中取出數據,然后調用tc.handleNodeUpdate
和tc.handlePodUpdate
進行處理。
// pkg/controller/node/taint_controller.go:365 func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate *nodeUpdateItem) { // Delete // 如果nodeUpdate.newNode == nil,則表明該Node被刪除了,那么將該Node的Taints信息從tc.taintedNodes緩存中刪除。 if nodeUpdate.newNode == nil { node := nodeUpdate.oldNode glog.V(4).Infof("Noticed node deletion: %#v", node.Name) tc.taintedNodesLock.Lock() defer tc.taintedNodesLock.Unlock() delete(tc.taintedNodes, node.Name) return } // Create or Update // 如果是Node Create或者Node Update Event,則更新tc.taintedNodes緩存中記錄的該Node的Taints信息。 glog.V(4).Infof("Noticed node update: %#v", nodeUpdate) node := nodeUpdate.newNode taints := nodeUpdate.newTaints func() { tc.taintedNodesLock.Lock() defer tc.taintedNodesLock.Unlock() glog.V(4).Infof("Updating known taints on node %v: %v", node.Name, taints) if len(taints) == 0 { delete(tc.taintedNodes, node.Name) } else { tc.taintedNodes[node.Name] = taints } }() // 然后,獲取該Node上所有pods list。 pods, err := getPodsAssignedToNode(tc.client, node.Name) if err != nil { glog.Errorf(err.Error()) return } if len(pods) == 0 { return } // Short circuit, to make this controller a bit faster. // 如果該Node上的Taints被刪除了,則取消所有該node上的pod evictions。 if len(taints) == 0 { glog.V(4).Infof("All taints were removed from the Node %v. Cancelling all evictions...", node.Name) for i := range pods { tc.cancelWorkWithEvent(types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name}) } return } // 否則,調用tc.processPodOnNode根據Node Taints info和Pod Tolerations info處理該Node上的Pod Eviction。 now := time.Now() for i := range pods { pod := &pods[i] podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name} tc.processPodOnNode(podNamespacedName, node.Name, pod.Spec.Tolerations, taints, now) } }
handleNodeUpdate的邏輯為:
如果nodeUpdate.newNode == nil,則表明該Node被刪除了,那么將該Node的Taints信息從tc.taintedNodes緩存中刪除。
如果是Node Create或者Node Update Event,則更新tc.taintedNodes緩存中記錄的該Node的Taints信息。
獲取該Node上所有pods list。
如果該Node上的Taints被刪除了,則取消所有該node上的pod evictions。
否則,遍歷pods list中的每個pod,分別調用tc.processPodOnNode根據Node Taints info和Pod Tolerations info處理該Node上的Pod Eviction。
// pkg/controller/node/taint_controller.go:334 func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate *podUpdateItem) { // Delete // 如果podUpdate.newPod == nil,則表明該Pod被刪除了,那么取消該Pod Evictions。 if podUpdate.newPod == nil { pod := podUpdate.oldPod podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name} glog.V(4).Infof("Noticed pod deletion: %#v", podNamespacedName) tc.cancelWorkWithEvent(podNamespacedName) return } // Create or Update // 如果是Pod Create或者Pod Update Event,則取出該pod的node上的Taints info。 pod := podUpdate.newPod podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name} glog.V(4).Infof("Noticed pod update: %#v", podNamespacedName) nodeName := pod.Spec.NodeName if nodeName == "" { return } taints, ok := func() ([]v1.Taint, bool) { tc.taintedNodesLock.Lock() defer tc.taintedNodesLock.Unlock() taints, ok := tc.taintedNodes[nodeName] return taints, ok }() // It's possible that Node was deleted, or Taints were removed before, which triggered // eviction cancelling if it was needed. if !ok { return } // 然后,調用tc.processPodOnNode根據Node Taints info和Pod Tolerations info處理該Node上的Pod Eviction。 tc.processPodOnNode(podNamespacedName, nodeName, podUpdate.newTolerations, taints, time.Now()) }
handlePodUpdate的邏輯為:
如果podUpdate.newPod == nil,則表明該Pod被刪除了,那么取消該Pod Evictions。
如果是Pod Create或者Pod Update Event,則取出該pod的node上的Taints info。
如果node上的Taints info信息為空,表明Taints info被刪除了或者Node被刪除了,那么就不需要處理該node上的pod eviction了,流程結束。
否則,調用tc.processPodOnNode根據Node Taints info和Pod Tolerations info處理該Node上的Pod Eviction。
因此,不管是handlePodUpdate
還是handleNodeUpdate
,最終都是通過processPodOnNode來處理Pod Eviction的。
pkg/controller/node/taint_controller.go:295 func (tc *NoExecuteTaintManager) processPodOnNode( podNamespacedName types.NamespacedName, nodeName string, tolerations []v1.Toleration, taints []v1.Taint, now time.Time, ) { // 如果該node的taints info為空,則取消Taint Eviction Pods。 if len(taints) == 0 { tc.cancelWorkWithEvent(podNamespacedName) } // 對比node的taints info和pod tolerations info,判斷出node的taints是否都能被pod所能容忍。 allTolerated, usedTolerations := v1.GetMatchingTolerations(taints, tolerations) // 如果不是全部都能容忍,那么調用立刻調用AddWork來創建worker,啟動tc.taintEvictionQueue注冊的deletePodHandler來刪除該pod。 if !allTolerated { glog.V(2).Infof("Not all taints are tolerated after update for Pod %v on %v", podNamespacedName.String(), nodeName) // We're canceling scheduled work (if any), as we're going to delete the Pod right away. tc.cancelWorkWithEvent(podNamespacedName) tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), time.Now(), time.Now()) return } // 否則,取pod的所有tolerations的TolerationSeconds的最小值作為minTolerationTime。如果某個Toleration沒有設置TolerationSeconds,則表示0,如果設置的值為負數,則用0替代。 minTolerationTime := getMinTolerationTime(usedTolerations) // getMinTolerationTime returns negative value to denote infinite toleration. if minTolerationTime < 0 { glog.V(4).Infof("New tolerations for %v tolerate forever. Scheduled deletion won't be cancelled if already scheduled.", podNamespacedName.String()) return } startTime := now triggerTime := startTime.Add(minTolerationTime) // 從tc.taintEvictionQueue中獲取Worker-scheduledEviction scheduledEviction := tc.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String()) // 如果獲取到不為空的scheduledEviction,則判斷worker創建時間加上minTolerationTime是否達到觸發時間要求,如果沒達到,則不進行Taint Pod Eviction,流程結束。 if scheduledEviction != nil { startTime = scheduledEviction.CreatedAt if startTime.Add(minTolerationTime).Before(triggerTime) { return } else { tc.cancelWorkWithEvent(podNamespacedName) } } // 如果達到觸發時間要求,則取消worker,并立刻調用AddWork來創建worker,啟動tc.taintEvictionQueue注冊的deletePodHandler來刪除該pod。 tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime) }
processPodOnNode的邏輯為:
如果該node的taints info為空,則取消Taint Eviction Pods。
對比node的taints info和pod tolerations info,判斷出node的taints是否都能被pod所能容忍。
如果不是全部都能容忍,那么調用立刻調用AddWork來創建worker,啟動tc.taintEvictionQueue注冊的deletePodHandler來刪除該pod。
否則,取pod的所有tolerations的TolerationSeconds的最小值作為minTolerationTime。如果某個Toleration沒有設置TolerationSeconds,表示不作驅逐。
如果獲取到不為空的scheduledEviction,則判斷worker創建時間加上minTolerationTime是否達到觸發時間要求,如果沒達到,則不進行Taint Pod Eviction,流程結束。
如果達到觸發時間要求,則取消worker,并立刻調用AddWork來創建worker,啟動tc.taintEvictionQueue注冊的deletePodHandler來刪除該pod。
如果minTolerationTime小于0,則永遠容忍,流程結束。
從tc.taintEvictionQueue中獲取Worker-scheduledEviction。
到此,相信大家對“如何實例化一個Taint Manager”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。