This article explains the mechanism Kubernetes uses to raise Scheduler throughput: the Equivalence Class and its Equivalence Cache. We walk through the design and then through the relevant source code.
In 2015, Google's Borg paper, "Large-scale cluster management at Google with Borg", described Equivalence Classes as follows:
Equivalence classes: Tasks in a Borg job usually have identical requirements and constraints, so rather than determining feasibility for every pending task on every machine, and scoring all the feasible machines, Borg only does feasibility and scoring for one task per equivalence class – a group of tasks with identical requirements.
In Kubernetes, Equivalence Classes are currently used to speed up the Predicate phase of the Scheduler and thereby improve scheduling throughput. The scheduler keeps the Equivalence Cache up to date: when certain events occur (for example, a node is deleted or a pod is bound), the affected entries in the Equivalence Cache must be invalidated immediately.
An Equivalence Class collects the shared information of a group of pods with identical requirements and constraints. During the Predicate phase the scheduler only needs to run the predicates for one pod in the class and store the result in the Equivalence Cache; the other pods in the class (called equivalent pods) can then reuse that result. The full predicate flow runs only when no reusable predicate result is found in the Equivalence Cache.
Which pods get grouped into the same Equivalence Class? By the definition above, any pods that share certain fields, such as resource requirements, labels, or affinity, could be treated as equivalent pods belonging to one Equivalence Class. But users can modify a pod's fields at any time; the scheduler would then have to update the pod's class membership immediately, and any predicate evaluation already in flight would have to notice and react to the change, which makes the problem very complex. For now, the scheduler therefore only groups pods that share the same OwnerReference (an RC, RS, Job, or StatefulSet) into one Equivalence Class; for example, an RS that defines N replicas yields one Equivalence Class containing those N pods. The scheduler computes a uint64 EquivalenceHash for the equivalent pods of each class.
Note that as of Kubernetes 1.10, two RSs with identical pod templates still map to two separate Equivalence Classes.
To use Equivalence Classes you must enable the EnableEquivalenceClassCache feature gate; as of Kubernetes 1.10 this feature is still Alpha.
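For example, the gate can be switched on with the standard --feature-gates flag on the scheduler binary (a minimal sketch; the rest of the command line is omitted):

kube-scheduler --feature-gates=EnableEquivalenceClassCache=true ...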
As analyzed in my earlier scheduler blog posts, all successfully registered Predicate Policies are checked in scheduler.findNodesThatFit(pod, nodes, predicateFuncs ...), which, with a fixed degree of parallelism, calls scheduler.podFitsOnNode(pod, node, predicateFuncs ...) for each node.
podFitsOnNode takes a pod, a node, and the set of registered predicateFuncs, and checks whether the node passes the pod's predicates. With Equivalence Classes, the Predicate phase changes as follows (see the sketch after this list):
Before running the predicates, check whether the pod has a corresponding Equivalence Class.
If it has one, check whether the Equivalence Cache holds a usable predicate result; if it has none, run the full, normal predicate flow.
If a usable result exists, complete the predicate check directly from that cached result; otherwise run the full, normal predicate flow.
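A minimal sketch of this cache-first decision flow; the function names and signatures here are illustrative stand-ins, not the scheduler's real API:

package main

import "fmt"

// fitsOnNode sketches the flow above: consult the equivalence cache first,
// and fall back to the full predicate run only on a miss.
func fitsOnNode(hasEquivClass bool, lookupCache func() (fit, ok bool), runPredicates func() bool) bool {
    if hasEquivClass {
        if fit, ok := lookupCache(); ok {
            return fit // reuse the cached predicate result
        }
    }
    return runPredicates() // full, normal predicate flow
}

func main() {
    fit := fitsOnNode(true,
        func() (bool, bool) { return true, true }, // simulate a cache hit: node fits
        func() bool { return false },              // the real predicates never run on a hit
    )
    fmt.Println(fit) // true, taken straight from the cache
}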
The Equivalence Cache stores each node's predicate results in a three-layer map:
The first-layer key is the node name.
The second-layer key is the predicateKey, identifying the predicate policy; a node's cache therefore holds at most as many entries as there are predicate policies registered with the scheduler, which bounds the cache size and keeps Equivalence Cache lookups cheap.
The third-layer key is the equivalence hash introduced above.
For example, algorithmCache[$nodeName].predicatesCache.Get($predicateKey)[$equivalenceHash] says whether the pods with hash $equivalenceHash pass the $predicateKey predicate on node $nodeName.
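The same three-layer lookup, sketched with plain nested maps standing in for the real algorithmCache and its LRU second layer (the names and the hash value are made up for illustration):

package main

import "fmt"

// layer 1: node name -> layer 2: predicateKey -> layer 3: equivalence hash -> fit
type equivCache map[string]map[string]map[uint64]bool

func main() {
    cache := equivCache{
        "node-1": {
            "PodFitsResources": {0xdeadbeef: true},
        },
    }
    // Chained indexing on missing keys is safe in Go and yields the zero value.
    fmt.Println(cache["node-1"]["PodFitsResources"][0xdeadbeef]) // true: cached result reused
    fmt.Println(cache["node-2"]["HostName"][0xdeadbeef])         // false: miss, run the real predicate
}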
As of Kubernetes 1.10, the supported predicateKeys are the following 20:
MatchInterPodAffinity
CheckVolumeBinding
CheckNodeCondition
GeneralPredicates
HostName
PodFitsHostPorts
MatchNodeSelector
PodFitsResources
NoDiskConflict
PodToleratesNodeTaints
CheckNodeUnschedulable
PodToleratesNodeNoExecuteTaints
CheckNodeLabelPresence
CheckServiceAffinity
MaxEBSVolumeCount
MaxGCEPDVolumeCount
MaxAzureDiskVolumeCount
NoVolumeZoneConflict
CheckNodeMemoryPressure
CheckNodeDiskPressure
Note that even when a pod does map to an Equivalence Class, the Equivalence Cache may contain no usable predicate result, or the cached result may already be invalid. In those cases the normal predicate flow runs and its result is written into the Equivalence Cache.
How are the Equivalence Cache entries maintained and updated? Frequently invalidating a node's entire cache would defeat the point of the design and would not speed up the Predicate phase at all.
As the three-layer map design shows, the second-layer key is the predicateKey, so the scheduler can invalidate a single predicate's cached results rather than blindly invalidating a node's whole algorithmCache.
The scheduler watches Add/Update/Delete events on the relevant API objects and invalidates the matching Equivalence Cache entries according to per-object rules; the details follow in the source-code analysis below.
The Equivalence Cache structure is defined as follows:
// EquivalenceCache holds:
// 1. a map of AlgorithmCache with node name as key
// 2. function to get equivalence pod
type EquivalenceCache struct {
    sync.RWMutex
    getEquivalencePod algorithm.GetEquivalencePodFunc
    algorithmCache    map[string]AlgorithmCache
}

// The AlgorithmCache stores PredicateMap with predicate name as key
type AlgorithmCache struct {
    // Only consider predicates for now
    predicatesCache *lru.Cache
}
The cached data proper lives in the algorithmCache map, keyed by node name.
Each node's predicate results are stored in AlgorithmCache.predicatesCache, an LRU (Least Recently Used) cache that holds only a bounded number of entries; Kubernetes caps it at 100 (Kubernetes 1.10 ships 20 default predicate funcs in total).
An LRU cache implements the "least recently used" replacement policy: when the cache is full (no free slots), the entry that was used least recently is evicted, and the most recently accessed entry always stays in the cache. By the principle of locality, recently used data is the most likely to be accessed again, which is what makes the policy effective.
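A quick illustration of that eviction behaviour, assuming the github.com/golang/groupcache/lru package that the scheduler code above imports as lru (a capacity of 2 is only for the demo; the real predicatesCache is capped at 100):

package main

import (
    "fmt"

    "github.com/golang/groupcache/lru"
)

func main() {
    cache := lru.New(2) // at most 2 entries
    cache.Add("GeneralPredicates", true)
    cache.Add("PodToleratesNodeTaints", true)
    cache.Add("MatchInterPodAffinity", true) // evicts the least recently used entry

    _, ok := cache.Get("GeneralPredicates")
    fmt.Println(ok) // false: GeneralPredicates was least recently used and got evicted
}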
predicatesCache is itself a key-value store: the key is the predicateKey and the value is a PredicateMap.
A PredicateMap is keyed by the uint64 equivalence hash, with HostPredicate values.
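A one-line sketch of that type, reconstructed from how it is used in the functions below:

// PredicateMap holds cached results keyed by the pods' equivalence hash.
type PredicateMap map[uint64]HostPredicate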
HostPredicate represents the result of matching a pod against a node under one predicate policy:
// HostPredicate is the cached predicate result
type HostPredicate struct {
    Fit         bool
    FailReasons []algorithm.PredicateFailureReason
}
InvalidateCachedPredicateItem removes, on one node, the cached predicate results of all equivalence hashes (i.e. of all equivalent pods) for the given predicate policies:
func (ec *EquivalenceCache) InvalidateCachedPredicateItem(nodeName string, predicateKeys sets.String) {
    ...
    if algorithmCache, exist := ec.algorithmCache[nodeName]; exist {
        for predicateKey := range predicateKeys {
            algorithmCache.predicatesCache.Remove(predicateKey)
        }
    }
    ...
}
InvalidateCachedPredicateItemOfAllNodes removes the cached predicate results of all equivalence hashes (i.e. of all equivalent pods) for the given predicate policies on every node:
func (ec *EquivalenceCache) InvalidateCachedPredicateItemOfAllNodes(predicateKeys sets.String) {
    ...
    // algorithmCache uses nodeName as key, so we just iterate it and invalid given predicates
    for _, algorithmCache := range ec.algorithmCache {
        for predicateKey := range predicateKeys {
            // just use keys is enough
            algorithmCache.predicatesCache.Remove(predicateKey)
        }
    }
    ...
}
PredicateWithECache checks whether the Equivalence Cache holds a usable predicate result. On a hit, the cached result is returned directly as this pod's predicate outcome on this node. On a miss, it returns false together with an "invalid" flag telling the caller to run the real predicate:
// PredicateWithECache returns:
// 1. if fit
// 2. reasons if not fit
// 3. if this cache is invalid
// based on cached predicate results
func (ec *EquivalenceCache) PredicateWithECache(
    podName, nodeName, predicateKey string,
    equivalenceHash uint64, needLock bool,
) (bool, []algorithm.PredicateFailureReason, bool) {
    ...
    if algorithmCache, exist := ec.algorithmCache[nodeName]; exist {
        if cachePredicate, exist := algorithmCache.predicatesCache.Get(predicateKey); exist {
            predicateMap := cachePredicate.(PredicateMap)
            // TODO(resouer) Is it possible a race that cache failed to update immediately?
            if hostPredicate, ok := predicateMap[equivalenceHash]; ok {
                if hostPredicate.Fit {
                    return true, []algorithm.PredicateFailureReason{}, false
                }
                return false, hostPredicate.FailReasons, false
            }
            // is invalid
            return false, []algorithm.PredicateFailureReason{}, true
        }
    }
    return false, []algorithm.PredicateFailureReason{}, true
}
UpdateCachedPredicateItem: when PredicateWithECache fails to find a usable cached result, the scheduler runs the real predicate funcs, and afterwards writes the fresh result back into the Equivalence Cache via UpdateCachedPredicateItem. A node's predicatesCache is also initialized lazily here:
// UpdateCachedPredicateItem updates pod predicate for equivalence class
func (ec *EquivalenceCache) UpdateCachedPredicateItem(
    podName, nodeName, predicateKey string,
    fit bool,
    reasons []algorithm.PredicateFailureReason,
    equivalenceHash uint64,
    needLock bool,
) {
    ...
    if _, exist := ec.algorithmCache[nodeName]; !exist {
        ec.algorithmCache[nodeName] = newAlgorithmCache()
    }
    predicateItem := HostPredicate{
        Fit:         fit,
        FailReasons: reasons,
    }
    // if cached predicate map already exists, just update the predicate by key
    if v, ok := ec.algorithmCache[nodeName].predicatesCache.Get(predicateKey); ok {
        predicateMap := v.(PredicateMap)
        // maps in golang are references, no need to add them back
        predicateMap[equivalenceHash] = predicateItem
    } else {
        ec.algorithmCache[nodeName].predicatesCache.Add(predicateKey,
            PredicateMap{
                equivalenceHash: predicateItem,
            })
    }
}
When Kubernetes registers the predicates, priorities, and scheduler extenders, it also initializes the Equivalence Cache and passes it into the scheduler config:
// Creates a scheduler from a set of registered fit predicate keys and priority keys.
func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*scheduler.Config, error) {
    ...
    // Init equivalence class cache
    if c.enableEquivalenceClassCache && getEquivalencePodFuncFactory != nil {
        pluginArgs, err := c.getPluginArgs()
        if err != nil {
            return nil, err
        }
        c.equivalencePodCache = core.NewEquivalenceCache(
            getEquivalencePodFuncFactory(*pluginArgs),
        )
        glog.Info("Created equivalence class cache")
    }
    ...
}

// NewEquivalenceCache creates a EquivalenceCache object.
func NewEquivalenceCache(getEquivalencePodFunc algorithm.GetEquivalencePodFunc) *EquivalenceCache {
    return &EquivalenceCache{
        getEquivalencePod: getEquivalencePodFunc,
        algorithmCache:    make(map[string]AlgorithmCache),
    }
}
NewEquivalenceCache does the Equivalence Cache initialization, but where is getEquivalencePod registered? The default algorithm provider registers GetEquivalencePodFunc at init time (so is this limited to the default provider, and unavailable when the scheduler is configured through a policy config file?). Note that only PVCInfo from factory.PluginFactoryArgs is passed in:
GetEquivalencePodFunc is a function that gets an EquivalencePod from a pod.
pkg/scheduler/algorithmprovider/defaults/defaults.go:38

func init() {
    ...
    // Use equivalence class to speed up heavy predicates phase.
    factory.RegisterGetEquivalencePodFunction(
        func(args factory.PluginFactoryArgs) algorithm.GetEquivalencePodFunc {
            return predicates.NewEquivalencePodGenerator(args.PVCInfo)
        },
    )
    ...
}
Why pass in only PVCInfo, and why is PVCInfo needed at all? To answer that, look at the definitions of EquivalencePod and getEquivalencePod.
// EquivalencePod is a group of pod attributes which can be reused as equivalence to schedule other pods.
type EquivalencePod struct {
    ControllerRef metav1.OwnerReference
    PVCSet        sets.String
}
EquivalencePod defines which attributes pods must share to be considered equivalent; the equivalence hash is computed from exactly its two fields:
ControllerRef: the pod's meta.OwnerReference, i.e. the controller object that owns the pod; one of RS, RC, Job, or StatefulSet.
PVCSet: the set of IDs of all PVCs referenced by the pod.
Hence two pods are equivalent pods with the same equivalence hash only if they belong to the same controller and reference the same set of PVCs.
getEquivalencePod builds the pod's EquivalencePod from the OwnerReference and PVC information in the Pod object:
func (e *EquivalencePodGenerator) getEquivalencePod(pod *v1.Pod) interface{} {
    for _, ref := range pod.OwnerReferences {
        if ref.Controller != nil && *ref.Controller {
            pvcSet, err := e.getPVCSet(pod)
            if err == nil {
                // A pod can only belongs to one controller, so let's return.
                return &EquivalencePod{
                    ControllerRef: ref,
                    PVCSet:        pvcSet,
                }
            }
            return nil
        }
    }
    return nil
}
The entry point of the Predicate phase is findNodesThatFit. It calls getEquivalenceClassInfo to compute the pod's equivalence hash and passes that hash into podFitsOnNode, where the Equivalence Class logic takes over:
func findNodesThatFit(
    pod *v1.Pod,
    nodeNameToInfo map[string]*schedulercache.NodeInfo,
    nodes []*v1.Node,
    predicateFuncs map[string]algorithm.FitPredicate,
    extenders []algorithm.SchedulerExtender,
    metadataProducer algorithm.PredicateMetadataProducer,
    ecache *EquivalenceCache,
    schedulingQueue SchedulingQueue,
    alwaysCheckAllPredicates bool,
) ([]*v1.Node, FailedPredicateMap, error) {
    ...
    var equivCacheInfo *equivalenceClassInfo
    if ecache != nil {
        // getEquivalenceClassInfo will return immediately if no equivalence pod found
        equivCacheInfo = ecache.getEquivalenceClassInfo(pod)
    }

    checkNode := func(i int) {
        nodeName := nodes[i].Name
        fits, failedPredicates, err := podFitsOnNode(
            pod,
            meta,
            nodeNameToInfo[nodeName],
            predicateFuncs,
            ecache,
            schedulingQueue,
            alwaysCheckAllPredicates,
            equivCacheInfo,
        )
        ...
    }
    ...
}
getEquivalenceClassInfo computes the pod's equivalence hash as follows:
// getEquivalenceClassInfo returns the equivalence class of given pod.
func (ec *EquivalenceCache) getEquivalenceClassInfo(pod *v1.Pod) *equivalenceClassInfo {
    equivalencePod := ec.getEquivalencePod(pod)
    if equivalencePod != nil {
        hash := fnv.New32a()
        hashutil.DeepHashObject(hash, equivalencePod)
        return &equivalenceClassInfo{
            hash: uint64(hash.Sum32()),
        }
    }
    return nil
}
So the equivalence hash is simply an FNV hash of the getEquivalencePod result.
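To make the hashing concrete, here is a self-contained sketch that mimics hashutil.DeepHashObject (spew-print the object into an FNV-1a hasher); the EquivalencePod fields are simplified stand-ins for the real struct:

package main

import (
    "fmt"
    "hash/fnv"

    "github.com/davecgh/go-spew/spew"
)

// Simplified stand-in for the scheduler's EquivalencePod.
type EquivalencePod struct {
    ControllerUID string
    PVCSet        []string
}

// deepHash mimics hashutil.DeepHashObject: deterministically spew-print the
// object into an FNV-1a hasher, then widen the 32-bit sum to uint64.
func deepHash(obj interface{}) uint64 {
    h := fnv.New32a()
    printer := spew.ConfigState{Indent: " ", SortKeys: true, DisableMethods: true, SpewKeys: true}
    printer.Fprintf(h, "%#v", obj)
    return uint64(h.Sum32())
}

func main() {
    a := EquivalencePod{ControllerUID: "rs-1234", PVCSet: []string{"pvc-a"}}
    b := EquivalencePod{ControllerUID: "rs-1234", PVCSet: []string{"pvc-a"}}
    fmt.Println(deepHash(a) == deepHash(b)) // true: same controller and PVCs, same hash
}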
Now let's look at the relevant parts of podFitsOnNode:
func podFitsOnNode(
    pod *v1.Pod,
    meta algorithm.PredicateMetadata,
    info *schedulercache.NodeInfo,
    predicateFuncs map[string]algorithm.FitPredicate,
    ecache *EquivalenceCache,
    queue SchedulingQueue,
    alwaysCheckAllPredicates bool,
    equivCacheInfo *equivalenceClassInfo,
) (bool, []algorithm.PredicateFailureReason, error) {
    ...
    if predicate, exist := predicateFuncs[predicateKey]; exist {
        // Use an in-line function to guarantee invocation of ecache.Unlock()
        // when the in-line function returns.
        func() {
            var invalid bool
            if eCacheAvailable {
                // Lock ecache here to avoid a race condition against cache invalidation invoked
                // in event handlers. This race has existed despite locks in equivClassCache implementation.
                ecache.Lock()
                defer ecache.Unlock()
                // PredicateWithECache will return its cached predicate results.
                fit, reasons, invalid = ecache.PredicateWithECache(
                    pod.GetName(), info.Node().GetName(), predicateKey, equivCacheInfo.hash, false)
            }
            if !eCacheAvailable || invalid {
                // we need to execute predicate functions since equivalence cache does not work
                fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
                if err != nil {
                    return
                }
                if eCacheAvailable {
                    // Store data to update equivClassCache after this loop.
                    if res, exists := predicateResults[predicateKey]; exists {
                        res.Fit = res.Fit && fit
                        res.FailReasons = append(res.FailReasons, reasons...)
                        predicateResults[predicateKey] = res
                    } else {
                        predicateResults[predicateKey] = HostPredicate{Fit: fit, FailReasons: reasons}
                    }
                    result := predicateResults[predicateKey]
                    ecache.UpdateCachedPredicateItem(
                        pod.GetName(), info.Node().GetName(), predicateKey,
                        result.Fit, result.FailReasons, equivCacheInfo.hash, false)
                }
            }
        }()
    }
    ...
}
podFitsOnNode first calls PredicateWithECache to look for a hit in the Equivalence Cache:
On a hit, that predicate policy is done.
On a miss, the real predicate runs, and its result is added to or updated in the cache via UpdateCachedPredicateItem.
Let's return to the scheduler's config factory and look at how the event handlers registered on the podInformer, nodeInformer, serviceInformer, pvcInformer, and so on, operate on the Equivalence Cache.
After a pod has been scheduled, and before it is bound to a node, the scheduler first assumes the pod, and the assume step touches the Equivalence Cache:
// assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
// assume modifies `assumed`.
func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
    ...
    // Optimistically assume that the binding will succeed, so we need to invalidate affected
    // predicates in equivalence cache.
    // If the binding fails, these invalidated item will not break anything.
    if sched.config.Ecache != nil {
        sched.config.Ecache.InvalidateCachedPredicateItemForPodAdd(assumed, host)
    }
    return nil
}
Assuming a pod calls InvalidateCachedPredicateItemForPodAdd to update the Equivalence Cache:
func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) {
    // GeneralPredicates: will always be affected by adding a new pod
    invalidPredicates := sets.NewString("GeneralPredicates")

    // MaxPDVolumeCountPredicate: we check the volumes of pod to make decision.
    for _, vol := range pod.Spec.Volumes {
        if vol.PersistentVolumeClaim != nil {
            invalidPredicates.Insert("MaxEBSVolumeCount", "MaxGCEPDVolumeCount", "MaxAzureDiskVolumeCount")
        } else {
            if vol.AWSElasticBlockStore != nil {
                invalidPredicates.Insert("MaxEBSVolumeCount")
            }
            if vol.GCEPersistentDisk != nil {
                invalidPredicates.Insert("MaxGCEPDVolumeCount")
            }
            if vol.AzureDisk != nil {
                invalidPredicates.Insert("MaxAzureDiskVolumeCount")
            }
        }
    }
    ec.InvalidateCachedPredicateItem(nodeName, invalidPredicates)
}
As InvalidateCachedPredicateItemForPodAdd shows, assuming a pod removes the following cached predicate results on that node:
GeneralPredicates;
MaxEBSVolumeCount, MaxGCEPDVolumeCount, and MaxAzureDiskVolumeCount, if the pod references any PVCs;
MaxEBSVolumeCount, if a pod volume uses AWSElasticBlockStore;
MaxGCEPDVolumeCount, if a pod volume uses GCEPersistentDisk;
MaxAzureDiskVolumeCount, if a pod volume uses AzureDisk.
When the scheduler builds its configFactory in NewConfigFactory, updatePodInCache is registered as the Update event handler for assigned, non-terminated pods.
func (c *configFactory) updatePodInCache(oldObj, newObj interface{}) {
    ...
    c.invalidateCachedPredicatesOnUpdatePod(newPod, oldPod)
    c.podQueue.AssignedPodUpdated(newPod)
}

func (c *configFactory) invalidateCachedPredicatesOnUpdatePod(newPod *v1.Pod, oldPod *v1.Pod) {
    if c.enableEquivalenceClassCache {
        // if the pod does not have bound node, updating equivalence cache is meaningless;
        // if pod's bound node has been changed, that case should be handled by pod add & delete.
        if len(newPod.Spec.NodeName) != 0 && newPod.Spec.NodeName == oldPod.Spec.NodeName {
            if !reflect.DeepEqual(oldPod.GetLabels(), newPod.GetLabels()) {
                // MatchInterPodAffinity need to be reconsidered for this node,
                // as well as all nodes in its same failure domain.
                c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(
                    matchInterPodAffinitySet)
            }
            // if requested container resource changed, invalidate GeneralPredicates of this node
            if !reflect.DeepEqual(predicates.GetResourceRequest(newPod),
                predicates.GetResourceRequest(oldPod)) {
                c.equivalencePodCache.InvalidateCachedPredicateItem(
                    newPod.Spec.NodeName, generalPredicatesSets)
            }
        }
    }
}
updatePodInCache calls invalidateCachedPredicatesOnUpdatePod, which touches the Equivalence Cache as follows:
if the pod's labels changed, the MatchInterPodAffinity results are invalidated on all nodes;
if the pod's resource request changed, the GeneralPredicates results are invalidated on the pod's node.
Similarly, when an assigned, non-terminated pod is deleted, invalidateCachedPredicatesOnDeletePod updates the Equivalence Cache:
func (c *configFactory) invalidateCachedPredicatesOnDeletePod(pod *v1.Pod) {
    if c.enableEquivalenceClassCache {
        // part of this case is the same as pod add.
        c.equivalencePodCache.InvalidateCachedPredicateItemForPodAdd(pod, pod.Spec.NodeName)
        // MatchInterPodAffinity need to be reconsidered for this node,
        // as well as all nodes in its same failure domain.
        // TODO(resouer) can we just do this for nodes in the same failure domain
        c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(
            matchInterPodAffinitySet)
        // if this pod have these PV, cached result of disk conflict will become invalid.
        for _, volume := range pod.Spec.Volumes {
            if volume.GCEPersistentDisk != nil || volume.AWSElasticBlockStore != nil ||
                volume.RBD != nil || volume.ISCSI != nil {
                c.equivalencePodCache.InvalidateCachedPredicateItem(
                    pod.Spec.NodeName, noDiskConflictSet)
            }
        }
    }
}
invalidateCachedPredicatesOnDeletePod's updates to the Equivalence Cache can be summarized as:
the GeneralPredicates results on the pod's node (via InvalidateCachedPredicateItemForPodAdd);
the MaxEBSVolumeCount, MaxGCEPDVolumeCount, and MaxAzureDiskVolumeCount results on that node, if the pod references any PVCs;
the MaxEBSVolumeCount results on that node, if a pod volume uses AWSElasticBlockStore;
the MaxGCEPDVolumeCount results on that node, if a pod volume uses GCEPersistentDisk;
the MaxAzureDiskVolumeCount results on that node, if a pod volume uses AzureDisk;
the MatchInterPodAffinity results on all nodes;
the NoDiskConflict results on that node, if any pod volume references GCEPersistentDisk, AWSElasticBlockStore, RBD, or ISCSI.
When a node update event occurs, invalidateCachedPredicatesOnNodeUpdate updates the Equivalence Cache:
func (c *configFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node, oldNode *v1.Node) {
    if c.enableEquivalenceClassCache {
        // Begin to update equivalence cache based on node update
        // TODO(resouer): think about lazily initialize this set
        invalidPredicates := sets.NewString()
        if !reflect.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable) {
            invalidPredicates.Insert(predicates.GeneralPred) // "PodFitsResources"
        }
        if !reflect.DeepEqual(oldNode.GetLabels(), newNode.GetLabels()) {
            invalidPredicates.Insert(predicates.GeneralPred, predicates.CheckServiceAffinityPred) // "PodSelectorMatches"
            for k, v := range oldNode.GetLabels() {
                // any label can be topology key of pod, we have to invalidate in all cases
                if v != newNode.GetLabels()[k] {
                    invalidPredicates.Insert(predicates.MatchInterPodAffinityPred)
                }
                // NoVolumeZoneConflict will only be affected by zone related label change
                if isZoneRegionLabel(k) {
                    if v != newNode.GetLabels()[k] {
                        invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)
                    }
                }
            }
        }

        oldTaints, oldErr := helper.GetTaintsFromNodeAnnotations(oldNode.GetAnnotations())
        if oldErr != nil {
            glog.Errorf("Failed to get taints from old node annotation for equivalence cache")
        }
        newTaints, newErr := helper.GetTaintsFromNodeAnnotations(newNode.GetAnnotations())
        if newErr != nil {
            glog.Errorf("Failed to get taints from new node annotation for equivalence cache")
        }
        if !reflect.DeepEqual(oldTaints, newTaints) ||
            !reflect.DeepEqual(oldNode.Spec.Taints, newNode.Spec.Taints) {
            invalidPredicates.Insert(predicates.PodToleratesNodeTaintsPred)
        }

        if !reflect.DeepEqual(oldNode.Status.Conditions, newNode.Status.Conditions) {
            oldConditions := make(map[v1.NodeConditionType]v1.ConditionStatus)
            newConditions := make(map[v1.NodeConditionType]v1.ConditionStatus)
            for _, cond := range oldNode.Status.Conditions {
                oldConditions[cond.Type] = cond.Status
            }
            for _, cond := range newNode.Status.Conditions {
                newConditions[cond.Type] = cond.Status
            }
            if oldConditions[v1.NodeMemoryPressure] != newConditions[v1.NodeMemoryPressure] {
                invalidPredicates.Insert(predicates.CheckNodeMemoryPressurePred)
            }
            if oldConditions[v1.NodeDiskPressure] != newConditions[v1.NodeDiskPressure] {
                invalidPredicates.Insert(predicates.CheckNodeDiskPressurePred)
            }
            if oldConditions[v1.NodeReady] != newConditions[v1.NodeReady] ||
                oldConditions[v1.NodeOutOfDisk] != newConditions[v1.NodeOutOfDisk] ||
                oldConditions[v1.NodeNetworkUnavailable] != newConditions[v1.NodeNetworkUnavailable] {
                invalidPredicates.Insert(predicates.CheckNodeConditionPred)
            }
        }
        if newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable {
            invalidPredicates.Insert(predicates.CheckNodeConditionPred)
        }
        c.equivalencePodCache.InvalidateCachedPredicateItem(newNode.GetName(), invalidPredicates)
    }
}
So on a node update, the following cached predicate results are removed from that node's Equivalence Cache:
GeneralPredicates, if node.Status.Allocatable or the node's labels changed;
ServiceAffinity, if the node's labels changed;
MatchInterPodAffinity, if the node's labels changed;
NoVolumeZoneConflict, if the failure-domain.beta.kubernetes.io/zone or failure-domain.beta.kubernetes.io/region label changed;
PodToleratesNodeTaints, if the node's taints changed (node.Spec.Taints, or the taints derived from the scheduler.alpha.kubernetes.io/taints annotation);
CheckNodeMemoryPressure, CheckNodeDiskPressure, and CheckNodeCondition, if the corresponding node conditions (or node.Spec.Unschedulable) changed.
When a node delete event occurs, InvalidateAllCachedPredicateItemOfNode updates the Equivalence Cache:
// InvalidateAllCachedPredicateItemOfNode marks all cached items on given node as invalid
func (ec *EquivalenceCache) InvalidateAllCachedPredicateItemOfNode(nodeName string) {
    ec.Lock()
    defer ec.Unlock()
    delete(ec.algorithmCache, nodeName)
    glog.V(5).Infof("Done invalidating all cached predicates on node: %s", nodeName)
}
So when a node is deleted, the node's entire algorithmCache entry is removed from the Equivalence Cache.
When a PV add or delete event occurs, invalidatePredicatesForPv updates the Equivalence Cache:
func (c *configFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) {
    // You could have a PVC that points to a PV, but the PV object doesn't exist.
    // So when the PV object gets added, we can recount.
    invalidPredicates := sets.NewString()
    // PV types which impact MaxPDVolumeCountPredicate
    if pv.Spec.AWSElasticBlockStore != nil {
        invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred)
    }
    if pv.Spec.GCEPersistentDisk != nil {
        invalidPredicates.Insert(predicates.MaxGCEPDVolumeCountPred)
    }
    if pv.Spec.AzureDisk != nil {
        invalidPredicates.Insert(predicates.MaxAzureDiskVolumeCountPred)
    }
    // If PV contains zone related label, it may impact cached NoVolumeZoneConflict
    for k := range pv.Labels {
        if isZoneRegionLabel(k) {
            invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)
            break
        }
    }
    if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
        // Add/delete impacts the available PVs to choose from
        invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
    }
    c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
}
So when a PV is added or deleted, the following cached predicate results are removed on all nodes:
MaxEBSVolumeCount, MaxGCEPDVolumeCount, or MaxAzureDiskVolumeCount, if the PV is of the corresponding type;
NoVolumeZoneConflict, if the PV carries a zone/region label;
CheckVolumeBinding, if the VolumeScheduling feature gate is enabled.
When a PV update event occurs, invalidatePredicatesForPvUpdate updates the Equivalence Cache:
func (c *configFactory) invalidatePredicatesForPvUpdate(oldPV, newPV *v1.PersistentVolume) {
    invalidPredicates := sets.NewString()
    for k, v := range newPV.Labels {
        // If PV update modifies the zone/region labels.
        if isZoneRegionLabel(k) && !reflect.DeepEqual(v, oldPV.Labels[k]) {
            invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)
            break
        }
    }
    c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
}
So when a PV is updated, the following cached predicate results are removed on all nodes:
NoVolumeZoneConflict, if the PV's failure-domain.beta.kubernetes.io/zone or failure-domain.beta.kubernetes.io/region label changed.
PVC add and delete events are handled by invalidatePredicatesForPvc:
func (c *configFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim) {
    // We need to do this here because the ecache uses PVC uid as part of equivalence hash of pod
    // The bound volume type may change
    invalidPredicates := sets.NewString(maxPDVolumeCountPredicateKeys...)
    // The bound volume's label may change
    invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)
    if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
        // Add/delete impacts the available PVs to choose from
        invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
    }
    c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
}
When a PVC add or delete event occurs, the following cached predicate results are removed on all nodes:
the MaxEBSVolumeCount, MaxGCEPDVolumeCount, and MaxAzureDiskVolumeCount results;
the NoVolumeZoneConflict results;
CheckVolumeBinding, if the VolumeScheduling feature gate is enabled.
PVC updates are handled by invalidatePredicatesForPvcUpdate:
func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.PersistentVolumeClaim) {
    invalidPredicates := sets.NewString()
    if old.Spec.VolumeName != new.Spec.VolumeName {
        if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
            // PVC volume binding has changed
            invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
        }
        // The bound volume type may change
        invalidPredicates.Insert(maxPDVolumeCountPredicateKeys...)
    }
    c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
}
When a PVC update event occurs, the following cached predicate results are removed on all nodes:
CheckVolumeBinding, if the VolumeScheduling feature gate is enabled and the PVC's bound PV changed;
the MaxEBSVolumeCount, MaxGCEPDVolumeCount, and MaxAzureDiskVolumeCount results, if the PVC's bound PV changed.
Service add and delete events are handled by onServiceAdd and onServiceDelete:
func (c *configFactory) onServiceAdd(obj interface{}) {
    if c.enableEquivalenceClassCache {
        c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
    }
    c.podQueue.MoveAllToActiveQueue()
}

func (c *configFactory) onServiceDelete(obj interface{}) {
    if c.enableEquivalenceClassCache {
        c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
    }
    c.podQueue.MoveAllToActiveQueue()
}
When a Service add or delete event occurs, the following cached predicate results are removed on all nodes:
CheckServiceAffinity.
Service updates go through onServiceUpdate:
func (c *configFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) {
    if c.enableEquivalenceClassCache {
        // TODO(resouer) We may need to invalidate this for specified group of pods only
        oldService := oldObj.(*v1.Service)
        newService := newObj.(*v1.Service)
        if !reflect.DeepEqual(oldService.Spec.Selector, newService.Spec.Selector) {
            c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
        }
    }
    c.podQueue.MoveAllToActiveQueue()
}
When a Service update event occurs, the following cached predicate results are removed on all nodes:
CheckServiceAffinity, if the Service's selector changed.
The hardest part of the Equivalence Class feature is maintaining and updating the Equivalence Cache optimally: every invalidation should be as narrow as possible yet never incorrect, and there is still room for optimization here.
The Equivalence Cache only caches predicate results; it does not cache or maintain priority results (the community is working on a map-reduce-style optimization for priorities). Since priority funcs are usually more complex than predicate funcs, caching them would be even more valuable.
Equivalence hashing currently only considers a pod's OwnerReference and PVC information. If it could drop the OwnerReference requirement and instead hash the core fields of the pod spec, such as resource requests, labels, and affinity, cache hits would be far more frequent and the Predicate speedup correspondingly larger.