您好,登錄后才能下訂單哦!
這篇文章主要講解了“Kubelet PLEG怎么創建”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Kubelet PLEG怎么創建”吧!
Q: GenericPLEG在哪里創建的?
A: 同其他Manager類似,PLEG在kubelet調用NewMainKubelet進行初始化時創建的。
// Capacity of the channel for receiving pod lifecycle events. This number // is a bit arbitrary and may be adjusted in the future. plegChannelCapacity = 1000 // Generic PLEG relies on relisting for discovering container events. // A longer period means that kubelet will take longer to detect container // changes and to update pod status. On the other hand, a shorter period // will cause more frequent relisting (e.g., container runtime operations), // leading to higher cpu usage. // Note that even though we set the period to 1s, the relisting itself can // take more than 1s to finish if the container runtime responds slowly // and/or when there are many container changes in one cycle. plegRelistPeriod = time.Second * 1 pkg/kubelet/kubelet.go:692 func NewMainKubelet(...) (*Kubelet, error) { ... klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{}) ... }
通過調用pleg.NewGenericPLEG完成創建;
plegChannelCapacity是存放PodLifecycleEvent的channel容量,不可配,寫死為1000;
PLEG relist進行循環檢查的間隔,不可配,寫死為1s;
下面是GenericPLEG的結構體定義:
pkg/kubelet/pleg/generic.go:49 // GenericPLEG is an extremely simple generic PLEG that relies solely on // periodic listing to discover container changes. It should be used // as temporary replacement for container runtimes do not support a proper // event generator yet. // // Note that GenericPLEG assumes that a container would not be created, // terminated, and garbage collected within one relist period. If such an // incident happens, GenenricPLEG would miss all events regarding this // container. In the case of relisting failure, the window may become longer. // Note that this assumption is not unique -- many kubelet internal components // rely on terminated containers as tombstones for bookkeeping purposes. The // garbage collector is implemented to work with such situations. However, to // guarantee that kubelet can handle missing container events, it is // recommended to set the relist period short and have an auxiliary, longer // periodic sync in kubelet as the safety net. type GenericPLEG struct { // The period for relisting. relistPeriod time.Duration // The container runtime. runtime kubecontainer.Runtime // The channel from which the subscriber listens events. eventChannel chan *PodLifecycleEvent // The internal cache for pod/container information. podRecords podRecords // Time of the last relisting. relistTime atomic.Value // Cache for storing the runtime states required for syncing pods. cache kubecontainer.Cache // For testability. clock clock.Clock // Pods that failed to have their status retrieved during a relist. These pods will be // retried during the next relisting. podsToReinspect map[types.UID]*kubecontainer.Pod }
eventChannel: PLEG產生的PodLifecycleEvent要發送的Channel;
podRecords: map[types.UID]*podRecord,其中key為PodID,value為podRecord:
type podRecord struct { old *kubecontainer.Pod current *kubecontainer.Pod }
cache: kubecontainer.Cache,是記錄kubelet存放的PodStatus及PodLifecycleEvent的subscribers
kubelet podworkers是subscriber。
podsToReinspect: map[types.UID]*kubecontainer.Pod,用于保存那些relist失敗的pods,待下次relist時會去遍歷podsToReinspect中的Pods再次update cache;
updateCache會inspect pod并更新cache,如果inspect pod失敗,則會被加入到podsToReinspect中;
Q: 在kubelet初始化時完成了PLEG的創建,那么合適啟動的PLEG呢?
A: kubelet Run方法中會啟動大量managers,PLEG的啟動也在此時。
pkg/kubelet/kubelet.go:1326 // Run starts the kubelet reacting to config updates func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { if kl.logServer == nil { kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/"))) } if kl.kubeClient == nil { glog.Warning("No api server defined - no node status update will be sent.") } // Start the cloud provider sync manager if kl.cloudResourceSyncManager != nil { go kl.cloudResourceSyncManager.Run(wait.NeverStop) } if err := kl.initializeModules(); err != nil { kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error()) glog.Fatal(err) } // Start volume manager go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop) if kl.kubeClient != nil { // Start syncing node status immediately, this may set up things the runtime needs to run. go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop) } go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop) // Start loop to sync iptables util rules if kl.makeIPTablesUtilChains { go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop) } // Start a goroutine responsible for killing pods (that are not properly // handled by pod workers). go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop) // Start component sync loops. kl.statusManager.Start() kl.probeManager.Start() // Start the pod lifecycle event generator. kl.pleg.Start() kl.syncLoop(updates, kl) }
在kubelet開始syncLoop前啟動PLEG;
PLEG Start就是啟動一個協程,每個relistPeriod(1s)就調用一次relist,根據最新的PodStatus生成PodLiftCycleEvent。
pkg/kubelet/pleg/generic.go:130 // Start spawns a goroutine to relist periodically. func (g *GenericPLEG) Start() { go wait.Until(g.relist, g.relistPeriod, wait.NeverStop) }
relist是PLEG的核心,它從container runtime中查詢屬于kubelet管理的containers/sandboxes的信息,生成最新的PodStatus,然后對比podRecords中記錄的Old PodStatus生成PodLifeCycleEvents,并發送到PLE Channel。
pkg/kubelet/pleg/generic.go:183 // relist queries the container runtime for list of pods/containers, compare // with the internal pods/containers, and generates events accordingly. func (g *GenericPLEG) relist() { glog.V(5).Infof("GenericPLEG: Relisting") if lastRelistTime := g.getRelistTime(); !lastRelistTime.IsZero() { metrics.PLEGRelistInterval.Observe(metrics.SinceInMicroseconds(lastRelistTime)) } timestamp := g.clock.Now() defer func() { metrics.PLEGRelistLatency.Observe(metrics.SinceInMicroseconds(timestamp)) }() // Get all the pods. podList, err := g.runtime.GetPods(true) if err != nil { glog.Errorf("GenericPLEG: Unable to retrieve pods: %v", err) return } g.updateRelistTime(timestamp) pods := kubecontainer.Pods(podList) g.podRecords.setCurrent(pods) // Compare the old and the current pods, and generate events. eventsByPodID := map[types.UID][]*PodLifecycleEvent{} for pid := range g.podRecords { oldPod := g.podRecords.getOld(pid) pod := g.podRecords.getCurrent(pid) // Get all containers in the old and the new pod. allContainers := getContainersFromPods(oldPod, pod) for _, container := range allContainers { events := computeEvents(oldPod, pod, &container.ID) for _, e := range events { updateEvents(eventsByPodID, e) } } } var needsReinspection map[types.UID]*kubecontainer.Pod if g.cacheEnabled() { needsReinspection = make(map[types.UID]*kubecontainer.Pod) } // If there are events associated with a pod, we should update the // podCache. for pid, events := range eventsByPodID { pod := g.podRecords.getCurrent(pid) if g.cacheEnabled() { // updateCache() will inspect the pod and update the cache. If an // error occurs during the inspection, we want PLEG to retry again // in the next relist. To achieve this, we do not update the // associated podRecord of the pod, so that the change will be // detect again in the next relist. // TODO: If many pods changed during the same relist period, // inspecting the pod and getting the PodStatus to update the cache // serially may take a while. We should be aware of this and // parallelize if needed. if err := g.updateCache(pod, pid); err != nil { glog.Errorf("PLEG: Ignoring events for pod %s/%s: %v", pod.Name, pod.Namespace, err) // make sure we try to reinspect the pod during the next relisting needsReinspection[pid] = pod continue } else if _, found := g.podsToReinspect[pid]; found { // this pod was in the list to reinspect and we did so because it had events, so remove it // from the list (we don't want the reinspection code below to inspect it a second time in // this relist execution) delete(g.podsToReinspect, pid) } } // Update the internal storage and send out the events. g.podRecords.update(pid) for i := range events { // Filter out events that are not reliable and no other components use yet. if events[i].Type == ContainerChanged { continue } g.eventChannel <- events[i] } } if g.cacheEnabled() { // reinspect any pods that failed inspection during the previous relist if len(g.podsToReinspect) > 0 { glog.V(5).Infof("GenericPLEG: Reinspecting pods that previously failed inspection") for pid, pod := range g.podsToReinspect { if err := g.updateCache(pod, pid); err != nil { glog.Errorf("PLEG: pod %s/%s failed reinspection: %v", pod.Name, pod.Namespace, err) needsReinspection[pid] = pod } } } // Update the cache timestamp. This needs to happen *after* // all pods have been properly updated in the cache. g.cache.UpdateTime(timestamp) } // make sure we retain the list of pods that need reinspecting the next time relist is called g.podsToReinspect = needsReinspection }
通過runtime獲取所有本機所有PodList,并設置給podRecord的Current Pods;
聚合Current和Old Pods中的所有Containers進行遍歷,根據CurrentPod,OldPod,ConainerID生成PodLifecycleEvents;
pkg/kubelet/pleg/generic.go:317 func computeEvents(oldPod, newPod *kubecontainer.Pod, cid *kubecontainer.ContainerID) []*PodLifecycleEvent { var pid types.UID if oldPod != nil { pid = oldPod.ID } else if newPod != nil { pid = newPod.ID } oldState := getContainerState(oldPod, cid) newState := getContainerState(newPod, cid) return generateEvents(pid, cid.ID, oldState, newState) } pkg/kubelet/pleg/generic.go:143 func generateEvents(podID types.UID, cid string, oldState, newState plegContainerState) []*PodLifecycleEvent { if newState == oldState { return nil } glog.V(4).Infof("GenericPLEG: %v/%v: %v -> %v", podID, cid, oldState, newState) switch newState { case plegContainerRunning: return []*PodLifecycleEvent{{ID: podID, Type: ContainerStarted, Data: cid}} case plegContainerExited: return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}} case plegContainerUnknown: return []*PodLifecycleEvent{{ID: podID, Type: ContainerChanged, Data: cid}} case plegContainerNonExistent: switch oldState { case plegContainerExited: // We already reported that the container died before. return []*PodLifecycleEvent{{ID: podID, Type: ContainerRemoved, Data: cid}} default: return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}, {ID: podID, Type: ContainerRemoved, Data: cid}} } default: panic(fmt.Sprintf("unrecognized container state: %v", newState)) } }
New/Old plegContainerState與PodLifecycleEvent的映射關系如下圖:
遍歷生成的PodLifecycleEvents,調用updateCache:
通過runtime查詢當前PodStatus(包括Pod對應的所有containerStatues,sandboxStatuses);
將PodStatus更新到cache中;
如果updateCache失敗,則將該Pod重新加入到podsToReinspect,待下次relist時會遍歷podsToReinspect中的Pods,再次調用updateCache。
如果updateCache成功,則檢查該Pod是否已經在podsToReinspect中,如果存在,則從podsToReinspect中刪除給Pod;
uodateCache成功后,更新podRecords(Current賦值給Old,Current設為nil),并將非ContainerChanged類型的PodLifecycleEvent發送到eventChannel中;
ContainerChanged類型的Event已經被Disabled;
遍歷podsToReinspect中的Pods,調用updateCache更新cache,如果updateCache失敗,則仍然重新放回到podsToReinspect中待下次relist。
Q: PodLifecycleEvent是發送到eventChannel了了,誰拿去用了呢?
A: kubelet syncLoop!!!
kubelet syncLoop是kubelet來維護Pod狀態的核心邏輯,每次sync都會檢查Pod的狀態并進行修復。
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) { glog.Info("Starting kubelet main sync loop.") // The resyncTicker wakes up kubelet to checks if there are any pod workers // that need to be sync'd. A one-second period is sufficient because the // sync interval is defaulted to 10s. syncTicker := time.NewTicker(time.Second) defer syncTicker.Stop() housekeepingTicker := time.NewTicker(housekeepingPeriod) defer housekeepingTicker.Stop() plegCh := kl.pleg.Watch() const ( base = 100 * time.Millisecond max = 5 * time.Second factor = 2 ) duration := base for { if rs := kl.runtimeState.runtimeErrors(); len(rs) != 0 { glog.Infof("skipping pod synchronization - %v", rs) // exponential backoff time.Sleep(duration) duration = time.Duration(math.Min(float64(max), factor*float64(duration))) continue } // reset backoff if we have a success duration = base kl.syncLoopMonitor.Store(kl.clock.Now()) if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) { break } kl.syncLoopMonitor.Store(kl.clock.Now()) } }
syncLoop調用pleg.Watch()返回PodLifecycleEvent Channel。
syncLoop中死循環的調用syncLoopIteration進行每次迭代修復。
syncLoopIteration方法中一個重要的參數就是pleg channel,syncLoopIteration會從pleg channel中獲取PodLifecycleEvent進行消費。
pkg/kubelet/kubelet.go:1796 func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler, syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool { select { case u, open := <-configCh: ... case e := <-plegCh: if isSyncPodWorthy(e) { // PLEG event for a pod; sync it. if pod, ok := kl.podManager.GetPodByUID(e.ID); ok { glog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e) handler.HandlePodSyncs([]*v1.Pod{pod}) } else { // If the pod no longer exists, ignore the event. glog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e) } } if e.Type == pleg.ContainerDied { if containerID, ok := e.Data.(string); ok { kl.cleanUpContainersInPod(e.ID, containerID) } } case <-syncCh: ... case update := <-kl.livenessManager.Updates(): ... case <-housekeepingCh: ... return true }
syncLoopIteration會從config channel, pleg channel, sync channel, housekeeping channel中獲取信息,然后就行消費。我們主要看pleg channel的分支:
如果**eventType != ContainerRemoved
**,那么會根據event中PodID從pod manager中獲取Pod對象;
然后嗲都用handler.HandlePodSyncs將該pod dispatchWork到對應的pod worker進行UpdatePod操作;
podWorkers.UpdatePod會封裝UpdatePodOptions對象并發送到UpdatePodOptions Channel;
kubelet syncPod從UpdatePodOptions Channel中獲取UpdatePodOptions對象進行Pod sync操作;
如果**eventType == ContainerDied
**,則從event.Data中獲取containerID;
如果enable了CPU Manager Policy,那么先通過internalLifecycle.PostStopContainer調用CPU Manager對該Container占用的cpus進行釋放;
然后調用KubeGenericRuntimeManager.removeContainerLog將/var/logs/containrs/
及/var/log/pods/
中對應該containerID的log刪除;
最后調用docker刪除該container。
然后調用cleanUpContainersInPod將ContainerID發動到podContainerDeletor Channel;
Kubelet podContainerDeletor負責消費podContainerDeletor Channel中的ContainerID;
podContainerDeletor調用KubeGenericRuntimeManager.removeContainer啟動容器remove流程:
Q: kubelet如何過濾本機containers中非k8s管理的container?
A: kubelet通過以下Label Filter找出k8s管理的container sandbox
"io.kubernetes.docker.type": "container"
Q: kubelet是如何關聯Pod和Container的?
A: 通過給container打上如下Label,標識對應的Pod
"io.kubernetes.pod.name": "xxx"
"io.kubernetes.pod.uid": "xxxxxx"
Q: kubelet管理的container,都打了哪些Label?
A: Sample如下:
"annotation.io.kubernetes.container.hash": "b7c1651a", "annotation.io.kubernetes.container.ports": "[{\"name\":\"web\",\"containerPort\":80,\"protocol\":\"TCP\"}]", "annotation.io.kubernetes.container.restartCount": "0", "annotation.io.kubernetes.container.terminationMessagePath": "/dev/termination-log", "annotation.io.kubernetes.container.terminationMessagePolicy": "File", "annotation.io.kubernetes.pod.terminationGracePeriod": "10", "io.kubernetes.container.logpath": "/var/log/pods/381f8cc6-d84a-11e8-b596-5254000a5151/nginx/0.log", "io.kubernetes.container.name": "nginx", "io.kubernetes.docker.type": "container", "io.kubernetes.pod.name": "web-0", "io.kubernetes.pod.namespace": "default", "io.kubernetes.pod.uid": "381f8cc6-d84a-11e8-b596-5254000a5151", "io.kubernetes.sandbox.id": "cc3be54bf8e9bd386423d23eaccfd2f05e8be2156d60f933791a25c261c8d8a8"
注: 綠色圖塊表示與PLEG有交互的kubelet模塊。
感謝各位的閱讀,以上就是“Kubelet PLEG怎么創建”的內容了,經過本文的學習后,相信大家對Kubelet PLEG怎么創建這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。