91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Kubelet PLEG怎么創建

發布時間:2021-12-20 10:05:42 來源:億速云 閱讀:162 作者:iii 欄目:云計算

這篇文章主要講解了“Kubelet PLEG怎么創建”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Kubelet PLEG怎么創建”吧!


NewMainKubelet --> New GenericPLEG

Q: GenericPLEG在哪里創建的?

A: 同其他Manager類似,PLEG在kubelet調用NewMainKubelet進行初始化時創建的。

// Capacity of the channel for receiving pod lifecycle events. This number
// is a bit arbitrary and may be adjusted in the future.
plegChannelCapacity = 1000
	
	// Generic PLEG relies on relisting for discovering container events.
// A longer period means that kubelet will take longer to detect container
// changes and to update pod status. On the other hand, a shorter period
// will cause more frequent relisting (e.g., container runtime operations),
// leading to higher cpu usage.
// Note that even though we set the period to 1s, the relisting itself can
// take more than 1s to finish if the container runtime responds slowly
// and/or when there are many container changes in one cycle.
plegRelistPeriod = time.Second * 1

pkg/kubelet/kubelet.go:692

func NewMainKubelet(...) (*Kubelet, error) {
	...
	klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
	...
}	
  • 通過調用pleg.NewGenericPLEG完成創建;

  • plegChannelCapacity是存放PodLifecycleEvent的channel容量,不可配,寫死為1000;

  • PLEG relist進行循環檢查的間隔,不可配,寫死為1s;

下面是GenericPLEG的結構體定義:

pkg/kubelet/pleg/generic.go:49
// GenericPLEG is an extremely simple generic PLEG that relies solely on
// periodic listing to discover container changes. It should be used
// as temporary replacement for container runtimes do not support a proper
// event generator yet.
//
// Note that GenericPLEG assumes that a container would not be created,
// terminated, and garbage collected within one relist period. If such an
// incident happens, GenenricPLEG would miss all events regarding this
// container. In the case of relisting failure, the window may become longer.
// Note that this assumption is not unique -- many kubelet internal components
// rely on terminated containers as tombstones for bookkeeping purposes. The
// garbage collector is implemented to work with such situations. However, to
// guarantee that kubelet can handle missing container events, it is
// recommended to set the relist period short and have an auxiliary, longer
// periodic sync in kubelet as the safety net.
type GenericPLEG struct {
	// The period for relisting.
	relistPeriod time.Duration
	// The container runtime.
	runtime kubecontainer.Runtime
	// The channel from which the subscriber listens events.
	eventChannel chan *PodLifecycleEvent
	// The internal cache for pod/container information.
	podRecords podRecords
	// Time of the last relisting.
	relistTime atomic.Value
	// Cache for storing the runtime states required for syncing pods.
	cache kubecontainer.Cache
	// For testability.
	clock clock.Clock
	// Pods that failed to have their status retrieved during a relist. These pods will be
	// retried during the next relisting.
	podsToReinspect map[types.UID]*kubecontainer.Pod
}
  • eventChannel: PLEG產生的PodLifecycleEvent要發送的Channel;

  • podRecords: map[types.UID]*podRecord,其中key為PodID,value為podRecord:

    	type podRecord struct {
    		old     *kubecontainer.Pod
    		current *kubecontainer.Pod
    	}


  • cache: kubecontainer.Cache,是記錄kubelet存放的PodStatus及PodLifecycleEvent的subscribers

    • kubelet podworkers是subscriber。

  • podsToReinspect: map[types.UID]*kubecontainer.Pod,用于保存那些relist失敗的pods,待下次relist時會去遍歷podsToReinspect中的Pods再次update cache;

    • updateCache會inspect pod并更新cache,如果inspect pod失敗,則會被加入到podsToReinspect中;

Kubelet.Run --> PLEG.Start

Q: 在kubelet初始化時完成了PLEG的創建,那么合適啟動的PLEG呢?

A: kubelet Run方法中會啟動大量managers,PLEG的啟動也在此時。

pkg/kubelet/kubelet.go:1326

// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	if kl.logServer == nil {
		kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
	}
	if kl.kubeClient == nil {
		glog.Warning("No api server defined - no node status update will be sent.")
	}

	// Start the cloud provider sync manager
	if kl.cloudResourceSyncManager != nil {
		go kl.cloudResourceSyncManager.Run(wait.NeverStop)
	}

	if err := kl.initializeModules(); err != nil {
		kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
		glog.Fatal(err)
	}

	// Start volume manager
	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

	if kl.kubeClient != nil {
		// Start syncing node status immediately, this may set up things the runtime needs to run.
		go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
	}
	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

	// Start loop to sync iptables util rules
	if kl.makeIPTablesUtilChains {
		go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
	}

	// Start a goroutine responsible for killing pods (that are not properly
	// handled by pod workers).
	go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)

	// Start component sync loops.
	kl.statusManager.Start()
	kl.probeManager.Start()

	// Start the pod lifecycle event generator.
	kl.pleg.Start()
	kl.syncLoop(updates, kl)
}
  • 在kubelet開始syncLoop前啟動PLEG;

