This article analyzes the main method and the leader election logic of the CSI external-provisioner component; if you are not yet familiar with this part of the code, the walkthrough below should help.
It covers the main logic of the main method and the component's EventHandlers: which objects the component lists/watches, how their events are handled, and where the items in claimQueue and volumeQueue come from.
Main logic of the main method:
(1) parse the startup flags;
(2) build the clientset from the config;
(3) build the gRPC client;
(4) probe over gRPC (checking whether the cephcsi-rbd service is ready) until the probe succeeds;
(5) get the driver name and capabilities over gRPC;
(6) build informers from the clientset;
(7) construct the provisionController object;
(8) define the run function (which includes provisionController.Run);
(9) based on the --enable-leader-election startup flag, decide whether to enable leader election: when disabled, call run directly; when enabled, call le.Run().
func main() {
    var config *rest.Config
    var err error
    flag.Var(utilflag.NewMapStringBool(&featureGates), "feature-gates", "A set of key=value pairs that describe feature gates for alpha/experimental features. "+
        "Options are:\n"+strings.Join(utilfeature.DefaultFeatureGate.KnownFeatures(), "\n"))
    klog.InitFlags(nil)
    flag.CommandLine.AddGoFlagSet(goflag.CommandLine)
    flag.Set("logtostderr", "true")
    flag.Parse()

    if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(featureGates); err != nil {
        klog.Fatal(err)
    }

    if *showVersion {
        fmt.Println(os.Args[0], version)
        os.Exit(0)
    }
    klog.Infof("Version: %s", version)

    // get the KUBECONFIG from env if specified (useful for local/debug cluster)
    kubeconfigEnv := os.Getenv("KUBECONFIG")

    if kubeconfigEnv != "" {
        klog.Infof("Found KUBECONFIG environment variable set, using that..")
        kubeconfig = &kubeconfigEnv
    }

    if *master != "" || *kubeconfig != "" {
        klog.Infof("Either master or kubeconfig specified. building kube config from that..")
        config, err = clientcmd.BuildConfigFromFlags(*master, *kubeconfig)
    } else {
        klog.Infof("Building kube configs for running in cluster...")
        config, err = rest.InClusterConfig()
    }
    if err != nil {
        klog.Fatalf("Failed to create config: %v", err)
    }

    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        klog.Fatalf("Failed to create client: %v", err)
    }

    // snapclientset.NewForConfig creates a new Clientset for VolumesnapshotV1beta1Client
    snapClient, err := snapclientset.NewForConfig(config)
    if err != nil {
        klog.Fatalf("Failed to create snapshot client: %v", err)
    }

    // The controller needs to know what the server version is because out-of-tree
    // provisioners aren't officially supported until 1.5
    serverVersion, err := clientset.Discovery().ServerVersion()
    if err != nil {
        klog.Fatalf("Error getting server version: %v", err)
    }

    metricsManager := metrics.NewCSIMetricsManager("" /* driverName */)

    grpcClient, err := ctrl.Connect(*csiEndpoint, metricsManager)
    if err != nil {
        klog.Error(err.Error())
        os.Exit(1)
    }

    // Probe in a loop until the CSI driver (here the cephcsi-rbd service) is ready
    err = ctrl.Probe(grpcClient, *operationTimeout)
    if err != nil {
        klog.Error(err.Error())
        os.Exit(1)
    }

    // Get the driver name from the ceph-csi component
    provisionerName, err := ctrl.GetDriverName(grpcClient, *operationTimeout)
    if err != nil {
        klog.Fatalf("Error getting CSI driver name: %s", err)
    }
    klog.V(2).Infof("Detected CSI driver %s", provisionerName)
    metricsManager.SetDriverName(provisionerName)
    metricsManager.StartMetricsEndpoint(*metricsAddress, *metricsPath)

    // Get the driver capabilities from the ceph-csi component
    pluginCapabilities, controllerCapabilities, err := ctrl.GetDriverCapabilities(grpcClient, *operationTimeout)
    if err != nil {
        klog.Fatalf("Error getting CSI driver capabilities: %s", err)
    }

    // Generate a unique ID for this provisioner
    timeStamp := time.Now().UnixNano() / int64(time.Millisecond)
    identity := strconv.FormatInt(timeStamp, 10) + "-" + strconv.Itoa(rand.Intn(10000)) + "-" + provisionerName

    // Start building the informers
    factory := informers.NewSharedInformerFactory(clientset, ctrl.ResyncPeriodOfCsiNodeInformer)

    // -------------------------------
    // Listers
    // Create informer to prevent hit the API server for all resource request
    scLister := factory.Storage().V1().StorageClasses().Lister()
    claimLister := factory.Core().V1().PersistentVolumeClaims().Lister()

    var csiNodeLister storagelistersv1beta1.CSINodeLister
    var nodeLister v1.NodeLister
    if ctrl.SupportsTopology(pluginCapabilities) {
        csiNodeLister = factory.Storage().V1beta1().CSINodes().Lister()
        nodeLister = factory.Core().V1().Nodes().Lister()
    }

    // -------------------------------
    // PersistentVolumeClaims informer
    rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax)
    claimQueue := workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")
    claimInformer := factory.Core().V1().PersistentVolumeClaims().Informer()

    // Setup options
    provisionerOptions := []func(*controller.ProvisionController) error{
        controller.LeaderElection(false), // Always disable leader election in provisioner lib. Leader election should be done here in the CSI provisioner level instead.
        controller.FailedProvisionThreshold(0),
        controller.FailedDeleteThreshold(0),
        controller.RateLimiter(rateLimiter),
        controller.Threadiness(int(*workerThreads)),
        controller.CreateProvisionedPVLimiter(workqueue.DefaultControllerRateLimiter()),
        controller.ClaimsInformer(claimInformer),
    }

    translator := csitrans.New()
    supportsMigrationFromInTreePluginName := ""
    if translator.IsMigratedCSIDriverByName(provisionerName) {
        supportsMigrationFromInTreePluginName, err = translator.GetInTreeNameFromCSIName(provisionerName)
        if err != nil {
            klog.Fatalf("Failed to get InTree plugin name for migrated CSI plugin %s: %v", provisionerName, err)
        }
        klog.V(2).Infof("Supports migration from in-tree plugin: %s", supportsMigrationFromInTreePluginName)
        provisionerOptions = append(provisionerOptions, controller.AdditionalProvisionerNames([]string{supportsMigrationFromInTreePluginName}))
    }

    // Create the provisioner: it implements the Provisioner interface expected by
    // the controller
    csiProvisioner := ctrl.NewCSIProvisioner(
        clientset,
        *operationTimeout,
        identity,
        *volumeNamePrefix,
        *volumeNameUUIDLength,
        grpcClient,
        snapClient,
        provisionerName,
        pluginCapabilities,
        controllerCapabilities,
        supportsMigrationFromInTreePluginName,
        *strictTopology,
        translator,
        scLister,
        csiNodeLister,
        nodeLister,
        claimLister,
        *extraCreateMetadata,
    )

    provisionController = controller.NewProvisionController(
        clientset,
        provisionerName,
        csiProvisioner,
        serverVersion.GitVersion,
        provisionerOptions...,
    )

    csiClaimController := ctrl.NewCloningProtectionController(
        clientset,
        claimLister,
        claimInformer,
        claimQueue,
    )

    // Main run function
    run := func(context.Context) {
        stopCh := context.Background().Done()
        factory.Start(stopCh)
        cacheSyncResult := factory.WaitForCacheSync(stopCh)
        for _, v := range cacheSyncResult {
            if !v {
                klog.Fatalf("Failed to sync Informers!")
            }
        }

        // Run both controllers; the rest of this analysis focuses on provisionController
        go csiClaimController.Run(int(*finalizerThreads), stopCh)
        provisionController.Run(wait.NeverStop)
    }

    // Leader election
    if !*enableLeaderElection {
        run(context.TODO())
    } else {
        // this lock name pattern is also copied from sigs.k8s.io/sig-storage-lib-external-provisioner/v5/controller
        // to preserve backwards compatibility
        lockName := strings.Replace(provisionerName, "/", "-", -1)

        // Use either the endpoints or the leases resource object for leader election
        var le leaderElection
        if *leaderElectionType == "endpoints" {
            klog.Warning("The 'endpoints' leader election type is deprecated and will be removed in a future release. Use '--leader-election-type=leases' instead.")
            le = leaderelection.NewLeaderElectionWithEndpoints(clientset, lockName, run)
        } else if *leaderElectionType == "leases" {
            le = leaderelection.NewLeaderElection(clientset, lockName, run)
        } else {
            klog.Error("--leader-election-type must be either 'endpoints' or 'leases'")
            os.Exit(1)
        }

        if *leaderElectionNamespace != "" {
            le.WithNamespace(*leaderElectionNamespace)
        }

        // Method that runs the leader election logic
        if err := le.Run(); err != nil {
            klog.Fatalf("failed to initialize leader election: %v", err)
        }
    }
}
The EventHandlers define which objects the component lists/watches, how their events are handled, and where the items in claimQueue and volumeQueue come from.
As the code below shows, claimQueue is fed by add and update events on PVC objects (how claimQueue is consumed was covered in part (1) of this external-provisioner source code analysis, on the main processing logic; refer back to it if needed).
...
    // PersistentVolumeClaims
    claimHandler := cache.ResourceEventHandlerFuncs{
        AddFunc:    func(obj interface{}) { controller.enqueueClaim(obj) },
        UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueClaim(newObj) },
        DeleteFunc: func(obj interface{}) {
            // NOOP. The claim is either in claimsInProgress and in the queue, so it will be processed as usual
            // or it's not in claimsInProgress and then we don't care
        },
    }

    if controller.claimInformer != nil {
        controller.claimInformer.AddEventHandlerWithResyncPeriod(claimHandler, controller.resyncPeriod)
    } else {
        controller.claimInformer = informer.Core().V1().PersistentVolumeClaims().Informer()
        controller.claimInformer.AddEventHandler(claimHandler)
    }
...
// enqueueClaim takes an obj and converts it into UID that is then put onto claim work queue.
func (ctrl *ProvisionController) enqueueClaim(obj interface{}) {
    uid, err := getObjectUID(obj)
    if err != nil {
        utilruntime.HandleError(err)
        return
    }
    if ctrl.claimQueue.NumRequeues(uid) == 0 {
        ctrl.claimQueue.Add(uid)
    }
}
Similarly, volumeQueue is fed by add and update events on PV objects (how volumeQueue is consumed was also covered in part (1) of this external-provisioner source code analysis; refer back to it if needed).
...
    // PersistentVolumes
    volumeHandler := cache.ResourceEventHandlerFuncs{
        AddFunc:    func(obj interface{}) { controller.enqueueVolume(obj) },
        UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueVolume(newObj) },
        DeleteFunc: func(obj interface{}) { controller.forgetVolume(obj) },
    }

    if controller.volumeInformer != nil {
        controller.volumeInformer.AddEventHandlerWithResyncPeriod(volumeHandler, controller.resyncPeriod)
    } else {
        controller.volumeInformer = informer.Core().V1().PersistentVolumes().Informer()
        controller.volumeInformer.AddEventHandler(volumeHandler)
    }
...
// enqueueVolume takes an obj and converts it into a namespace/name string which
// is then put onto the given work queue.
func (ctrl *ProvisionController) enqueueVolume(obj interface{}) {
    var key string
    var err error
    if key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err != nil {
        utilruntime.HandleError(err)
        return
    }
    // Re-Adding is harmless but try to add it to the queue only if it is not
    // already there, because if it is already there we *must* be retrying it
    if ctrl.volumeQueue.NumRequeues(key) == 0 {
        ctrl.volumeQueue.Add(key)
    }
}
// forgetVolume Forgets an obj from the given work queue, telling the queue to
// stop tracking its retries because e.g. the obj was deleted
func (ctrl *ProvisionController) forgetVolume(obj interface{}) {
    var key string
    var err error
    if key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err != nil {
        utilruntime.HandleError(err)
        return
    }
    ctrl.volumeQueue.Forget(key)
    ctrl.volumeQueue.Done(key)
}
In Go, the Kubernetes client-go package wraps the leader election functionality and supports three kinds of lock resources: endpoints, configmap, and lease, which makes it easy to use.
The basic principle of leader election is to implement a distributed lock on top of a Kubernetes configmap, endpoints, or lease resource: the instance that acquires the lock becomes the leader and renews it periodically, while the other instances keep trying to acquire it, waiting for the next retry cycle whenever they fail. When the leader dies and its lease expires, another instance becomes the new leader.
Acquiring the lock means successfully writing the process's information (for example, a unique process identifier) into the configmap, endpoints, or lease resource object; renewing means periodically updating the lock's renew time in that object to extend the lease.
When several processes try to acquire the lock (that is, update the lock resource) at the same time, the apiserver guarantees that the update is atomic: it compares the object's resourceVersion (whose value ultimately comes from etcd's modifiedIndex), so only one process can modify the resource successfully, and therefore only one process can obtain the lock.
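To make this optimistic-concurrency check concrete, here is a minimal sketch (not code from external-provisioner or client-go; the package, function, and parameter names are hypothetical, and it assumes a recent context-aware client-go) of a candidate trying to take over a Lease. If another candidate updates the Lease between our Get and Update, the resourceVersion we send back is stale and the apiserver rejects the write with a Conflict, so only one writer can win.

package lockdemo

import (
    "context"
    "fmt"

    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// tryTakeLease is a hypothetical helper: it reads the Lease, writes our identity into it,
// and relies on the resourceVersion carried in the object to detect concurrent writers.
func tryTakeLease(clientset kubernetes.Interface, ns, leaseName, id string) error {
    // Get returns the current Lease, including its resourceVersion.
    lease, err := clientset.CoordinationV1().Leases(ns).Get(context.TODO(), leaseName, metav1.GetOptions{})
    if err != nil {
        return err
    }
    now := metav1.NowMicro()
    lease.Spec.HolderIdentity = &id
    lease.Spec.RenewTime = &now
    // If someone else updated the Lease after our Get, the resourceVersion in `lease`
    // is stale and the apiserver refuses the write with a 409 Conflict.
    _, err = clientset.CoordinationV1().Leases(ns).Update(context.TODO(), lease, metav1.UpdateOptions{})
    if apierrors.IsConflict(err) {
        return fmt.Errorf("lost the race for lease %s/%s: %w", ns, leaseName, err)
    }
    return err
}

client-go's tryAcquireOrRenew (shown at the end of this article) does essentially this Get/Update dance, plus creating the lock object when it does not exist yet and checking whether the current holder's lease has expired.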
An example lock object looks like this:
apiVersion: coordination.k8s.io/v1
kind: Lease
metadata:
  creationTimestamp: "2020-08-21T11:56:46Z"
  name: rbd-csi-ceph-com
  namespace: default
  resourceVersion: "69642798"
  selfLink: /apis/coordination.k8s.io/v1/namespaces/default/leases/rbd-csi-ceph-com
  uid: c9a7ea00-c000-4c5c-b90f-d0e7c85240ca
spec:
  acquireTime: "2020-08-21T11:56:46.907075Z"
  holderIdentity: cld-dnode3-1091-i-nease-net
  leaseDurationSeconds: 15
  leaseTransitions: 0
  renewTime: "2020-09-07T02:38:24.587170Z"
Here, holderIdentity records the identity of the process that holds the lock, and renewTime records when the lock was last renewed.
As the main method code shows, the external-provisioner component only supports the endpoints and lease lock resources, and the endpoints lock will be deprecated in a later release, so the lease lock is recommended.
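For comparison, if you were using the lease lock directly with client-go instead of going through the leaderelection wrapper used in main above, a minimal sketch could look like the following; the lock name, namespace, timings, and helper name are placeholders, not values taken from external-provisioner.

package lockdemo

import (
    "context"
    "os"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/leaderelection"
    "k8s.io/client-go/tools/leaderelection/resourcelock"
    "k8s.io/klog"
)

// runWithLeaseLock is a hypothetical helper: it elects a leader on a Lease object and
// only calls run in the process that currently holds the lock.
func runWithLeaseLock(clientset kubernetes.Interface, run func(ctx context.Context)) {
    id, _ := os.Hostname() // identity written into holderIdentity

    lock := &resourcelock.LeaseLock{
        LeaseMeta: metav1.ObjectMeta{
            Name:      "rbd-csi-ceph-com", // external-provisioner derives the lock name from the driver name
            Namespace: "default",
        },
        Client:     clientset.CoordinationV1(),
        LockConfig: resourcelock.ResourceLockConfig{Identity: id},
    }

    leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
        Lock:          lock,
        LeaseDuration: 15 * time.Second, // how long the lease is valid before others may take it over
        RenewDeadline: 10 * time.Second, // the leader must renew within this window
        RetryPeriod:   5 * time.Second,  // how often candidates retry acquire/renew
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: run,
            OnStoppedLeading: func() { klog.Fatal("lost the lease, exiting") },
            OnNewLeader: func(identity string) {
                klog.Infof("current leader: %s", identity)
            },
        },
    })
}

The run callback here plays the same role as the run closure in main above: it is only invoked after the lease has been acquired, and the process exits if the lease is later lost.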
The high-availability leader election logic of external-provisioner is similar to that of kube-controller-manager, kube-scheduler, and other Kubernetes components.
Outline of the process:
(1) when the component starts, it periodically loops trying to acquire the lease lock; once acquired it becomes the leader and returns, otherwise it keeps blocking;
(2) after acquiring the lease lock, i.e. winning the election, it runs the main processing logic of external-provisioner;
(3) after becoming leader, it keeps renewing the lease in a periodic loop to retain leadership.
Now let's walk through the code.
leaderElection.Run() is invoked (as le.Run()) when the --enable-leader-election startup flag is true. Its main logic:
(1) build the leaderConfig structure (a leaderelection.LeaderElectionConfig);
(2) call leaderelection.RunOrDie for the actual election handling.
func (l *leaderElection) Run() error {
    if l.identity == "" {
        id, err := defaultLeaderElectionIdentity()
        if err != nil {
            return fmt.Errorf("error getting the default leader identity: %v", err)
        }
        l.identity = id
    }

    if l.namespace == "" {
        l.namespace = inClusterNamespace()
    }

    broadcaster := record.NewBroadcaster()
    broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: l.clientset.CoreV1().Events(l.namespace)})
    eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("%s/%s", l.lockName, string(l.identity))})

    rlConfig := resourcelock.ResourceLockConfig{
        Identity:      sanitizeName(l.identity),
        EventRecorder: eventRecorder,
    }

    lock, err := resourcelock.New(l.resourceLock, l.namespace, sanitizeName(l.lockName), l.clientset.CoreV1(), l.clientset.CoordinationV1(), rlConfig)
    if err != nil {
        return err
    }

    leaderConfig := leaderelection.LeaderElectionConfig{
        Lock:          lock,
        LeaseDuration: l.leaseDuration,
        RenewDeadline: l.renewDeadline,
        RetryPeriod:   l.retryPeriod,
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: func(ctx context.Context) {
                klog.V(2).Info("became leader, starting")
                l.runFunc(ctx)
            },
            OnStoppedLeading: func() {
                klog.Fatal("stopped leading")
            },
            OnNewLeader: func(identity string) {
                klog.V(3).Infof("new leader detected, current leader: %s", identity)
            },
        },
    }

    leaderelection.RunOrDie(context.TODO(), leaderConfig)
    return nil // should never reach here
}
Main logic of leaderelection.RunOrDie and the LeaderElector.Run it calls:
(1) call le.acquire() to try to win the election (acquire periodically loops trying to get the lease lock; it returns once the lock is acquired and blocks until then);
(2) once the election is won, run the run method (via the OnStartedLeading callback);
(3) call le.renew() to keep renewing the lease in a periodic loop.
// RunOrDie starts a client with the provided config or panics if the config
// fails to validate.
func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
    le, err := NewLeaderElector(lec)
    if err != nil {
        panic(err)
    }
    if lec.WatchDog != nil {
        lec.WatchDog.SetLeaderElection(le)
    }
    le.Run(ctx)
}

// Run starts the leader election loop
func (le *LeaderElector) Run(ctx context.Context) {
    defer func() {
        runtime.HandleCrash()
        le.config.Callbacks.OnStoppedLeading()
    }()
    if !le.acquire(ctx) {
        return // ctx signalled done
    }
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    go le.config.Callbacks.OnStartedLeading(ctx)
    le.renew(ctx)
}

// acquire keeps looping to get the lease lock; once it succeeds, the caller becomes the leader and acquire returns
// acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds.
// Returns false if ctx signals done.
func (le *LeaderElector) acquire(ctx context.Context) bool {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    succeeded := false
    desc := le.config.Lock.Describe()
    klog.Infof("attempting to acquire leader lease %v...", desc)
    wait.JitterUntil(func() {
        succeeded = le.tryAcquireOrRenew()
        le.maybeReportTransition()
        if !succeeded {
            klog.V(4).Infof("failed to acquire lease %v", desc)
            return
        }
        le.config.Lock.RecordEvent("became leader")
        le.metrics.leaderOn(le.config.Name)
        klog.Infof("successfully acquired lease %v", desc)
        cancel()
    }, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
    return succeeded
}

// renew keeps renewing the lease in a loop
// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done.
func (le *LeaderElector) renew(ctx context.Context) {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    wait.Until(func() {
        timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
        defer timeoutCancel()
        err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
            done := make(chan bool, 1)
            go func() {
                defer close(done)
                done <- le.tryAcquireOrRenew()
            }()

            select {
            case <-timeoutCtx.Done():
                return false, fmt.Errorf("failed to tryAcquireOrRenew %s", timeoutCtx.Err())
            case result := <-done:
                return result, nil
            }
        }, timeoutCtx.Done())

        le.maybeReportTransition()
        desc := le.config.Lock.Describe()
        if err == nil {
            klog.V(5).Infof("successfully renewed lease %v", desc)
            return
        }
        le.config.Lock.RecordEvent("stopped leading")
        le.metrics.leaderOff(le.config.Name)
        klog.Infof("failed to renew lease %v: %v", desc, err)
        cancel()
    }, le.config.RetryPeriod, ctx.Done())

    // if we hold the lease, give it up
    if le.config.ReleaseOnCancel {
        le.release()
    }
}