您好,登錄后才能下訂單哦!
怎么實現CloudStack High Availability源碼分析,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。
我們先來看DirectAgentAttache的內部類PingTask,首先我們要知道每一個注冊到CS中的主機都有一個對應的DirectAgentAttache,這也就意味著每一個HOST都有一個PingTask線程在后臺循環運行,時間間隔是由全局變量ping.interval來指定的,默認是60s.
我們來看PingTask的代碼
ServerResource resource = _resource; if (resource != null) { PingCommand cmd = resource.getCurrentStatus(_id); int retried = 0; while (cmd == null && ++retried <= _HostPingRetryCount.value()) { Thread.sleep(1000*_HostPingRetryTimer.value()); cmd = resource.getCurrentStatus(_id); } if (cmd == null) { s_logger.warn("Unable to get current status on ">
_id代表host_id,當getCurrentStatus能返回正確的cmd就說明能夠Ping通該host,那接下來就是執行_agentMgr.handleCommands
public void handleCommands(final AgentAttache attache, final long sequence, final Command[] cmds) { for (final Pair<Integer, Listener> listener : _cmdMonitors) { final boolean processed = listener.second().processCommands(attache.getId(), sequence, cmds); } }
其中我們關心BehindOnPingListener,我們來看它的processCommands方法
@Override public boolean processCommands(final long agentId, final long seq, final Command[] commands) { final boolean processed = false; for (final Command cmd : commands) { if (cmd instanceof PingCommand) { pingBy(agentId); } } return processed; }
接下來是pingBy方法
public void pingBy(final long agentId) { // Update PingMap with the latest time if agent entry exists in the PingMap if (_pingMap.replace(agentId, InaccurateClock.getTimeInSeconds()) == null) { s_logger.info("PingMap for agent: " + agentId + " will not be updated because agent is no longer in the PingMap"); } }
這里重點就是這個_pingMap,我們看到它其實是一個ConcurrentHashMap,key是agentId(比如hostId),value是一個時間戳,就是當我們這一次如果Ping通之后會把當前時間作為value插入到_pingMap 中。我們回顧一下上面說過PingTask是每ping.interval時間間隔執行一次,所以如果我們的主機是在正常運行的話那么_pingMap就會幾乎每ping.interval更新一次。(當然執行getCurrentStatus方法會有一定的延遲)那如果主機出現突然的故障導致網絡無法連接的情況下,那_pingMap中的時間就會一直停留在上一次Ping通的那個時間戳。
所以我們來總結一下PingTask的邏輯:就是每隔ping.interval(默認60s)去Ping我們的主機,如果能夠Ping通就更新_pingMap中的value為當前時間戳,否則什么都不做。
接下來我們要看的另一個后臺線程是MonitorTask,同樣是每隔ping.interval執行一次,先是方法findAgentsBehindOnPing
protected List<Long> findAgentsBehindOnPing() { final List<Long> agentsBehind = new ArrayList<Long>(); final long cutoffTime = InaccurateClock.getTimeInSeconds() - getTimeout(); for (final Map.Entry<Long, Long> entry : _pingMap.entrySet()) { if (entry.getValue() < cutoffTime) { agentsBehind.add(entry.getKey()); } } return agentsBehind; } protected long getTimeout() { return (long) (PingTimeout.value() * PingInterval.value()); }
全局變量ping.timeout默認值是2.5,這段代碼的意思就是找出上一次Ping通的時間距離現在超過ping.interval的2.5倍的主機,簡單講就是Ping不通或者Ping通的延時超過我們認為的不合理時間的主機。 正常情況下該方法返回的會是一個空的List,這個時候MonitorTask就結束當前任務。但是如果出現網絡延時或者主機故障的時候,就要執行接下來的代碼。
final List<Long> behindAgents = findAgentsBehindOnPing(); for (final Long agentId : behindAgents) { final QueryBuilder<HostVO> sc = QueryBuilder.create(HostVO.class); sc.and(sc.entity().getId(), Op.EQ, agentId); final HostVO h = sc.find(); if (h != null) { final ResourceState resourceState = h.getResourceState(); if (resourceState == ResourceState.Disabled || resourceState == ResourceState.Maintenance || resourceState == ResourceState.ErrorInMaintenance) { disconnectWithoutInvestigation(agentId, Event.ShutdownRequested); } else { final HostVO host = _hostDao.findById(agentId); if (host != null && (host.getType() == Host.Type.ConsoleProxy || host.getType() == Host.Type.SecondaryStorageVM || host.getType() == Host.Type.SecondaryStorageCmdExecutor)) { disconnectWithoutInvestigation(agentId, Event.ShutdownRequested); } else { disconnectWithInvestigation(agentId, Event.PingTimeout); } } } }
我們假設出問題的是一臺計算節點,那么一路往下將要執行的將是AgentManagerImpl的handleDisconnectWithInvestigation方法
protected boolean handleDisconnectWithInvestigation(final AgentAttache attache, Status.Event event) { final long hostId = attache.getId(); HostVO host = _hostDao.findById(hostId); if (host != null) { Status nextStatus = null; nextStatus = host.getStatus().getNextStatus(event); if (nextStatus == Status.Alert) { Status determinedState = investigate(attache); if (determinedState == null) { if ((System.currentTimeMillis() >> 10) - host.getLastPinged() > AlertWait.value()) { determinedState = Status.Alert; } else { return false; } } final Status currentStatus = host.getStatus(); if (determinedState == Status.Down) { event = Status.Event.HostDown; } else if (determinedState == Status.Up) { agentStatusTransitTo(host, Status.Event.Ping, _nodeId); return false; } else if (determinedState == Status.Disconnected) { if (currentStatus == Status.Disconnected) { if ((System.currentTimeMillis() >> 10) - host.getLastPinged() > AlertWait.value()) { event = Status.Event.WaitedTooLong; } else { return false; } } else if (currentStatus == Status.Up) { event = Status.Event.AgentDisconnected; } } } } handleDisconnectWithoutInvestigation(attache, event, true, true); host = _hostDao.findById(hostId); // Maybe the host magically reappeared? if (host != null && host.getStatus() == Status.Down) { _haMgr.scheduleRestartForVmsOnHost(host, true); } return true; }
我們先看一下該方法最后的那個if,就是在特定的條件下我們最終的目的就是重啟該主機上的所有虛擬機,這才是 HA的真正目的。但是我們要記住我們進入這個handleDisconnectWithInvestigation方法的前提其實是很簡單的,就是只要我們發現距離上一次Ping通該主機的時間超過比如說2分半鐘就會進入該方法,而我們要真正執行HA應該是要非常確定該主機確實是掛掉了的情況下才發生的。所以該方法前面一大堆都是在反復的確認主機的狀態,就如方法名所示Inverstigation(調查)。 我們假設該主機的currentStatus是UP,event我們知道是PingTimeout,所以nextStatus就是Alert。接下來就是執行investigate方法
protected Status investigate(final AgentAttache agent) { final Long hostId = agent.getId(); final HostVO host = _hostDao.findById(hostId); if (host != null && host.getType() != null && !host.getType().isVirtual()) { final Answer answer = easySend(hostId, new CheckHealthCommand()); if (answer != null && answer.getResult()) { final Status status = Status.Up; return status; } return _haMgr.investigate(hostId); } return Status.Alert; }
該方法先會向該hostId發送一個CheckHealthCommand,這個時候會有兩種可能:
1、如果能夠接受到應答說明此時該主機是正常的就直接返回UP狀態,我們在回到handleDisconnectWithInvestigation就會發現此時該任務也就基本結束了意思就是觸發該方法的僅僅是臨時的網絡不通或者什么情況現在主機已經恢復正常
2.那另一種情況就是CheckHealthCommand沒有得到應答,也就是說我直接從management-server去請求你主機你沒有反應,那也不代表你就真的掛了,接下來怎么辦呢,我們去找各種偵探(investigators)去調查你是否alive
@Override public Status investigate(final long hostId) { final HostVO host = _hostDao.findById(hostId); if (host == null) { return Status.Alert; } Status hostState = null; for (Investigator investigator : investigators) { hostState = investigator.isAgentAlive(host); if (hostState != null) { return hostState; } } return hostState; }
那假如我們的主機是一臺XenServer的主機的話,最重要的當然是XenServerInvestigator,我們來看它的isAgentAlive方法
public Status isAgentAlive(Host agent) { CheckOnHostCommand cmd = new CheckOnHostCommand(agent); List<HostVO> neighbors = _resourceMgr.listAllHostsInCluster(agent.getClusterId()); for (HostVO neighbor : neighbors) { Answer answer = _agentMgr.easySend(neighbor.getId(), cmd); if (answer != null && answer.getResult()) { CheckOnHostAnswer ans = (CheckOnHostAnswer)answer; if (!ans.isDetermined()) { continue; } return ans.isAlive() ? null : Status.Down; } } return null; }
邏輯也很簡單就是我直接找不到你我就去找你同一個Cluster中的鄰居,我向你的每一個鄰居主機發送一個CheckOnHostCommand命令,看它們能不能知道你到底怎么了。關于CheckOnHostCommand命令的具體實現在開頭那篇官網的文章里有詳細的說明
If the network ping investigation returns that it cannot detect the status of the host, CloudStack HA then relies on the hypervisor specific investigation. For VmWare, there is no such investigation as the hypervisor host handles its own HA. For XenServer and KVM, CloudStack HA deploys a monitoring script that writes the current timestamp on to a heartbeat file on shared storage. If the timestamp cannot be written, the hypervisor host self-fences by rebooting itself. For these two hypervisors, CloudStack HA sends a CheckOnHostCommand to a neighboring hypervisor host that shares the same storage. The neighbor then checks on the heartbeat file on shared storage and see if the heartbeat is no longer being written. If the heartbeat is still being written, the host reports that the host in question is still alive. If the heartbeat file’s timestamp is lagging behind, after an acceptable timeout value, the host reports that the host in question is down and HA is started on the VMs on that host.
大致的意思是CS會在每一個XenServer和KVM的主機上運行一段監控腳本,這個腳本會將當前時間戳寫入一個在共享存儲的文件中。如果某一臺主機發現自己無法往文件中寫入數據將會強制自己重啟。 那上面那段代碼的邏輯就是向與該被調查的主機共享存儲的其他主機發送CheckOnHostCommand命令,鄰居主機接受到命令就去查看文件中被調查主機有沒有持續的更新時間戳,如果有它就返回相應說該主機is still alive,否則就返回說該主機is down. 這樣只有主機確實出了故障無法連接的情況下,handleDisconnectWithInvestigation方法中的determinedState才會是Status.Down,那么event就變成了Status.Event.HostDown,接下來就執行HighAvailabilityManagerImpl的scheduleRestartForVmsOnHost方法重起該主機上的所以虛擬機 ,然后是在數據庫中出入一個HaWorkVO,然后喚醒CS啟動的時候初始化好的WorkerThread,到很重要的同樣是HighAvailabilityManagerImpl的restart方法
protected Long restart(final HaWorkVO work) { boolean isHostRemoved = false; Boolean alive = null; if (work.getStep() == Step.Investigating) { if (!isHostRemoved) { Investigator investigator = null; for (Investigator it : investigators) { investigator = it; try { (1) alive = investigator.isVmAlive(vm, host); break; } catch (UnknownVM e) { s_logger.info(investigator.getName() + " could not find " + vm); } } boolean fenced = false; if (alive == null) { for (FenceBuilder fb : fenceBuilders) { (2) Boolean result = fb.fenceOff(vm, host); if (result != null && result) { fenced = true; break; } } } (3) _itMgr.advanceStop(vm.getUuid(), true); } } vm = _itMgr.findById(vm.getId()); (4)if (!_forceHA && !vm.isHaEnabled()) { return null; // VM doesn't require HA } try { HashMap<VirtualMachineProfile.Param, Object> params = new HashMap<VirtualMachineProfile.Param, Object>(); (5) if (_haTag != null) { params.put(VirtualMachineProfile.Param.HaTag, _haTag); } WorkType wt = work.getWorkType(); if (wt.equals(WorkType.HA)) { params.put(VirtualMachineProfile.Param.HaOperation, true); } (6) try{ _itMgr.advanceStart(vm.getUuid(), params, null); }catch (InsufficientCapacityException e){ s_logger.warn("Failed to deploy vm " + vmId + " with original planner, sending HAPlanner"); _itMgr.advanceStart(vm.getUuid(), params, _haPlanners.get(0)); } } return (System.currentTimeMillis() >> 10) + _restartRetryInterval; }
如上代碼我所標記的有5個重點需要關注的。 大致的流程如下:
(1)調用各個
investigator.isVmAlive
方法,如果isAlive則什么都不做,否則往下走(2)調用
fb.fenceOff
方法(3)執行
_itMgr.advanceStop
方法(4)關于_forceHA變量,因為我在全局變量和數據庫的configuration表中都沒有找到,所以初始化的值為 FALSE,那么也就是說只有
vm.isHaEnabled
為ture的VM才會繼續執行下去,否則直接return了(5)
_haTag
的值是由全局變量ha.tag來指定的,默認為空,如果指定了這個值對后面確定VM分配主機很重要,記住這行代碼params.put(VirtualMachineProfile.Param.HaTag, _haTag);
(6)這里有沒有很熟悉,是的,凡是讀過CS創建VM實例的過程代碼的人都知道這個方法就是去分配一個VM,那么到這里整個CS的HA執行代碼就完成大部分了,接下來就是重啟VM,至于該VM能否重啟就要依賴各種條件了,比如該Cluster中有沒有合適的主機、主機的物理資源是否充足、有沒有設置ha.tag、VM有沒有使用標簽等等這里就不再詳述了。
看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業資訊頻道,感謝您對億速云的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。