PLEG Start

PLEG Start就是啟動一個協程,每個relistPeriod(1s)就調用一次relist,根據最新的PodStatus生成PodLiftCycleEvent。

pkg/kubelet/pleg/generic.go:130

// Start spawns a goroutine to relist periodically.
func (g *GenericPLEG) Start() {
	go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
}

PLEG relist

relist是PLEG的核心,它從container runtime中查詢屬于kubelet管理的containers/sandboxes的信息,生成最新的PodStatus,然后對比podRecords中記錄的Old PodStatus生成PodLifeCycleEvents,并發送到PLE Channel。

pkg/kubelet/pleg/generic.go:183

// relist queries the container runtime for list of pods/containers, compare
// with the internal pods/containers, and generates events accordingly.
func (g *GenericPLEG) relist() {
	glog.V(5).Infof("GenericPLEG: Relisting")

	if lastRelistTime := g.getRelistTime(); !lastRelistTime.IsZero() {
		metrics.PLEGRelistInterval.Observe(metrics.SinceInMicroseconds(lastRelistTime))
	}

	timestamp := g.clock.Now()
	defer func() {
		metrics.PLEGRelistLatency.Observe(metrics.SinceInMicroseconds(timestamp))
	}()

	// Get all the pods.
	podList, err := g.runtime.GetPods(true)
	if err != nil {
		glog.Errorf("GenericPLEG: Unable to retrieve pods: %v", err)
		return
	}

	g.updateRelistTime(timestamp)

	pods := kubecontainer.Pods(podList)
	g.podRecords.setCurrent(pods)

	// Compare the old and the current pods, and generate events.
	eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
	for pid := range g.podRecords {
		oldPod := g.podRecords.getOld(pid)
		pod := g.podRecords.getCurrent(pid)
		// Get all containers in the old and the new pod.
		allContainers := getContainersFromPods(oldPod, pod)
		for _, container := range allContainers {
			events := computeEvents(oldPod, pod, &container.ID)
			for _, e := range events {
				updateEvents(eventsByPodID, e)
			}
		}
	}

	var needsReinspection map[types.UID]*kubecontainer.Pod
	if g.cacheEnabled() {
		needsReinspection = make(map[types.UID]*kubecontainer.Pod)
	}

	// If there are events associated with a pod, we should update the
	// podCache.
	for pid, events := range eventsByPodID {
		pod := g.podRecords.getCurrent(pid)
		if g.cacheEnabled() {
			// updateCache() will inspect the pod and update the cache. If an
			// error occurs during the inspection, we want PLEG to retry again
			// in the next relist. To achieve this, we do not update the
			// associated podRecord of the pod, so that the change will be
			// detect again in the next relist.
			// TODO: If many pods changed during the same relist period,
			// inspecting the pod and getting the PodStatus to update the cache
			// serially may take a while. We should be aware of this and
			// parallelize if needed.
			if err := g.updateCache(pod, pid); err != nil {
				glog.Errorf("PLEG: Ignoring events for pod %s/%s: %v", pod.Name, pod.Namespace, err)

				// make sure we try to reinspect the pod during the next relisting
				needsReinspection[pid] = pod

				continue
			} else if _, found := g.podsToReinspect[pid]; found {
				// this pod was in the list to reinspect and we did so because it had events, so remove it
				// from the list (we don't want the reinspection code below to inspect it a second time in
				// this relist execution)
				delete(g.podsToReinspect, pid)
			}
		}
		// Update the internal storage and send out the events.
		g.podRecords.update(pid)
		for i := range events {
			// Filter out events that are not reliable and no other components use yet.
			if events[i].Type == ContainerChanged {
				continue
			}
			g.eventChannel <- events[i]
		}
	}

	if g.cacheEnabled() {
		// reinspect any pods that failed inspection during the previous relist
		if len(g.podsToReinspect) > 0 {
			glog.V(5).Infof("GenericPLEG: Reinspecting pods that previously failed inspection")
			for pid, pod := range g.podsToReinspect {
				if err := g.updateCache(pod, pid); err != nil {
					glog.Errorf("PLEG: pod %s/%s failed reinspection: %v", pod.Name, pod.Namespace, err)
					needsReinspection[pid] = pod
				}
			}
		}

		// Update the cache timestamp.  This needs to happen *after*
		// all pods have been properly updated in the cache.
		g.cache.UpdateTime(timestamp)
	}

	// make sure we retain the list of pods that need reinspecting the next time relist is called
	g.podsToReinspect = needsReinspection
}
  • 通過runtime獲取所有本機所有PodList,并設置給podRecord的Current Pods;

  • 聚合Current和Old Pods中的所有Containers進行遍歷,根據CurrentPod,OldPod,ConainerID生成PodLifecycleEvents;

