您好,登錄后才能下訂單哦!
本篇內容介紹了“SuperEdge分布式健康檢查怎么實現”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
SuperEdge 是基于原生 Kubernetes 的邊緣容器管理系統。該系統把云原生能力擴展到邊緣側,很好的實現了云端對邊緣端的管理和控制,極大簡化了應用從云端部署到邊緣端的過程。同時SuperEdge設計了分布式健康檢查機制規避了云邊網絡不穩定造成的大量pod遷移和重建,保證了服務的穩定。
邊緣計算場景下,邊緣節點與云端的網絡環境十分復雜,連接并不可靠,在原生 Kubernetes 集群中,會造成 apiserver 和節點連接的中斷,節點狀態的異常,最終導致pod的驅逐和 endpoint 的缺失,造成服務的中斷和波動,具體來說原生 Kubernetes 處理如下:
失聯的節點被置為 ConditionUnknown 狀態,并被添加 NoSchedule 和 NoExecute 的 taints
失聯的節點上的 pod 被驅逐,并在其他節點上進行重建
失聯的節點上的 pod 從 Service 的 Endpoint 列表中移除
因此,邊緣計算場景僅僅依賴邊端和 apiserver 的連接情況是不足以判斷節點是否異常的,會因為網絡的不可靠造成誤判,影響正常服務。而相較于云端和邊緣端的連接,顯然邊端節點之間的連接更為穩定,具有更高的參考價值,因此 superedge 提出了邊緣分布式健康檢查機制。該機制中節點狀態判定除了要考慮 apiserver 的因素外,還引入了節點的評估因素,進而對節點進行更為全面的狀態判斷。通過這個功能,能夠避免由于云邊網絡不可靠造成的大量的pod遷移和重建,保證服務的穩定
具體來說,主要通過如下三個層面增強節點狀態判斷的準確性:
每個節點定期探測其他節點健康狀態
集群內所有節點定期投票決定各節點的狀態
云端和邊端節點共同決定節點狀態
而分布式健康檢查最終的判斷處理如下:
在深入源碼之前先介紹一下分布式健康檢查的實現原理,其架構圖如下所示:
Kubernetes 每個 node 在 kube-node-lease namespace 下會對應一個 Lease object,kubelet 每隔 node-status-update-frequency 時間(默認10s)會更新對應node的 Lease object
node-controller 會每隔 node-monitor-period 時間(默認5s)檢查 Lease object 是否更新,如果超過 node-monitor-grace-period 時間(默認40s)沒有發生過更新,則認為這個 node 不健康,會更新 NodeStatus(ConditionUnknown)
而當節點心跳超時(ConditionUnknown)之后,node controller 會給該 node 添加如下 taints:
spec: ... taints: - effect: NoSchedule key: node.kubernetes.io/unreachable timeAdded: "2020-07-02T03:50:47Z" - effect: NoExecute key: node.kubernetes.io/unreachable timeAdded: "2020-07-02T03:50:53Z"
同時,endpoint controller 會從 endpoint backend 中踢掉該母機上的所有 pod
對于打上 NoSchedule taint 的母機,Scheduler 不會調度新的負載在該 node 上了;而對于打上 NoExecute(node.kubernetes.io/unreachable) taint 的母機,node controller 會在節點心跳超時之后一段時間(默認5mins)驅逐該節點上的 pod
分布式健康檢查邊端的 edge-health-daemon 組件會對同區域邊緣節點執行分布式健康檢查,并向 apiserver 發送健康狀態投票結果(給 node 打 annotation)
此外,為了實現在云邊斷連且分布式健康檢查狀態正常的情況下:
失聯的節點上的 pod 不會從 Service 的 Endpoint 列表中移除
失聯的節點上的 pod 不會被驅逐
還需要在云端運行 edge-health-admission(Kubernetes mutating admission webhook),不斷根據 node edge-health annotation 調整 kube-controller-manager 設置的 node taint(去掉NoExecute taint)以及 endpoints (將失聯節點上的 pods 從 endpoint subsets notReadyAddresses 移到 addresses中),從而實現云端和邊端共同決定節點狀態
本章將主要介紹 edge-health-daemon 原理,如下為 edge-health-daemon 的相關數據結構:
type EdgeHealthMetadata struct { *NodeMetadata *CheckMetadata } type NodeMetadata struct { NodeList []v1.Node sync.RWMutex } type CheckMetadata struct { CheckInfo map[string]map[string]CheckDetail // Checker ip:{Checked ip:Check detail} CheckPluginScoreInfo map[string]map[string]float64 // Checked ip:{Plugin name:Check score} sync.RWMutex } type CheckDetail struct { Normal bool Time time.Time } type CommunInfo struct { SourceIP string // ClientIP,Checker ip CheckDetail map[string]CheckDetail // Checked ip:Check detail Hmac string }
含義如下:
NodeMetadata:為了實現分區域分布式健康檢查機制而維護的邊緣節點 cache,其中包含該區域內的所有邊緣節點列表 NodeList
CheckMetadata:存放健康檢查的結果,具體來說包括兩個數據結構:
CheckPluginScoreInfo:為Checked ip:{Plugin name:Check score}
組織形式。第一級 key 表示:被檢查的ip;第二級 key 表示:檢查插件的名稱;value 表示:檢查分數
CheckInfo:為Checker ip:{Checked ip:Check detail}
組織形式。第一級key表示:執行檢查的ip;第二級key表示:被檢查的ip;value表示檢查結果 CheckDetail
CheckDetail:代表健康檢查的結果
Normal:Normal 為 true 表示檢查結果正常;false 表示異常
Time:表示得出該結果時的時間,用于結果有效性的判斷(超過一段時間沒有更新的結果將無效)
CommunInfo:邊緣節點向其它節點發送健康檢查結果時使用的數據,其中包括:
SourceIP:表示執行檢查的ip
CheckDetail:為Checked ip:Check detail
組織形式,包含被檢查的ip以及檢查結果
Hmac:SourceIP 以及 CheckDetail 進行 hmac 得到,用于邊緣節點通信過程中判斷傳輸數據的有效性(是否被篡改)
edge-health-daemon 主體邏輯包括四部分功能:
SyncNodeList:根據邊緣節點所在的 zone 刷新 node cache,同時更新 CheckMetadata相關數據
ExecuteCheck:對每個邊緣節點執行若干種類的健康檢查插件(ping,kubelet等),并將各插件檢查分數匯總,根據用戶設置的基準線得出節點是否健康的結果
Commun:將本節點對其它各節點健康檢查的結果發送給其它節點
Vote:對所有節點健康檢查的結果分類,如果某個節點被大多數(>1/2)節點判定為正常,則對該節點添加superedgehealth/node-health:true
annotation,表明該節點分布式健康檢查結果為正常;否則,對該節點添加superedgehealth/node-health:false
annotation,表明該節點分布式健康檢查結果為異常
下面依次對上述功能進行源碼分析:
1、SyncNodeList
SyncNodeList 每隔 HealthCheckPeriod 秒(health-check-period 選項)執行一次,會按照如下情況分類刷新 node cache:
如果 kube-system namespace 下不存在名為 edge-health-zone-config的configmap,則沒有開啟多地域探測,因此會獲取所有邊緣節點列表并刷新 node cache
否則,如果 edge-health-zone-config 的 configmap 數據部分 TaintZoneAdmission 為 false,則沒有開啟多地域探測,因此會獲取所有邊緣節點列表并刷新 node cache
如果 TaintZoneAdmission 為 true,且 node 有"superedgehealth/topology-zone"標簽(標示區域),則獲取"superedgehealth/topology-zone" label value 相同的節點列表并刷新 node cache
如果 node 沒有"superedgehealth/topology-zone" label,則只會將邊緣節點本身添加到分布式健康檢查節點列表中并刷新 node cache(only itself)
func (ehd *EdgeHealthDaemon) SyncNodeList() { // Only sync nodes when self-located found var host *v1.Node if host = ehd.metadata.GetNodeByName(ehd.cfg.Node.HostName); host == nil { klog.Errorf("Self-hostname %s not found", ehd.cfg.Node.HostName) return } // Filter cloud nodes and retain edge ones masterRequirement, err := labels.NewRequirement(common.MasterLabel, selection.DoesNotExist, []string{}) if err != nil { klog.Errorf("New masterRequirement failed %+v", err) return } masterSelector := labels.NewSelector() masterSelector = masterSelector.Add(*masterRequirement) if mrc, err := ehd.cmLister.ConfigMaps(metav1.NamespaceSystem).Get(common.TaintZoneConfigMap); err != nil { if apierrors.IsNotFound(err) { // multi-region configmap not found if NodeList, err := ehd.nodeLister.List(masterSelector); err != nil { klog.Errorf("Multi-region configmap not found and get nodes err %+v", err) return } else { ehd.metadata.SetByNodeList(NodeList) } } else { klog.Errorf("Get multi-region configmap err %+v", err) return } } else { // multi-region configmap found mrcv := mrc.Data[common.TaintZoneConfigMapKey] klog.V(4).Infof("Multi-region value is %s", mrcv) if mrcv == "false" { // close multi-region check if NodeList, err := ehd.nodeLister.List(masterSelector); err != nil { klog.Errorf("Multi-region configmap exist but disabled and get nodes err %+v", err) return } else { ehd.metadata.SetByNodeList(NodeList) } } else { // open multi-region check if hostZone, existed := host.Labels[common.TopologyZone]; existed { klog.V(4).Infof("Host %s has HostZone %s", host.Name, hostZone) zoneRequirement, err := labels.NewRequirement(common.TopologyZone, selection.Equals, []string{hostZone}) if err != nil { klog.Errorf("New masterZoneRequirement failed: %+v", err) return } masterZoneSelector := labels.NewSelector() masterZoneSelector = masterZoneSelector.Add(*masterRequirement, *zoneRequirement) if nodeList, err := ehd.nodeLister.List(masterZoneSelector); err != nil { klog.Errorf("TopologyZone label for hostname %s but get nodes err: %+v", host.Name, err) return } else { ehd.metadata.SetByNodeList(nodeList) } } else { // Only check itself if there is no TopologyZone label klog.V(4).Infof("Only check itself since there is no TopologyZone label for hostname %s", host.Name) ehd.metadata.SetByNodeList([]*v1.Node{host}) } } } // Init check plugin score ipList := make(map[string]struct{}) for _, node := range ehd.metadata.Copy() { for _, addr := range node.Status.Addresses { if addr.Type == v1.NodeInternalIP { ipList[addr.Address] = struct{}{} ehd.metadata.InitCheckPluginScore(addr.Address) } } } // Delete redundant check plugin score for _, checkedIp := range ehd.metadata.CopyCheckedIp() { if _, existed := ipList[checkedIp]; !existed { ehd.metadata.DeleteCheckPluginScore(checkedIp) } } // Delete redundant check info for checkerIp := range ehd.metadata.CopyAll() { if _, existed := ipList[checkerIp]; !existed { ehd.metadata.DeleteByIp(ehd.cfg.Node.LocalIp, checkerIp) } } klog.V(4).Infof("SyncNodeList check info %+v successfully", ehd.metadata) } ... func (cm *CheckMetadata) DeleteByIp(localIp, ip string) { cm.Lock() defer cm.Unlock() delete(cm.CheckInfo[localIp], ip) delete(cm.CheckInfo, ip) }
在按照如上邏輯更新node cache之后,會初始化CheckMetadata.CheckPluginScoreInfo,將節點ip賦值給CheckPluginScoreInfo key(Checked ip
:被檢查的ip)
另外,會刪除CheckMetadata.CheckPluginScoreInfo以及CheckMetadata.CheckInfo中多余的items(不屬于該邊緣節點檢查范圍)
2、ExecuteCheck
ExecuteCheck也是每隔HealthCheckPeriod秒(health-check-period選項)執行一次,會對每個邊緣節點執行若干種類的健康檢查插件(ping,kubelet等),并將各插件檢查分數匯總,根據用戶設置的基準線HealthCheckScoreLine(health-check-scoreline選項)得出節點是否健康的結果
func (ehd *EdgeHealthDaemon) ExecuteCheck() { util.ParallelizeUntil(context.TODO(), 16, len(ehd.checkPlugin.Plugins), func(index int) { ehd.checkPlugin.Plugins[index].CheckExecute(ehd.metadata.CheckMetadata) }) klog.V(4).Infof("CheckPluginScoreInfo is %+v after health check", ehd.metadata.CheckPluginScoreInfo) for checkedIp, pluginScores := range ehd.metadata.CopyCheckPluginScore() { totalScore := 0.0 for _, score := range pluginScores { totalScore += score } if totalScore >= ehd.cfg.Check.HealthCheckScoreLine { ehd.metadata.SetByCheckDetail(ehd.cfg.Node.LocalIp, checkedIp, metadata.CheckDetail{Normal: true}) } else { ehd.metadata.SetByCheckDetail(ehd.cfg.Node.LocalIp, checkedIp, metadata.CheckDetail{Normal: false}) } } klog.V(4).Infof("CheckInfo is %+v after health check", ehd.metadata.CheckInfo) }
這里會調用 ParallelizeUntil 并發執行各檢查插件,edge-health 目前支持 ping 以及 kubelet 兩種檢查插件,在 checkplugin 目錄(github.com/superedge/superedge/pkg/edge-health/checkplugin),通過 Register 注冊到 PluginInfo 單例(plugin列表)中,如下:
// TODO: handle flag parse errors func (pcp *PingCheckPlugin) Set(s string) error { var err error for _, para := range strings.Split(s, ",") { if len(para) == 0 { continue } arr := strings.Split(para, "=") trimKey := strings.TrimSpace(arr[0]) switch trimKey { case "timeout": timeout, _ := strconv.Atoi(strings.TrimSpace(arr[1])) pcp.HealthCheckoutTimeOut = timeout case "retries": retries, _ := strconv.Atoi(strings.TrimSpace(arr[1])) pcp.HealthCheckRetries = retries case "weight": weight, _ := strconv.ParseFloat(strings.TrimSpace(arr[1]), 64) pcp.Weight = weight case "port": port, _ := strconv.Atoi(strings.TrimSpace(arr[1])) pcp.Port = port } } PluginInfo = NewPlugin() PluginInfo.Register(pcp) return err } func (p *Plugin) Register(plugin CheckPlugin) { p.Plugins = append(p.Plugins, plugin) klog.V(4).Info("Register check plugin: %+v", plugin) } ... var ( PluginOnce sync.Once PluginInfo Plugin ) type Plugin struct { Plugins []CheckPlugin } func NewPlugin() Plugin { PluginOnce.Do(func() { PluginInfo = Plugin{ Plugins: []CheckPlugin{}, } }) return PluginInfo }
每種插件具體執行健康檢查的邏輯封裝在 CheckExecute 中,這里以 ping plugin 為例:
// github.com/superedge/superedge/pkg/edge-health/checkplugin/pingcheck.go func (pcp *PingCheckPlugin) CheckExecute(checkMetadata *metadata.CheckMetadata) { copyCheckedIp := checkMetadata.CopyCheckedIp() util.ParallelizeUntil(context.TODO(), 16, len(copyCheckedIp), func(index int) { checkedIp := copyCheckedIp[index] var err error for i := 0; i < pcp.HealthCheckRetries; i++ { if _, err := net.DialTimeout("tcp", checkedIp+":"+strconv.Itoa(pcp.Port), time.Duration(pcp.HealthCheckoutTimeOut)*time.Second); err == nil { break } } if err == nil { klog.V(4).Infof("Edge ping health check plugin %s for ip %s succeed", pcp.Name(), checkedIp) checkMetadata.SetByPluginScore(checkedIp, pcp.Name(), pcp.GetWeight(), common.CheckScoreMax) } else { klog.Warning("Edge ping health check plugin %s for ip %s failed, possible reason %s", pcp.Name(), checkedIp, err.Error()) checkMetadata.SetByPluginScore(checkedIp, pcp.Name(), pcp.GetWeight(), common.CheckScoreMin) } }) } // CheckPluginScoreInfo relevant functions func (cm *CheckMetadata) SetByPluginScore(checkedIp, pluginName string, weight float64, score int) { cm.Lock() defer cm.Unlock() if _, existed := cm.CheckPluginScoreInfo[checkedIp]; !existed { cm.CheckPluginScoreInfo[checkedIp] = make(map[string]float64) } cm.CheckPluginScoreInfo[checkedIp][pluginName] = float64(score) * weight }
CheckExecute 會對同區域每個節點執行 ping 探測(net.DialTimeout),如果失敗,則給該節點打 CheckScoreMin 分(0);否則,打 CheckScoreMax 分(100)
每種檢查插件會有一個 Weight 參數,表示了該檢查插件分數的權重值,所有權重參數之和應該為1,對應基準分數線 HealthCheckScoreLine 范圍0-100。因此這里在設置分數時,會乘以權重
回到 ExecuteCheck 函數,在調用各插件執行健康檢查得出權重分數(CheckPluginScoreInfo)后,還需要將該分數與基準線 HealthCheckScoreLine 對比:如果高于(>=)分數線,則認為該節點本次檢查正常;否則異常
func (ehd *EdgeHealthDaemon) ExecuteCheck() { util.ParallelizeUntil(context.TODO(), 16, len(ehd.checkPlugin.Plugins), func(index int) { ehd.checkPlugin.Plugins[index].CheckExecute(ehd.metadata.CheckMetadata) }) klog.V(4).Infof("CheckPluginScoreInfo is %+v after health check", ehd.metadata.CheckPluginScoreInfo) for checkedIp, pluginScores := range ehd.metadata.CopyCheckPluginScore() { totalScore := 0.0 for _, score := range pluginScores { totalScore += score } if totalScore >= ehd.cfg.Check.HealthCheckScoreLine { ehd.metadata.SetByCheckDetail(ehd.cfg.Node.LocalIp, checkedIp, metadata.CheckDetail{Normal: true}) } else { ehd.metadata.SetByCheckDetail(ehd.cfg.Node.LocalIp, checkedIp, metadata.CheckDetail{Normal: false}) } } klog.V(4).Infof("CheckInfo is %+v after health check", ehd.metadata.CheckInfo) }
3、Commun
在對同區域各邊緣節點執行健康檢查后,需要將檢查的結果傳遞給其它各節點,這也就是 commun 模塊負責的事情:
func (ehd *EdgeHealthDaemon) Run(stopCh <-chan struct{}) { // Execute edge health prepare and check ehd.PrepareAndCheck(stopCh) // Execute vote vote := vote.NewVoteEdge(&ehd.cfg.Vote) go vote.Vote(ehd.metadata, ehd.cfg.Kubeclient, ehd.cfg.Node.LocalIp, stopCh) // Execute communication communEdge := commun.NewCommunEdge(&ehd.cfg.Commun) communEdge.Commun(ehd.metadata.CheckMetadata, ehd.cmLister, ehd.cfg.Node.LocalIp, stopCh) <-stopCh }
既然是互相傳遞結果給其它節點,則必然會有接受和發送模塊:
func (c *CommunEdge) Commun(checkMetadata *metadata.CheckMetadata, cmLister corelisters.ConfigMapLister, localIp string, stopCh <-chan struct{}) { go c.communReceive(checkMetadata, cmLister, stopCh) wait.Until(func() { c.communSend(checkMetadata, cmLister, localIp) }, time.Duration(c.CommunPeriod)*time.Second, stopCh) }
其中 communSend 負責向其它節點發送本節點對它們的檢查結果;而 communReceive 負責接受其它邊緣節點的檢查結果。下面依次分析:
func (c *CommunEdge) communSend(checkMetadata *metadata.CheckMetadata, cmLister corelisters.ConfigMapLister, localIp string) { copyLocalCheckDetail := checkMetadata.CopyLocal(localIp) var checkedIps []string for checkedIp := range copyLocalCheckDetail { checkedIps = append(checkedIps, checkedIp) } util.ParallelizeUntil(context.TODO(), 16, len(checkedIps), func(index int) { // Only send commun information to other edge nodes(excluding itself) dstIp := checkedIps[index] if dstIp == localIp { return } // Send commun information communInfo := metadata.CommunInfo{SourceIP: localIp, CheckDetail: copyLocalCheckDetail} if hmac, err := util.GenerateHmac(communInfo, cmLister); err != nil { log.Errorf("communSend: generateHmac err %+v", err) return } else { communInfo.Hmac = hmac } commonInfoBytes, err := json.Marshal(communInfo) if err != nil { log.Errorf("communSend: json.Marshal commun info err %+v", err) return } commonInfoReader := bytes.NewReader(commonInfoBytes) for i := 0; i < c.CommunRetries; i++ { req, err := http.NewRequest("PUT", "http://"+dstIp+":"+strconv.Itoa(c.CommunServerPort)+"/result", commonInfoReader) if err != nil { log.Errorf("communSend: NewRequest for remote edge node %s err %+v", dstIp, err) continue } if err = util.DoRequestAndDiscard(c.client, req); err != nil { log.Errorf("communSend: DoRequestAndDiscard for remote edge node %s err %+v", dstIp, err) } else { log.V(4).Infof("communSend: put commun info %+v to remote edge node %s successfully", communInfo, dstIp) break } } }) }
發送邏輯如下:
構建 CommunInfo 結構體,包括:
SourceIP:表示執行檢查的ip
CheckDetail:為 Checked ip:Check detail 組織形式,包含被檢查的ip以及檢查結果
調用 GenerateHmac 構建 Hmac:實際上是以 kube-system 下的 hmac-config configmap hmackey 字段為 key,對 SourceIP 以及 CheckDetail進行 hmac 得到,用于判斷傳輸數據的有效性(是否被篡改)
func GenerateHmac(communInfo metadata.CommunInfo, cmLister corelisters.ConfigMapLister) (string, error) { addrBytes, err := json.Marshal(communInfo.SourceIP) if err != nil { return "", err } detailBytes, _ := json.Marshal(communInfo.CheckDetail) if err != nil { return "", err } hmacBefore := string(addrBytes) + string(detailBytes) if hmacConf, err := cmLister.ConfigMaps(metav1.NamespaceSystem).Get(common.HmacConfig); err != nil { return "", err } else { return GetHmacCode(hmacBefore, hmacConf.Data[common.HmacKey]) } } func GetHmacCode(s, key string) (string, error) { h := hmac.New(sha256.New, []byte(key)) if _, err := io.WriteString(h, s); err != nil { return "", err } return fmt.Sprintf("%x", h.Sum(nil)), nil }
發送上述構建的 CommunInfo 給其它邊緣節點(DoRequestAndDiscard)
communReceive邏輯也很清晰:
// TODO: support changeable server listen port func (c *CommunEdge) communReceive(checkMetadata *metadata.CheckMetadata, cmLister corelisters.ConfigMapLister, stopCh <-chan struct{}) { svr := &http.Server{Addr: ":" + strconv.Itoa(c.CommunServerPort)} svr.ReadTimeout = time.Duration(c.CommunTimeout) * time.Second svr.WriteTimeout = time.Duration(c.CommunTimeout) * time.Second http.HandleFunc("/debug/flags/v", pkgutil.UpdateLogLevel) http.HandleFunc("/result", func(w http.ResponseWriter, r *http.Request) { var communInfo metadata.CommunInfo if r.Body == nil { http.Error(w, "Invalid commun information", http.StatusBadRequest) return } err := json.NewDecoder(r.Body).Decode(&communInfo) if err != nil { http.Error(w, fmt.Sprintf("Invalid commun information %+v", err), http.StatusBadRequest) return } log.V(4).Infof("Received common information from %s : %+v", communInfo.SourceIP, communInfo.CheckDetail) if _, err := io.WriteString(w, "Received!\n"); err != nil { log.Errorf("communReceive: send response err %+v", err) http.Error(w, fmt.Sprintf("Send response err %+v", err), http.StatusInternalServerError) return } if hmac, err := util.GenerateHmac(communInfo, cmLister); err != nil { log.Errorf("communReceive: server GenerateHmac err %+v", err) http.Error(w, fmt.Sprintf("GenerateHmac err %+v", err), http.StatusInternalServerError) return } else { if hmac != communInfo.Hmac { log.Errorf("communReceive: Hmac not equal, hmac is %s but received commun info hmac is %s", hmac, communInfo.Hmac) http.Error(w, "Hmac not match", http.StatusForbidden) return } } log.V(4).Infof("communReceive: Hmac match") checkMetadata.SetByCommunInfo(communInfo) log.V(4).Infof("After communicate, check info is %+v", checkMetadata.CheckInfo) }) go func() { if err := svr.ListenAndServe(); err != http.ErrServerClosed { log.Fatalf("Server: exit with error %+v", err) } }() for { select { case <-stopCh: ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() if err := svr.Shutdown(ctx); err != nil { log.Errorf("Server: program exit, server exit error %+v", err) } return default: } } }
負責接受其它邊緣節點的檢查結果,并寫入自身檢查結果 CheckInfo,流程如下:
通過/result
路由接受請求,并將請求內容解析成 CommunInfo
對 CommunInfo 執行 GenerateHmac 獲取hmac值,并與 CommunInfo.Hmac 字段進行對比,檢查接受數據的有效性
最后將 CommunInfo 檢查結果寫入 CheckInfo,注意:CheckDetail.Time 設置為寫入時的時間
// CheckInfo relevant functions func (cm *CheckMetadata) SetByCommunInfo(c CommunInfo) { cm.Lock() defer cm.Unlock() if _, existed := cm.CheckInfo[c.SourceIP]; !existed { cm.CheckInfo[c.SourceIP] = make(map[string]CheckDetail) } for k, detail := range c.CheckDetail { // Update time to local timestamp since different machines have different ones detail.Time = time.Now() c.CheckDetail[k] = detail } cm.CheckInfo[c.SourceIP] = c.CheckDetail }
最后在接受到 stopCh 信號時,通過 svr.Shutdown 平滑關閉服務
4、Vote
在接受到其它節點的健康檢查結果后,vote 模塊會對結果進行統計得出最終判決,并向 apiserver 報告:
func (v *VoteEdge) Vote(edgeHealthMetadata *metadata.EdgeHealthMetadata, kubeclient clientset.Interface, localIp string, stopCh <-chan struct{}) { go wait.Until(func() { v.vote(edgeHealthMetadata, kubeclient, localIp, stopCh) }, time.Duration(v.VotePeriod)*time.Second, stopCh) }
首先根據檢查結果統計出狀態正常以及異常的節點列表:
type votePair struct { pros int cons int } ... var ( prosVoteIpList, consVoteIpList []string // Init votePair since cannot assign to struct field voteCountMap[checkedIp].pros in map vp votePair ) voteCountMap := make(map[string]votePair) // {"127.0.0.1":{"pros":1,"cons":2}} copyCheckInfo := edgeHealthMetadata.CopyAll() // Note that voteThreshold should be calculated by checked instead of checker // since checked represents the total valid edge health nodes while checker may contain partly ones. voteThreshold := (edgeHealthMetadata.GetCheckedIpLen() + 1) / 2 for _, checkedDetails := range copyCheckInfo { for checkedIp, checkedDetail := range checkedDetails { if !time.Now().After(checkedDetail.Time.Add(time.Duration(v.VoteTimeout) * time.Second)) { if _, existed := voteCountMap[checkedIp]; !existed { voteCountMap[checkedIp] = votePair{0, 0} } vp = voteCountMap[checkedIp] if checkedDetail.Normal { vp.pros++ if vp.pros >= voteThreshold { prosVoteIpList = append(prosVoteIpList, checkedIp) } } else { vp.cons++ if vp.cons >= voteThreshold { consVoteIpList = append(consVoteIpList, checkedIp) } } voteCountMap[checkedIp] = vp } } } log.V(4).Infof("Vote: voteCountMap is %+v", voteCountMap) ...
其中狀態判斷的邏輯如下:
如果超過一半(>)的節點對該節點的檢查結果為正常,則認為該節點狀態正常(注意時間差在 VoteTimeout 內)
如果超過一半(>)的節點對該節點的檢查結果為異常,則認為該節點狀態異常(注意時間差在 VoteTimeout 內)
除開上述情況,認為節點狀態判斷無效,對這些節點不做任何處理(可能存在腦裂的情況)
對狀態正常的節點做如下處理:
... // Handle prosVoteIpList util.ParallelizeUntil(context.TODO(), 16, len(prosVoteIpList), func(index int) { if node := edgeHealthMetadata.GetNodeByAddr(prosVoteIpList[index]); node != nil { log.V(4).Infof("Vote: vote pros to edge node %s begin ...", node.Name) nodeCopy := node.DeepCopy() needUpdated := false if nodeCopy.Annotations == nil { nodeCopy.Annotations = map[string]string{ common.NodeHealthAnnotation: common.NodeHealthAnnotationPros, } needUpdated = true } else { if healthy, existed := nodeCopy.Annotations[common.NodeHealthAnnotation]; existed { if healthy != common.NodeHealthAnnotationPros { nodeCopy.Annotations[common.NodeHealthAnnotation] = common.NodeHealthAnnotationPros needUpdated = true } } else { nodeCopy.Annotations[common.NodeHealthAnnotation] = common.NodeHealthAnnotationPros needUpdated = true } } if index, existed := admissionutil.TaintExistsPosition(nodeCopy.Spec.Taints, common.UnreachableNoExecuteTaint); existed { nodeCopy.Spec.Taints = append(nodeCopy.Spec.Taints[:index], nodeCopy.Spec.Taints[index+1:]...) needUpdated = true } if needUpdated { if _, err := kubeclient.CoreV1().Nodes().Update(context.TODO(), nodeCopy, metav1.UpdateOptions{}); err != nil { log.Errorf("Vote: update pros vote to edge node %s error %+v ", nodeCopy.Name, err) } else { log.V(2).Infof("Vote: update pros vote to edge node %s successfully", nodeCopy.Name) } } } else { log.Warningf("Vote: edge node addr %s not found", prosVoteIpList[index]) } }) ...
添加或者更新"superedgehealth/node-health" annotation 值為"true",表明分布式健康檢查判斷該節點狀態正常。
如果node存在 NoExecute(node.kubernetes.io/unreachable) taint,則將其去掉,并更新 node.
而對狀態異常的節點會添加或者更新"superedgehealth/node-health" annotation值為"false",表明分布式健康檢查判斷該節點狀態異常:
// Handle consVoteIpList util.ParallelizeUntil(context.TODO(), 16, len(consVoteIpList), func(index int) { if node := edgeHealthMetadata.GetNodeByAddr(consVoteIpList[index]); node != nil { log.V(4).Infof("Vote: vote cons to edge node %s begin ...", node.Name) nodeCopy := node.DeepCopy() needUpdated := false if nodeCopy.Annotations == nil { nodeCopy.Annotations = map[string]string{ common.NodeHealthAnnotation: common.NodeHealthAnnotationCons, } needUpdated = true } else { if healthy, existed := nodeCopy.Annotations[common.NodeHealthAnnotation]; existed { if healthy != common.NodeHealthAnnotationCons { nodeCopy.Annotations[common.NodeHealthAnnotation] = common.NodeHealthAnnotationCons needUpdated = true } } else { nodeCopy.Annotations[common.NodeHealthAnnotation] = common.NodeHealthAnnotationCons needUpdated = true } } if needUpdated { if _, err := kubeclient.CoreV1().Nodes().Update(context.TODO(), nodeCopy, metav1.UpdateOptions{}); err != nil { log.Errorf("Vote: update cons vote to edge node %s error %+v ", nodeCopy.Name, err) } else { log.V(2).Infof("Vote: update cons vote to edge node %s successfully", nodeCopy.Name) } } } else { log.Warningf("Vote: edge node addr %s not found", consVoteIpList[index]) } })
在邊端 edge-health-daemon 向 apiserver 發送節點健康結果后,云端運行 edge-health-admission(Kubernetes mutating admission webhook),會不斷根據 node edge-health annotation 調整 kube-controller-manager 設置的 node taint(去掉NoExecute taint) 以及 endpoints(將失聯節點上的 pods 從 endpoint subsets notReadyAddresses 移到 addresses中),從而實現即便云邊斷連,但是分布式健康檢查狀態正常的情況下:
失聯的節點上的 pod 不會從 Service 的 Endpoint 列表中移除
失聯的節點上的 pod 不會被驅逐
分布式健康檢查對于云邊斷連情況的處理區別原生 Kubernetes 如下:
失聯的節點被置為 ConditionUnknown 狀態,并被添加 NoSchedule 和 NoExecute 的 taints
失聯的節點上的pod被驅逐,并在其他節點上進行重建
失聯的節點上的pod從 Service 的 Endpoint 列表中移除
原生 Kubernetes:
分布式健康檢查:
分布式健康檢查主要通過如下三個層面增強節點狀態判斷的準確性:
每個節點定期探測其他節點健康狀態
集群內所有節點定期投票決定各節點的狀態
云端和邊端節點共同決定節點狀態
分布式健康檢查功能由邊端的 edge-health-daemon 以及云端的 edge-health-admission 組成,功能分別如下:
SyncNodeList:根據邊緣節點所在的 zone 刷新 node cache,同時更新 CheckMetadata 相關數據
ExecuteCheck:對每個邊緣節點執行若干種類的健康檢查插件(ping,kubelet等),并將各插件檢查分數匯總,根據用戶設置的基準線得出節點是否健康的結果
Commun:將本節點對其它各節點健康檢查的結果發送給其它節點
Vote:對所有節點健康檢查的結果分類,如果某個節點被大多數(>1/2)節點判定為正常,則對該節點添加 superedgehealth/node-health:true annotation,表明該節點分布式健康檢查結果為正常;否則,對該節點添加 superedgehealth/node-health:false annotation,表明該節點分布式健康檢查結果為異常
edge-health-daemon:對同區域邊緣節點執行分布式健康檢查,并向 apiserver 發送健康狀態投票結果(給 node 打 annotation),主體邏輯包括四部分功能:
edge-health-admission(Kubernetes mutating admission webhook):不斷根據 node edge-health annotation 調整 kube-controller-manager 設置的 node taint(去掉 NoExecute taint)以及endpoints(將失聯節點上的 pods 從 endpoint subsets notReadyAddresses 移到 addresses中),從而實現云端和邊端共同決定節點狀態
duyanghao kubernetes-reading-notes
“SuperEdge分布式健康檢查怎么實現”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。