This article explains what the structure definition of the Kubernetes Replication Controller is, then follows the source code to see how the controller is started by kube-controller-manager and how it keeps the actual running pods in sync with the desired replica count.
ReplicationManager is the ReplicationController controller object; the name makes it easy to distinguish it in code from the ReplicationController resource API object. The code below is the struct definition of ReplicationManager.
pkg/controller/replication/replication_controller.go:75

// ReplicationManager is responsible for synchronizing ReplicationController objects stored
// in the system with actual running pods.
type ReplicationManager struct {
    kubeClient clientset.Interface
    podControl controller.PodControlInterface

    // internalPodInformer is used to hold a personal informer. If we're using
    // a normal shared informer, then the informer will be started for us. If
    // we have a personal informer, we must start it ourselves. If you start
    // the controller using NewReplicationManager(passing SharedInformer), this
    // will be null
    internalPodInformer cache.SharedIndexInformer

    // An rc is temporarily suspended after creating/deleting these many replicas.
    // It resumes normal action after observing the watch events for them.
    burstReplicas int
    // To allow injection of syncReplicationController for testing.
    syncHandler func(rcKey string) error

    // A TTLCache of pod creates/deletes each rc expects to see.
    expectations *controller.UIDTrackingControllerExpectations

    // A store of replication controllers, populated by the rcController
    rcStore cache.StoreToReplicationControllerLister
    // Watches changes to all replication controllers
    rcController *cache.Controller
    // A store of pods, populated by the podController
    podStore cache.StoreToPodLister
    // Watches changes to all pods
    podController cache.ControllerInterface
    // podStoreSynced returns true if the pod store has been synced at least once.
    // Added as a member to the struct to allow injection for testing.
    podStoreSynced func() bool

    lookupCache *controller.MatchingCache

    // Controllers that need to be synced
    queue workqueue.RateLimitingInterface

    // garbageCollectorEnabled denotes if the garbage collector is enabled. RC
    // manager behaves differently if GC is enabled.
    garbageCollectorEnabled bool
}
The following fields deserve particular attention:

podControl: provides the Create/Delete Pod operation interface.
burstReplicas: the maximum number of pods that may be created/deleted concurrently in one batch.
syncHandler: the function that actually performs the replica sync.
expectations: a cache of the pod UIDs expected under the desired state, together with the interface for correcting that cache (see the sketch right after this list).
rcStore: the Indexer of ReplicationController resource objects; its data is supplied and maintained by rcController.
rcController: watches all ReplicationController resources and writes observed changes into rcStore.
podStore: the Indexer of pods; its data is supplied and maintained by podController.
podController: watches all Pod resources and writes observed changes into podStore.
queue: a rate-limited queue holding the RCs waiting to be synced.
lookupCache: a cache of pod-to-RC matching results, used to speed up lookups.
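The expectations cache is what keeps the controller from repeatedly creating or deleting the same pods while earlier operations are still in flight: before each sync the controller asks whether all previously issued creates/deletes have been observed. Below is a simplified, dependency-free model of that bookkeeping. The real type is controller.UIDTrackingControllerExpectations, which additionally tracks pod UIDs and expires entries after a TTL, so treat this purely as an illustration (the rc key and counts are made up).

package main

import (
    "fmt"
    "sync"
)

// expectations is a toy stand-in for controller.UIDTrackingControllerExpectations:
// it only counts outstanding create/delete operations per rc key.
type expectations struct {
    mu      sync.Mutex
    pending map[string]int
}

func newExpectations() *expectations {
    return &expectations{pending: map[string]int{}}
}

// ExpectCreations records that n pod creations have been issued for this rc.
func (e *expectations) ExpectCreations(rcKey string, n int) {
    e.mu.Lock()
    defer e.mu.Unlock()
    e.pending[rcKey] += n
}

// CreationObserved is called when the informer reports a pod add (or a create fails).
func (e *expectations) CreationObserved(rcKey string) {
    e.mu.Lock()
    defer e.mu.Unlock()
    if e.pending[rcKey] > 0 {
        e.pending[rcKey]--
    }
}

// SatisfiedExpectations reports whether the rc may be synced again.
func (e *expectations) SatisfiedExpectations(rcKey string) bool {
    e.mu.Lock()
    defer e.mu.Unlock()
    return e.pending[rcKey] == 0
}

func main() {
    exp := newExpectations()
    rcKey := "default/nginx-rc" // hypothetical namespace/name key

    exp.ExpectCreations(rcKey, 3)                 // manageReplicas is about to create 3 pods
    fmt.Println(exp.SatisfiedExpectations(rcKey)) // false: don't adjust replicas again yet

    for i := 0; i < 3; i++ {
        exp.CreationObserved(rcKey) // pod add events arrive through the pod informer
    }
    fmt.Println(exp.SatisfiedExpectations(rcKey)) // true: the next sync may adjust replicas again
}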
Readers of my earlier post, Kubernetes ResourceQuota Controller internals and source code analysis, may remember how the controller manager starts the ResourceQuotaController; the ReplicationController is started the same way. When kube-controller-manager calls newControllerInitializers to initialize its controllers, startReplicationController is registered there and is used to start the ReplicationController controller.
cmd/kube-controller-manager/app/controllermanager.go:224

func newControllerInitializers() map[string]InitFunc {
    controllers := map[string]InitFunc{}
    controllers["endpoint"] = startEndpointController
    controllers["replicationcontroller"] = startReplicationController
    controllers["podgc"] = startPodGCController
    controllers["resourcequota"] = startResourceQuotaController
    controllers["namespace"] = startNamespaceController
    controllers["serviceaccount"] = startServiceAccountController
    controllers["garbagecollector"] = startGarbageCollectorController
    controllers["daemonset"] = startDaemonSetController
    controllers["job"] = startJobController
    controllers["deployment"] = startDeploymentController
    controllers["replicaset"] = startReplicaSetController
    controllers["horizontalpodautoscaling"] = startHPAController
    controllers["disruption"] = startDisruptionController
    controllers["statefuleset"] = startStatefulSetController
    controllers["cronjob"] = startCronJobController
    controllers["certificatesigningrequests"] = startCSRController
    return controllers
}
Following the code into startReplicationController, it is very simple: it starts a goroutine that calls replicationcontroller.NewReplicationManager to create a ReplicationManager and then runs its Run method to start working.
cmd/kube-controller-manager/app/core.go:55

func startReplicationController(ctx ControllerContext) (bool, error) {
    go replicationcontroller.NewReplicationManager(
        ctx.InformerFactory.Pods().Informer(),
        ctx.ClientBuilder.ClientOrDie("replication-controller"),
        ResyncPeriod(&ctx.Options),
        replicationcontroller.BurstReplicas,
        int(ctx.Options.LookupCacheSizeForRC),
        ctx.Options.EnableGarbageCollector,
    ).Run(int(ctx.Options.ConcurrentRCSyncs), ctx.Stop)
    return true, nil
}
As analyzed above, the controller manager creates a ReplicationManager object through NewReplicationManager, and that object is in fact the ReplicationController controller.
pkg/controller/replication/replication_controller.go:122

// NewReplicationManager creates a replication manager
func NewReplicationManager(podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicationManager {
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(glog.Infof)
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
    return newReplicationManager(
        eventBroadcaster.NewRecorder(v1.EventSource{Component: "replication-controller"}),
        podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
}

pkg/controller/replication/replication_controller.go:132

// newReplicationManager configures a replication manager with the specified event recorder
func newReplicationManager(eventRecorder record.EventRecorder, podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicationManager {
    if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
        metrics.RegisterMetricAndTrackRateLimiterUsage("replication_controller", kubeClient.Core().RESTClient().GetRateLimiter())
    }

    rm := &ReplicationManager{
        kubeClient: kubeClient,
        podControl: controller.RealPodControl{
            KubeClient: kubeClient,
            Recorder:   eventRecorder,
        },
        burstReplicas: burstReplicas,
        expectations:  controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
        queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "replicationmanager"),
        garbageCollectorEnabled: garbageCollectorEnabled,
    }

    rm.rcStore.Indexer, rm.rcController = cache.NewIndexerInformer(
        &cache.ListWatch{
            ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
                return rm.kubeClient.Core().ReplicationControllers(v1.NamespaceAll).List(options)
            },
            WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
                return rm.kubeClient.Core().ReplicationControllers(v1.NamespaceAll).Watch(options)
            },
        },
        &v1.ReplicationController{},
        // TODO: Can we have much longer period here?
        FullControllerResyncPeriod,
        cache.ResourceEventHandlerFuncs{
            AddFunc:    rm.enqueueController,
            UpdateFunc: rm.updateRC,
            // This will enter the sync loop and no-op, because the controller has been deleted from the store.
            // Note that deleting a controller immediately after scaling it to 0 will not work. The recommended
            // way of achieving this is by performing a `stop` operation on the controller.
            DeleteFunc: rm.enqueueController,
        },
        cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
    )

    podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: rm.addPod,
        // This invokes the rc for every pod change, eg: host assignment. Though this might seem like overkill
        // the most frequent pod update is status, and the associated rc will only list from local storage, so
        // it should be ok.
        UpdateFunc: rm.updatePod,
        DeleteFunc: rm.deletePod,
    })
    rm.podStore.Indexer = podInformer.GetIndexer()
    rm.podController = podInformer.GetController()

    rm.syncHandler = rm.syncReplicationController
    rm.podStoreSynced = rm.podController.HasSynced
    rm.lookupCache = controller.NewMatchingCache(lookupCacheSize)
    return rm
}
newReplicationManager is where the ReplicationManager is configured, in particular:

The queue is configured via workqueue.NewNamedRateLimitingQueue.
The expectations cache is configured via controller.NewUIDTrackingControllerExpectations.
rcStore, podStore, rcController and podController are configured; a sketch of the enqueue handlers they register follows this list.
syncHandler is set to rm.syncReplicationController. This one matters, which is why it gets its own item: as we will see later, syncReplicationController is the method that does the core work, and the automatic maintenance of replicas is essentially done entirely by it.
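The event handlers registered above (rm.enqueueController, rm.updateRC, rm.addPod, rm.updatePod, rm.deletePod) are not reproduced in this article. Conceptually they only translate "this rc (or a pod belonging to it) changed" into a namespace/name key on rm.queue; the worker goroutines do the actual syncing later. A dependency-free sketch of that pattern follows. The meta and workQueue types are invented for illustration; the real code derives the key with controller.KeyFunc and pushes it onto the rate-limiting workqueue.

package main

import "fmt"

// meta is the minimal identity every Kubernetes object carries.
type meta struct {
    Namespace, Name string
}

// keyFunc mimics what controller.KeyFunc / cache.MetaNamespaceKeyFunc produce: "namespace/name".
func keyFunc(m meta) string { return m.Namespace + "/" + m.Name }

// workQueue stands in for the RateLimitingInterface held by ReplicationManager.
// (The real workqueue also deduplicates keys that are already queued; this toy channel does not.)
type workQueue struct{ keys chan string }

func (q *workQueue) Add(key string) { q.keys <- key }

// enqueueController has the shape of the rc informer handlers: derive the key, queue it, return.
func enqueueController(q *workQueue, rc meta) { q.Add(keyFunc(rc)) }

func main() {
    q := &workQueue{keys: make(chan string, 8)}
    rc := meta{Namespace: "default", Name: "nginx-rc"} // hypothetical rc

    // An rc add/update/delete event re-queues the rc itself.
    enqueueController(q, rc)

    // A pod event is first mapped back to the rc owning the pod (via label
    // selectors or the lookupCache in the real controller), then that rc is queued.
    enqueueController(q, rc)

    fmt.Println(<-q.keys, <-q.keys) // both events produce the same rc key for the workers
}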
With the ReplicationManager created, it is time to get to work. The Run method is the starting point: it begins watching and syncing.
pkg/controller/replication/replication_controller.go:217

// Run begins watching and syncing.
func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    glog.Infof("Starting RC Manager")
    go rm.rcController.Run(stopCh)
    go rm.podController.Run(stopCh)
    for i := 0; i < workers; i++ {
        go wait.Until(rm.worker, time.Second, stopCh)
    }

    if rm.internalPodInformer != nil {
        go rm.internalPodInformer.Run(stopCh)
    }

    <-stopCh
    glog.Infof("Shutting down RC Manager")
    rm.queue.ShutDown()
}
watching

go rm.rcController.Run(stopCh): responsible for watching all rcs.
go rm.podController.Run(stopCh): responsible for watching all pods.

syncing

workers goroutines are started.
Each goroutine keeps looping over rm.worker, pausing 1s between iterations; rm.worker is responsible for taking an rc from the queue and calling syncHandler to sync it. (A minimal wait.Until example follows this list.)
Each goroutine only exits once it receives the stopCh signal.
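The line go wait.Until(rm.worker, time.Second, stopCh) is what drives those worker loops. Below is a minimal, self-contained example of the same pattern; note that the import path is the one used in current Kubernetes trees (k8s.io/apimachinery/pkg/util/wait), while the 1.5-era code shown in this article imports it from k8s.io/kubernetes/pkg/util/wait.

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    stop := make(chan struct{})
    worker := func() { fmt.Println("one pass over the work queue") }

    // Start 2 workers; each re-invokes worker() every second until stop is closed,
    // mirroring `go wait.Until(rm.worker, time.Second, stopCh)` in Run.
    for i := 0; i < 2; i++ {
        go wait.Until(worker, time.Second, stop)
    }

    time.Sleep(3 * time.Second)
    close(stop) // what the controller manager effectively does on shutdown
    time.Sleep(100 * time.Millisecond)
}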
Below is the Run method implementation behind rcController and podController; its job is to carry out the rc/pod watch.
pkg/client/cache/controller.go:84

// Run begins processing items, and will continue until a value is sent down stopCh.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *Controller) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    r := NewReflector(
        c.config.ListerWatcher,
        c.config.ObjectType,
        c.config.Queue,
        c.config.FullResyncPeriod,
    )

    c.reflectorMutex.Lock()
    c.reflector = r
    c.reflectorMutex.Unlock()

    r.RunUntil(stopCh)

    wait.Until(c.processLoop, time.Second, stopCh)
}
The key implementation of syncing lives in the ReplicationManager's worker method, shown below.
pkg/controller/replication/replication_controller.go:488

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (rm *ReplicationManager) worker() {
    workFunc := func() bool {
        key, quit := rm.queue.Get()
        if quit {
            return true
        }
        defer rm.queue.Done(key)

        err := rm.syncHandler(key.(string))
        if err == nil {
            rm.queue.Forget(key)
            return false
        }

        rm.queue.AddRateLimited(key)
        utilruntime.HandleError(err)
        return false
    }
    for {
        if quit := workFunc(); quit {
            glog.Infof("replication controller worker shutting down")
            return
        }
    }
}
The main logic in worker is:

Take one rc key from rm's rate-limited queue.
Call the syncHandler interface to sync that rc.

When newReplicationManager ran, rm.syncHandler = rm.syncReplicationController registered syncReplicationController as the syncHandler, so the logic for syncing an rc lives in syncReplicationController. (A small demo of the Forget/AddRateLimited queue semantics used by worker follows.)
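The worker above relies on the rate-limiting queue's semantics: Forget resets the per-key backoff after a successful sync, AddRateLimited re-queues a failed key with exponential backoff, and Done releases the key so it can be processed again. A small standalone demo of those calls is below; the import path shown is the current k8s.io/client-go/util/workqueue, whereas the 1.5-era code imports k8s.io/kubernetes/pkg/util/workqueue, but the interface is the same.

package main

import (
    "fmt"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "demo")

    queue.Add("default/nginx-rc") // what enqueueController does for an rc event

    key, shutdown := queue.Get() // blocks until an item arrives or ShutDown is called
    if shutdown {
        return
    }
    fmt.Println("syncing", key)

    // Success path (syncHandler returned nil): reset backoff and release the key.
    queue.Forget(key)
    queue.Done(key)

    // Failure path (syncHandler returned an error): re-queue with exponential backoff.
    queue.AddRateLimited("default/nginx-rc")

    queue.ShutDown()
}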
pkg/controller/replication/replication_controller.go:639

// syncReplicationController will sync the rc with the given key if it has had its expectations fulfilled,
// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to
// be invoked concurrently with the same key.
func (rm *ReplicationManager) syncReplicationController(key string) error {
    trace := util.NewTrace("syncReplicationController: " + key)
    defer trace.LogIfLong(250 * time.Millisecond)

    startTime := time.Now()
    defer func() {
        glog.V(4).Infof("Finished syncing controller %q (%v)", key, time.Now().Sub(startTime))
    }()

    if !rm.podStoreSynced() {
        // Sleep so we give the pod reflector goroutine a chance to run.
        time.Sleep(PodStoreSyncedPollPeriod)
        glog.Infof("Waiting for pods controller to sync, requeuing rc %v", key)
        rm.queue.Add(key)
        return nil
    }

    obj, exists, err := rm.rcStore.Indexer.GetByKey(key)
    if !exists {
        glog.Infof("Replication Controller has been deleted %v", key)
        rm.expectations.DeleteExpectations(key)
        return nil
    }
    if err != nil {
        return err
    }
    rc := *obj.(*v1.ReplicationController)

    trace.Step("ReplicationController restored")
    rcNeedsSync := rm.expectations.SatisfiedExpectations(key)
    trace.Step("Expectations restored")

    // NOTE: filteredPods are pointing to objects from cache - if you need to
    // modify them, you need to copy it first.
    // TODO: Do the List and Filter in a single pass, or use an index.
    var filteredPods []*v1.Pod
    if rm.garbageCollectorEnabled {
        // list all pods to include the pods that don't match the rc's selector
        // anymore but has the stale controller ref.
        pods, err := rm.podStore.Pods(rc.Namespace).List(labels.Everything())
        if err != nil {
            glog.Errorf("Error getting pods for rc %q: %v", key, err)
            rm.queue.Add(key)
            return err
        }
        cm := controller.NewPodControllerRefManager(rm.podControl, rc.ObjectMeta, labels.Set(rc.Spec.Selector).AsSelectorPreValidated(), getRCKind())
        matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(pods)
        // Adopt pods only if this replication controller is not going to be deleted.
        if rc.DeletionTimestamp == nil {
            for _, pod := range matchesNeedsController {
                err := cm.AdoptPod(pod)
                // continue to next pod if adoption fails.
                if err != nil {
                    // If the pod no longer exists, don't even log the error.
                    if !errors.IsNotFound(err) {
                        utilruntime.HandleError(err)
                    }
                } else {
                    matchesAndControlled = append(matchesAndControlled, pod)
                }
            }
        }
        filteredPods = matchesAndControlled
        // remove the controllerRef for the pods that no longer have matching labels
        var errlist []error
        for _, pod := range controlledDoesNotMatch {
            err := cm.ReleasePod(pod)
            if err != nil {
                errlist = append(errlist, err)
            }
        }
        if len(errlist) != 0 {
            aggregate := utilerrors.NewAggregate(errlist)
            // push the RC into work queue again. We need to try to free the
            // pods again otherwise they will stuck with the stale
            // controllerRef.
            rm.queue.Add(key)
            return aggregate
        }
    } else {
        pods, err := rm.podStore.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelectorPreValidated())
        if err != nil {
            glog.Errorf("Error getting pods for rc %q: %v", key, err)
            rm.queue.Add(key)
            return err
        }
        filteredPods = controller.FilterActivePods(pods)
    }

    var manageReplicasErr error
    if rcNeedsSync && rc.DeletionTimestamp == nil {
        manageReplicasErr = rm.manageReplicas(filteredPods, &rc)
    }
    trace.Step("manageReplicas done")

    newStatus := calculateStatus(rc, filteredPods, manageReplicasErr)

    // Always updates status as pods come up or die.
    if err := updateReplicationControllerStatus(rm.kubeClient.Core().ReplicationControllers(rc.Namespace), rc, newStatus); err != nil {
        // Multiple things could lead to this update failing. Returning an error causes a requeue without forcing a hotloop
        return err
    }

    return manageReplicasErr
}
The main logic of syncReplicationController is:

1. If the podStore has not yet been synced at least once, re-add the rc key to the queue to wait for the podStore to sync, and the flow ends here; otherwise continue.
2. Using the rc key, fetch the corresponding rc object from the rcStore. If the rc object does not exist, the rc has already been deleted: delete it from expectations by key and return, ending the flow. If the rc object exists, continue.
3. Check the add and del counters in expectations, and whether more than 5 minutes have passed since the last timestamp, to decide whether this rc needs a sync.
4. If GC is enabled, list all pods in the whole namespace from the podStore and take the matchesAndControlled plus matchesNeedsController pods as the filtered set filteredPods to be synced. If GC is not enabled, directly take the Active pods in the podStore of that namespace matching rc.Spec.Selector as filteredPods. (For what matchesAndControlled and matchesNeedsController mean, see the PodControllerRefManager.Classify function defined at pkg/controller/controller_ref_manager.go:57; a small selector-matching example follows this list.)
5. If step 3 determined that this rc needs a sync and DeletionTimestamp is nil, call manageReplicas so that the number of active pods managed by this rc equals the desired value.
6. After manageReplicas returns, immediately recompute the rc's status, updating the Conditions, Replicas, FullyLabeledReplicas, ReadyReplicas and AvailableReplicas fields.
7. Through updateReplicationControllerStatus, call the kube-apiserver interface to update the rc's status to the newly computed one; the flow ends.
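Step 4 relies on matching pod labels against rc.Spec.Selector. The snippet below shows that matching in isolation, using the same AsSelectorPreValidated call as the controller. The import path is the current k8s.io/apimachinery/pkg/labels (the 1.5-era code uses k8s.io/kubernetes/pkg/labels), and the label values are made up.

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/labels"
)

func main() {
    // rc.Spec.Selector as it would appear on a ReplicationController.
    rcSelector := map[string]string{"app": "nginx"}

    // Same construction used in syncReplicationController.
    selector := labels.Set(rcSelector).AsSelectorPreValidated()

    podLabels := labels.Set{"app": "nginx", "tier": "frontend"} // hypothetical pod labels
    otherLabels := labels.Set{"app": "redis"}

    fmt.Println(selector.Matches(podLabels))   // true  -> the pod counts toward this rc's replicas
    fmt.Println(selector.Matches(otherLabels)) // false -> ignored (or released if it carries a stale controllerRef)
}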
In the syncReplicationController flow described above, a crucial step is the manageReplicas method invoked in step 5; it is responsible for repairing the rc's replicas (add or delete).
pkg/controller/replication/replication_controller.go:516

// manageReplicas checks and updates replicas for the given replication controller.
// Does NOT modify <filteredPods>.
func (rm *ReplicationManager) manageReplicas(filteredPods []*v1.Pod, rc *v1.ReplicationController) error {
    diff := len(filteredPods) - int(*(rc.Spec.Replicas))
    rcKey, err := controller.KeyFunc(rc)
    if err != nil {
        return err
    }
    if diff == 0 {
        return nil
    }

    if diff < 0 {
        diff *= -1
        if diff > rm.burstReplicas {
            diff = rm.burstReplicas
        }
        // TODO: Track UIDs of creates just like deletes. The problem currently
        // is we'd need to wait on the result of a create to record the pod's
        // UID, which would require locking *across* the create, which will turn
        // into a performance bottleneck. We should generate a UID for the pod
        // beforehand and store it via ExpectCreations.
        errCh := make(chan error, diff)
        rm.expectations.ExpectCreations(rcKey, diff)
        var wg sync.WaitGroup
        wg.Add(diff)
        glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", rc.Namespace, rc.Name, *(rc.Spec.Replicas), diff)
        for i := 0; i < diff; i++ {
            go func() {
                defer wg.Done()
                var err error
                if rm.garbageCollectorEnabled {
                    var trueVar = true
                    controllerRef := &metav1.OwnerReference{
                        APIVersion: getRCKind().GroupVersion().String(),
                        Kind:       getRCKind().Kind,
                        Name:       rc.Name,
                        UID:        rc.UID,
                        Controller: &trueVar,
                    }
                    err = rm.podControl.CreatePodsWithControllerRef(rc.Namespace, rc.Spec.Template, rc, controllerRef)
                } else {
                    err = rm.podControl.CreatePods(rc.Namespace, rc.Spec.Template, rc)
                }
                if err != nil {
                    // Decrement the expected number of creates because the informer won't observe this pod
                    glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name)
                    rm.expectations.CreationObserved(rcKey)
                    errCh <- err
                    utilruntime.HandleError(err)
                }
            }()
        }
        wg.Wait()

        select {
        case err := <-errCh:
            // all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
            if err != nil {
                return err
            }
        default:
        }

        return nil
    }

    if diff > rm.burstReplicas {
        diff = rm.burstReplicas
    }
    glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rc.Namespace, rc.Name, *(rc.Spec.Replicas), diff)
    // No need to sort pods if we are about to delete all of them
    if *(rc.Spec.Replicas) != 0 {
        // Sort the pods in the order such that not-ready < ready, unscheduled
        // < scheduled, and pending < running. This ensures that we delete pods
        // in the earlier stages whenever possible.
        sort.Sort(controller.ActivePods(filteredPods))
    }
    // Snapshot the UIDs (ns/name) of the pods we're expecting to see
    // deleted, so we know to record their expectations exactly once either
    // when we see it as an update of the deletion timestamp, or as a delete.
    // Note that if the labels on a pod/rc change in a way that the pod gets
    // orphaned, the rs will only wake up after the expectations have
    // expired even if other pods are deleted.
    deletedPodKeys := []string{}
    for i := 0; i < diff; i++ {
        deletedPodKeys = append(deletedPodKeys, controller.PodKey(filteredPods[i]))
    }
    // We use pod namespace/name as a UID to wait for deletions, so if the
    // labels on a pod/rc change in a way that the pod gets orphaned, the
    // rc will only wake up after the expectation has expired.
    errCh := make(chan error, diff)
    rm.expectations.ExpectDeletions(rcKey, deletedPodKeys)
    var wg sync.WaitGroup
    wg.Add(diff)
    for i := 0; i < diff; i++ {
        go func(ix int) {
            defer wg.Done()
            if err := rm.podControl.DeletePod(rc.Namespace, filteredPods[ix].Name, rc); err != nil {
                // Decrement the expected number of deletes because the informer won't observe this deletion
                podKey := controller.PodKey(filteredPods[ix])
                glog.V(2).Infof("Failed to delete %v due to %v, decrementing expectations for controller %q/%q", podKey, err, rc.Namespace, rc.Name)
                rm.expectations.DeletionObserved(rcKey, podKey)
                errCh <- err
                utilruntime.HandleError(err)
            }
        }(i)
    }
    wg.Wait()

    select {
    case err := <-errCh:
        // all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
        if err != nil {
            return err
        }
    default:
    }

    return nil
}
The main logic of the manageReplicas code above is:

Compute diff, the difference between the number of pods in filteredPods and the desired count defined in rc.Spec.Replicas (a condensed sketch of this arithmetic follows this list).
If diff is 0, the current state already matches the desired state; return immediately and the flow ends.
If diff is negative, there are too few Active pods, and the following happens:
  - |diff| is compared against burstReplicas to ensure that at most burstReplicas pods are created in this pass.
  - expectations.ExpectCreations is called to set the add count in expectations to |diff|, meaning |diff| new pods must be created to reach the desired state.
  - A sync.WaitGroup starts |diff| goroutines, each of which calls the podControl.CreatePods interface to create one pod for this namespace/rc from the spec Template.
  - Once all goroutines have finished, if one or more pod creations failed an error is returned, otherwise nil; the flow ends.
If diff is positive, the number of Active pods exceeds the desired value, and the following happens:
  - |diff| is compared against burstReplicas to ensure that at most burstReplicas pods are deleted in this pass.
  - The pods in filteredPods are sorted so that not-ready < ready, unscheduled < scheduled, and pending < running, so that pods in earlier stages are deleted first whenever possible.
  - After sorting, the first |diff| pods are chosen as the pods to delete.
  - expectations.ExpectDeletions is called to set the del count in expectations to |diff|, meaning |diff| pods must be deleted to reach the desired state.
  - A sync.WaitGroup starts |diff| goroutines, each of which calls the podControl.DeletePod interface to delete one of the pods selected for deletion.
  - Once all goroutines have finished, if one or more pod deletions failed an error is returned, otherwise nil; the flow ends.
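To make the first items above concrete, here is a condensed, dependency-free restatement of the diff-and-burst arithmetic at the top of manageReplicas. replicaDiff is a made-up helper name, and 500 is only an example burst value, not necessarily the configured default.

package main

import "fmt"

// replicaDiff restates the arithmetic at the top of manageReplicas: a negative
// raw diff means "create pods", a positive one means "delete pods", and the
// magnitude is capped at burstReplicas per sync pass.
func replicaDiff(activePods, desiredReplicas, burstReplicas int) (toCreate, toDelete int) {
    diff := activePods - desiredReplicas
    switch {
    case diff < 0:
        if -diff > burstReplicas {
            return burstReplicas, 0
        }
        return -diff, 0
    case diff > 0:
        if diff > burstReplicas {
            return 0, burstReplicas
        }
        return 0, diff
    default:
        return 0, 0
    }
}

func main() {
    fmt.Println(replicaDiff(2, 5, 500))    // 3 0   -> create 3 pods
    fmt.Println(replicaDiff(8, 5, 500))    // 0 3   -> delete 3 pods
    fmt.Println(replicaDiff(0, 2000, 500)) // 500 0 -> capped at burstReplicas for this pass
}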