pkg/kubelet/pleg/generic.go:317
func computeEvents(oldPod, newPod *kubecontainer.Pod, cid *kubecontainer.ContainerID) []*PodLifecycleEvent {
	var pid types.UID
	if oldPod != nil {
		pid = oldPod.ID
	} else if newPod != nil {
		pid = newPod.ID
	}
	oldState := getContainerState(oldPod, cid)
	newState := getContainerState(newPod, cid)
	return generateEvents(pid, cid.ID, oldState, newState)
}

pkg/kubelet/pleg/generic.go:143
func generateEvents(podID types.UID, cid string, oldState, newState plegContainerState) []*PodLifecycleEvent {
	if newState == oldState {
		return nil
	}

	glog.V(4).Infof("GenericPLEG: %v/%v: %v -> %v", podID, cid, oldState, newState)
	switch newState {
	case plegContainerRunning:
		return []*PodLifecycleEvent{{ID: podID, Type: ContainerStarted, Data: cid}}
	case plegContainerExited:
		return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}}
	case plegContainerUnknown:
		return []*PodLifecycleEvent{{ID: podID, Type: ContainerChanged, Data: cid}}
	case plegContainerNonExistent:
		switch oldState {
		case plegContainerExited:
			// We already reported that the container died before.
			return []*PodLifecycleEvent{{ID: podID, Type: ContainerRemoved, Data: cid}}
		default:
			return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}, {ID: podID, Type: ContainerRemoved, Data: cid}}
		}
	default:
		panic(fmt.Sprintf("unrecognized container state: %v", newState))
	}
}
  • New/Old plegContainerState與PodLifecycleEvent的映射關系如下圖:

Kubelet PLEG怎么創建

  • 遍歷生成的PodLifecycleEvents,調用updateCache:

    • 通過runtime查詢當前PodStatus(包括Pod對應的所有containerStatues,sandboxStatuses);

    • 將PodStatus更新到cache中;

  • 如果updateCache失敗,則將該Pod重新加入到podsToReinspect,待下次relist時會遍歷podsToReinspect中的Pods,再次調用updateCache。

  • 如果updateCache成功,則檢查該Pod是否已經在podsToReinspect中,如果存在,則從podsToReinspect中刪除給Pod;

  • uodateCache成功后,更新podRecords(Current賦值給Old,Current設為nil),并將非ContainerChanged類型的PodLifecycleEvent發送到eventChannel中;

    • ContainerChanged類型的Event已經被Disabled;

  • 遍歷podsToReinspect中的Pods,調用updateCache更新cache,如果updateCache失敗,則仍然重新放回到podsToReinspect中待下次relist。

Kubelet SycnLoop

Q: PodLifecycleEvent是發送到eventChannel了了,誰拿去用了呢?

A: kubelet syncLoop!!!

kubelet syncLoop是kubelet來維護Pod狀態的核心邏輯,每次sync都會檢查Pod的狀態并進行修復。

func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	glog.Info("Starting kubelet main sync loop.")
	// The resyncTicker wakes up kubelet to checks if there are any pod workers
	// that need to be sync'd. A one-second period is sufficient because the
	// sync interval is defaulted to 10s.
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	defer housekeepingTicker.Stop()
	plegCh := kl.pleg.Watch()
	const (
		base   = 100 * time.Millisecond
		max    = 5 * time.Second
		factor = 2
	)
	duration := base
	for {
		if rs := kl.runtimeState.runtimeErrors(); len(rs) != 0 {
			glog.Infof("skipping pod synchronization - %v", rs)
			// exponential backoff
			time.Sleep(duration)
			duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
			continue
		}
		// reset backoff if we have a success
		duration = base

		kl.syncLoopMonitor.Store(kl.clock.Now())
		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
		kl.syncLoopMonitor.Store(kl.clock.Now())
	}
}
  • syncLoop調用pleg.Watch()返回PodLifecycleEvent Channel。

  • syncLoop中死循環的調用syncLoopIteration進行每次迭代修復。

  • syncLoopIteration方法中一個重要的參數就是pleg channel,syncLoopIteration會從pleg channel中獲取PodLifecycleEvent進行消費。

pkg/kubelet/kubelet.go:1796

func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh:
		...
	case e := <-plegCh:
		if isSyncPodWorthy(e) {
			// PLEG event for a pod; sync it.
			if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
				glog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
				handler.HandlePodSyncs([]*v1.Pod{pod})
			} else {
				// If the pod no longer exists, ignore the event.
				glog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
			}
		}

		if e.Type == pleg.ContainerDied {
			if containerID, ok := e.Data.(string); ok {
				kl.cleanUpContainersInPod(e.ID, containerID)
			}
		}
	case <-syncCh:
		...
	case update := <-kl.livenessManager.Updates():
		...
	case <-housekeepingCh:
		...
	return true
}

syncLoopIteration會從config channel, pleg channel, sync channel, housekeeping channel中獲取信息,然后就行消費。我們主要看pleg channel的分支:

  • 如果**eventType != ContainerRemoved**,那么會根據event中PodID從pod manager中獲取Pod對象;

    • 然后嗲都用handler.HandlePodSyncs將該pod dispatchWork到對應的pod worker進行UpdatePod操作;

    • podWorkers.UpdatePod會封裝UpdatePodOptions對象并發送到UpdatePodOptions Channel;

    • kubelet syncPod從UpdatePodOptions Channel中獲取UpdatePodOptions對象進行Pod sync操作;

  • 如果**eventType == ContainerDied**,則從event.Data中獲取containerID;

    • 如果enable了CPU Manager Policy,那么先通過internalLifecycle.PostStopContainer調用CPU Manager對該Container占用的cpus進行釋放;

    • 然后調用KubeGenericRuntimeManager.removeContainerLog將/var/logs/containrs//var/log/pods/中對應該containerID的log刪除;

    • 最后調用docker刪除該container

    • 然后調用cleanUpContainersInPod將ContainerID發動到podContainerDeletor Channel;

    • Kubelet podContainerDeletor負責消費podContainerDeletor Channel中的ContainerID;

    • podContainerDeletor調用KubeGenericRuntimeManager.removeContainer啟動容器remove流程:

Q: kubelet如何過濾本機containers中非k8s管理的container?

A: kubelet通過以下Label Filter找出k8s管理的container sandbox

  • "io.kubernetes.docker.type": "container"

Q: kubelet是如何關聯Pod和Container的?

A: 通過給container打上如下Label,標識對應的Pod

  • "io.kubernetes.pod.name": "xxx"

  • "io.kubernetes.pod.uid": "xxxxxx"

Q: kubelet管理的container,都打了哪些Label?

A: Sample如下:

"annotation.io.kubernetes.container.hash": "b7c1651a",
"annotation.io.kubernetes.container.ports": "[{\"name\":\"web\",\"containerPort\":80,\"protocol\":\"TCP\"}]",
"annotation.io.kubernetes.container.restartCount": "0",
"annotation.io.kubernetes.container.terminationMessagePath": "/dev/termination-log",
"annotation.io.kubernetes.container.terminationMessagePolicy": "File",
"annotation.io.kubernetes.pod.terminationGracePeriod": "10",
"io.kubernetes.container.logpath": "/var/log/pods/381f8cc6-d84a-11e8-b596-5254000a5151/nginx/0.log",
"io.kubernetes.container.name": "nginx",
"io.kubernetes.docker.type": "container",
"io.kubernetes.pod.name": "web-0",
"io.kubernetes.pod.namespace": "default",
"io.kubernetes.pod.uid": "381f8cc6-d84a-11e8-b596-5254000a5151",
"io.kubernetes.sandbox.id": "cc3be54bf8e9bd386423d23eaccfd2f05e8be2156d60f933791a25c261c8d8a8"

PLEG Core Logic Diagram

注: 綠色圖塊表示與PLEG有交互的kubelet模塊。

Kubelet PLEG怎么創建

感謝各位的閱讀,以上就是“Kubelet PLEG怎么創建”的內容了,經過本文的學習后,相信大家對Kubelet PLEG怎么創建這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

玉溪市| 新平| 清新县| 句容市| 澄江县| 武宣县| 庆城县| 天峻县| 怀远县| 大化| 株洲县| 巴林右旗| 青龙| 滨海县| 八宿县| 象山县| 吴川市| 云南省| 台安县| 铜陵市| 香格里拉县| 馆陶县| 鄢陵县| 远安县| 碌曲县| 综艺| 孟州市| 峨眉山市| 萨迦县| 永吉县| 宜黄县| 如东县| 古蔺县| 忻州市| 临夏市| 克什克腾旗| 富锦市| 甘泉县| 南城县| 平乐县| 信宜市|