This article explains how ZooKeeper's Leader election mechanism works.
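ZooKeeper is a distributed service framework that addresses data problems common in distributed applications, such as cluster management and state synchronization. To solve them, ZooKeeper relies on Leader election to guarantee data consistency and stability.
Leader election uses a majority-vote algorithm.
Each ZooKeeper server is called a node, and every voting node has one vote that it can cast for any node eligible for election. Once a node is backed by more than half of the votes, it becomes the Leader and the other nodes become Followers.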
Copy zoo_sample.cfg to zoo1.cfg, zoo2.cfg, zoo3.cfg and zoo4.cfg.
Edit each zooN.cfg file and set the following values:
【plain】
# zoo1.cfg
dataDir=/export/data/zookeeper-1
clientPort=2181
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer

# zoo2.cfg
dataDir=/export/data/zookeeper-2
clientPort=2182
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer

# zoo3.cfg
dataDir=/export/data/zookeeper-3
clientPort=2183
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer

# zoo4.cfg
dataDir=/export/data/zookeeper-4
clientPort=2184
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer
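The format is server.N=ip:data sync port:election port:role, where N is the server number and matches the content of that server's myid file.
participant is the default role and means the server takes part in elections, so it can be omitted. An observer does not take part in elections.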
Create a myid file in each of /export/data/zookeeper-1, /export/data/zookeeper-2, /export/data/zookeeper-3 and /export/data/zookeeper-4, containing 1, 2, 3 and 4 respectively. This value becomes the server's sid (Server ID).
Start three ZooKeeper instances:
bin/zkServer.sh start conf/zoo1.cfg
bin/zkServer.sh start conf/zoo2.cfg
bin/zkServer.sh start conf/zoo3.cfg
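On startup, each instance reads the configuration file passed as its start argument (zooN.cfg). From it the instance learns its own server identity (sid) and how many instances in the cluster take part in the election.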
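As a minimal sketch of what an instance can learn from the server.N lines above, the following parses one line and counts the voting members. The class `ServerLineSketch` and its methods are hypothetical names made up for illustration; ZooKeeper's own parsing lives in QuorumPeerConfig and QuorumMaj and handles more cases.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical parser for "server.N=host:port:port[:role]" lines; not ZooKeeper code.
class ServerLineSketch {
    final long sid;
    final String host;
    final int syncPort;
    final int electionPort;
    final boolean observer;

    ServerLineSketch(long sid, String host, int syncPort, int electionPort, boolean observer) {
        this.sid = sid;
        this.host = host;
        this.syncPort = syncPort;
        this.electionPort = electionPort;
        this.observer = observer;
    }

    // Splits one configuration line into its fields.
    static ServerLineSketch parse(String line) {
        String[] kv = line.trim().split("=", 2);
        long sid = Long.parseLong(kv[0].substring("server.".length()));
        String[] parts = kv[1].split(":");
        // The role is optional; a missing role means participant.
        boolean observer = parts.length > 3 && parts[3].equalsIgnoreCase("observer");
        return new ServerLineSketch(sid, parts[0],
                Integer.parseInt(parts[1]), Integer.parseInt(parts[2]), observer);
    }

    // Counts the servers that take part in the election (everything except observers).
    static long countVoters(List<String> lines) {
        return lines.stream().map(ServerLineSketch::parse).filter(s -> !s.observer).count();
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "server.1=127.0.0.1:2001:3001",
                "server.2=127.0.0.1:2002:3002:participant",
                "server.3=127.0.0.1:2003:3003:participant",
                "server.4=127.0.0.1:2004:3004:observer");
        System.out.println("Voting members: " + countVoters(lines));
    }
}
```

With the four lines from the configuration above, three servers vote and server 4 only observes, which is why the walkthrough below counts three nodes in the election.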
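Figure 1: Voting flow from the first round to the second round
Preconditions:
Votes are written as vote(sid, zxid, epoch).
sid is the Server ID, the unique identifier of each server, taken from the myid file;
zxid is the transaction id of the data;
epoch is the election round. To keep the explanation simple it is fixed at 1 (the initial election) and left out of the votes below.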
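Start the sid=1 and sid=2 nodes in that order.
Round 1:
sid=1: its initial vote is for itself; it sends vote(1,0) to sid=2;
sid=2: its initial vote is for itself; it sends vote(2,0) to sid=1;
sid=1: receives vote(2,0) from sid=2 and compares it with its own vote(1,0). zxid is compared first: a larger zxid means more recent data, so the vote with the largest zxid wins. If the zxids are equal, the vote with the larger sid wins. The result is vote(2,0), so sid=1 changes its vote to vote(2,0);
sid=2: receives vote(1,0) from sid=1 and compares it with its own vote(2,0) by the same rule. The result is vote(2,0), so its vote stays unchanged;
Round 1 ends.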
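Round 2:
sid=1: its current vote is vote(2,0); it sends vote(2,0) to sid=2;
sid=2: its current vote is vote(2,0); it sends vote(2,0) to sid=1;
sid=1: receives vote(2,0) from sid=2 and holds vote(2,0) itself. Under the majority rule, 3 nodes take part in the election and 2 of them already hold the same vote, so sid=2 is elected Leader and sid=1 becomes a Follower;
sid=2: receives vote(2,0) from sid=1 and holds vote(2,0) itself. Under the majority rule sid=2 is elected Leader, so it takes the Leader role.
When sid=3 is started afterwards, the cluster has already elected a Leader. sid=1 and sid=2 send their Leader vote back to sid=3, and the majority result is still sid=2 as Leader.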
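The two rounds above can be replayed with a small simulation. This is only a sketch under the same simplifications as the walkthrough (two running nodes, zxid 0, epoch omitted); `TwoRoundSketch`, `Ballot`, `beats` and `elect` are hypothetical names and none of this is ZooKeeper code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation of the two-round walkthrough; not ZooKeeper code.
class TwoRoundSketch {
    // A ballot names a proposed leader and a zxid (epoch omitted, as in the walkthrough).
    static final class Ballot {
        final long leader;
        final long zxid;

        Ballot(long leader, long zxid) {
            this.leader = leader;
            this.zxid = zxid;
        }
    }

    // True when ballot a beats ballot b: larger zxid first, then larger sid.
    static boolean beats(Ballot a, Ballot b) {
        return a.zxid > b.zxid || (a.zxid == b.zxid && a.leader > b.leader);
    }

    // Lets nodes 1 and 2 exchange ballots for up to two rounds.
    // Returns the sid backed by more than half of totalVoters, or -1 if there is none.
    static long elect(int totalVoters) {
        Map<Long, Ballot> current = new HashMap<>();
        current.put(1L, new Ballot(1, 0));
        current.put(2L, new Ballot(2, 0));
        for (int round = 1; round <= 2; round++) {
            // Snapshot of the ballots every node sends in this round.
            Map<Long, Ballot> sent = new HashMap<>(current);
            // Count how many senders back each proposed leader.
            Map<Long, Integer> support = new HashMap<>();
            for (Ballot b : sent.values()) {
                support.merge(b.leader, 1, Integer::sum);
            }
            for (Map.Entry<Long, Integer> e : support.entrySet()) {
                if (e.getValue() > totalVoters / 2) {
                    return e.getKey();
                }
            }
            // No majority yet: each node adopts the best ballot it has seen.
            for (Long sid : sent.keySet()) {
                for (Ballot received : sent.values()) {
                    if (beats(received, current.get(sid))) {
                        current.put(sid, received);
                    }
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println("Elected sid: " + elect(3));
    }
}
```

In round 1 each candidate has one supporter, which is not more than half of three, so node 1 switches to vote(2,0). In round 2 both ballots name sid=2 and the majority is reached.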
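Under the hood, ZooKeeper's election is split into an election application layer and a message transport queue layer. The first layer, the application layer, uses one pair of queues to receive and send all votes. The second layer, the transport layer, keeps a separate send queue per server sid so that messages to different servers do not affect each other: a failure to send to one machine does not disturb sending to the healthy servers.
Figure 2: Interaction between the queue layers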
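The isolation that the second layer provides can be illustrated as follows. This is a simplified sketch, not the ZooKeeper implementation: `TwoLayerQueueSketch`, `Outgoing`, `dispatch` and the capacity of 1 are assumptions chosen for the example, and a message for a full per-sid queue is simply dropped here.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical two-layer queue layout; not ZooKeeper code.
class TwoLayerQueueSketch {
    // Assumed per-peer capacity, chosen only for this example.
    static final int CAPACITY = 1;

    // A message waiting in the first layer, tagged with the sid it is meant for.
    static final class Outgoing {
        final long targetSid;
        final ByteBuffer payload;

        Outgoing(long targetSid, ByteBuffer payload) {
            this.targetSid = targetSid;
            this.payload = payload;
        }
    }

    // Layer 1: one shared queue for all outgoing votes.
    final LinkedBlockingQueue<Outgoing> sendQueue = new LinkedBlockingQueue<>();
    // Layer 2: one bounded queue per target sid.
    final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap =
            new ConcurrentHashMap<>();

    // Moves everything currently in layer 1 into the per-sid queues of layer 2.
    // Returns how many messages were accepted by layer 2.
    int dispatch() {
        int moved = 0;
        Outgoing m;
        while ((m = sendQueue.poll()) != null) {
            ArrayBlockingQueue<ByteBuffer> q = queueSendMap.computeIfAbsent(
                    m.targetSid, k -> new ArrayBlockingQueue<>(CAPACITY));
            // offer() never blocks, so a full queue for one peer cannot stall the others.
            if (q.offer(m.payload)) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        TwoLayerQueueSketch s = new TwoLayerQueueSketch();
        s.sendQueue.add(new Outgoing(3L, ByteBuffer.allocate(4)));
        s.sendQueue.add(new Outgoing(3L, ByteBuffer.allocate(4)));
        s.sendQueue.add(new Outgoing(2L, ByteBuffer.allocate(4)));
        System.out.println("Accepted by layer 2: " + s.dispatch());
    }
}
```

In the example the queue for sid 3 fills up after one message, yet the message for sid 2 is still accepted, which is the point of splitting the transport layer by sid.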
The server's startup class can be found by reading zkServer.sh:
org.apache.zookeeper.server.quorum.QuorumPeerMain
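Load the configuration file with QuorumPeerConfig.parse(path);
The configuration that matters for Leader election is:
Read the myid file in the dataDir directory and set the current instance's sid, which is its identity as a voter. In the code below, the myid variable is the current node's own sid.
Set peerType, which decides whether the current instance takes part in the election.
new QuorumMaj() parses the entries with the server. prefix and loads the cluster membership: allMembers (all members), votingMembers (members that vote) and observingMembers (observers). It also sets half to votingMembers.size() / 2.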
【Java】
public QuorumMaj(Properties props) throws ConfigException {
    for (Entry<Object, Object> entry : props.entrySet()) {
        String key = entry.getKey().toString();
        String value = entry.getValue().toString();
        // Read the entries that start with "server." from the cluster configuration
        if (key.startsWith("server.")) {
            int dot = key.indexOf('.');
            long sid = Long.parseLong(key.substring(dot + 1));
            QuorumServer qs = new QuorumServer(sid, value);
            allMembers.put(Long.valueOf(sid), qs);
            if (qs.type == LearnerType.PARTICIPANT)
                // An instance with the PARTICIPANT role takes part in the election
                votingMembers.put(Long.valueOf(sid), qs);
            else {
                // Observer member
                observingMembers.put(Long.valueOf(sid), qs);
            }
        } else if (key.equals("version")) {
            version = Long.parseLong(value, 16);
        }
    }
    // Majority threshold
    half = votingMembers.size() / 2;
}
QuorumPeerMain.runFromConfig(config) starts the server;
QuorumPeer.startLeaderElection() starts the election service;
Set the current vote with new Vote(sid, zxid, epoch).
【Java】
synchronized public void startLeaderElection() {
    try {
        if (getPeerState() == ServerState.LOOKING) {
            // First round: the node votes for itself by default
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }
    //........
}
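Create the election connection manager, QuorumCnxManager;
Initialize recvQueue<Message(sid, ByteBuffer)>, the queue for received votes (second-layer transport queue);
Initialize queueSendMap<sid, queue>, the per-sid queues for outgoing votes (second-layer transport queue);
Initialize senderWorkerMap<sid, SendWorker>, the container of vote-sending worker threads. An entry means a connection to the node with that sid exists;
Initialize the election listener thread class QuorumCnxManager.Listener.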
【Java】
//QuorumPeer.createCnxnManager()
public QuorumCnxManager(QuorumPeer self,
                        final long mySid,
                        Map<Long,QuorumPeer.QuorumServer> view,
                        QuorumAuthServer authServer,
                        QuorumAuthLearner authLearner,
                        int socketTimeout,
                        boolean listenOnAllIPs,
                        int quorumCnxnThreadsSize,
                        boolean quorumSaslAuthEnabled) {
    // Queue for received votes (second-layer transport queue)
    this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
    // Per-sid queues for outgoing votes (second-layer transport queue)
    this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
    // Vote-sending worker threads; an entry means the node with that sid is connected
    this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
    this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
    String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
    if (cnxToValue != null) {
        this.cnxTO = Integer.parseInt(cnxToValue);
    }
    this.self = self;
    this.mySid = mySid;
    this.socketTimeout = socketTimeout;
    this.view = view;
    this.listenOnAllIPs = listenOnAllIPs;
    initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,
            quorumSaslAuthEnabled);
    // Starts listener thread that waits for connection requests
    // Create the election listener thread that accepts election connections
    listener = new Listener();
    listener.setName("QuorumPeerListener");
}

//QuorumPeer.createElectionAlgorithm
protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;
    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 0:
        le = new LeaderElection(this);
        break;
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        qcm = createCnxnManager(); // new QuorumCnxManager(... new Listener())
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            listener.start(); // Start the election listener thread
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}
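Start the election listener thread QuorumCnxManager.Listener;
Create a ServerSocket and wait for connections from nodes whose sid is larger than this node's own. The connection is stored in senderWorkerMap<sid, SendWorker>;
Only nodes with sid > self.sid may connect.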
【Java】
// After listener.start() above, this method runs
public void run() {
    int numRetries = 0;
    InetSocketAddress addr;
    Socket client = null;
    while ((!shutdown) && (numRetries < 3)) {
        try {
            ss = new ServerSocket();
            ss.setReuseAddress(true);
            if (self.getQuorumListenOnAllIPs()) {
                int port = self.getElectionAddress().getPort();
                addr = new InetSocketAddress(port);
            } else {
                // Resolve hostname for this server in case the
                // underlying ip address has changed.
                self.recreateSocketAddresses(self.getId());
                addr = self.getElectionAddress();
            }
            LOG.info("My election bind port: " + addr.toString());
            setName(addr.toString());
            ss.bind(addr);
            while (!shutdown) {
                client = ss.accept();
                setSockOpts(client);
                LOG.info("Received connection request "
                        + client.getRemoteSocketAddress());
                // Receive and handle the connection request
                // asynchronously if the quorum sasl authentication is
                // enabled. This is required because sasl server
                // authentication process may take few seconds to finish,
                // this may delay next peer connection requests.
                if (quorumSaslAuthEnabled) {
                    receiveConnectionAsync(client);
                } else {
                    // Handle the incoming connection
                    receiveConnection(client);
                }
                numRetries = 0;
            }
        } catch (IOException e) {
            if (shutdown) {
                break;
            }
            LOG.error("Exception while listening", e);
            numRetries++;
            try {
                ss.close();
                Thread.sleep(1000);
            } catch (IOException ie) {
                LOG.error("Error closing server socket", ie);
            } catch (InterruptedException ie) {
                LOG.error("Interrupted while sleeping. " +
                        "Ignoring exception", ie);
            }
            closeSocket(client);
        }
    }
    LOG.info("Leaving listener");
    if (!shutdown) {
        LOG.error("As I'm leaving the listener thread, "
                + "I won't be able to participate in leader "
                + "election any longer: "
                + self.getElectionAddress());
    } else if (ss != null) {
        // Clean up for shutdown.
        try {
            ss.close();
        } catch (IOException ie) {
            // Don't log an error for shutdown.
            LOG.debug("Error closing server socket", ie);
        }
    }
}

// Call path: receiveConnection() -> handleConnection(...)
private void handleConnection(Socket sock, DataInputStream din) throws IOException {
    //... omitted
    if (sid < self.getId()) {
        /*
         * This replica might still believe that the connection to sid is
         * up, so we have to shut down the workers before trying to open a
         * new connection.
         */
        SendWorker sw = senderWorkerMap.get(sid);
        if (sw != null) {
            sw.finish();
        }
        /*
         * Now we start a new connection
         */
        LOG.debug("Create new connection to server: {}", sid);
        closeSocket(sock);
        if (electionAddr != null) {
            connectOne(sid, electionAddr);
        } else {
            connectOne(sid);
        }
    } else {
        // Otherwise start worker threads to receive data.
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);
        SendWorker vsw = senderWorkerMap.get(sid);
        if (vsw != null) {
            vsw.finish();
        }
        // Store the connection as <sid, SendWorker>
        senderWorkerMap.put(sid, sw);
        queueSendMap.putIfAbsent(sid,
                new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
        sw.start();
        rw.start();
    }
}
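Create the FastLeaderElection service;
Initialize sendqueue, the queue for outgoing votes (first-layer queue);
Initialize recvqueue, the queue for received votes (first-layer queue);
Create the WorkerSender thread;
Create the WorkerReceiver thread.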
【Java】
//FastLeaderElection.starter
private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;
    // Outgoing queue sendqueue (first-layer queue)
    sendqueue = new LinkedBlockingQueue<ToSend>();
    // Incoming queue recvqueue (first-layer queue)
    recvqueue = new LinkedBlockingQueue<Notification>();
    this.messenger = new Messenger(manager);
}

//new Messenger(manager)
Messenger(QuorumCnxManager manager) {
    // Create the WorkerSender thread
    this.ws = new WorkerSender(manager);
    this.wsThread = new Thread(this.ws,
            "WorkerSender[myid=" + self.getId() + "]");
    this.wsThread.setDaemon(true);
    // Create the WorkerReceiver thread
    this.wr = new WorkerReceiver(manager);
    this.wrThread = new Thread(this.wr,
            "WorkerReceiver[myid=" + self.getId() + "]");
    this.wrThread.setDaemon(true);
}
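Start the WorkerSender and WorkerReceiver threads.
The WorkerSender thread polls the first-layer sendqueue in a loop;
Each sendqueue element carries the vote data (see the ToSend class);
It first checks whether the target sid of the vote equals this node's own sid. If so, the vote goes straight into recvQueue;
Otherwise the sendqueue element is moved to the second-layer transport queue queueSendMap<sid, queue>.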
【Java】
//FastLeaderElection.Messenger.WorkerSender
class WorkerSender extends ZooKeeperThread {
    //...
    public void run() {
        while (!stop) {
            try {
                ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                if (m == null) continue;
                // Send the vote out
                process(m);
            } catch (InterruptedException e) {
                break;
            }
        }
        LOG.info("WorkerSender is down");
    }
}

//QuorumCnxManager#toSend
public void toSend(Long sid, ByteBuffer b) {
    /*
     * If sending message to myself, then simply enqueue it (loopback).
     */
    if (this.mySid == sid) {
        b.position(0);
        addToRecvQueue(new Message(b.duplicate(), sid));
        /*
         * Otherwise send to the corresponding thread to send.
         */
    } else {
        /*
         * Start a new connection if doesn't have one already.
         */
        ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
                SEND_CAPACITY);
        ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq);
        // Move the message into the second-layer queue queueSendMap<sid, queue>
        if (oldq != null) {
            addToSendQueue(oldq, b);
        } else {
            addToSendQueue(bq, b);
        }
        connectOne(sid);
    }
}
The WorkerReceiver thread polls the second-layer transport queue recvQueue in a loop and moves its elements into the first-layer queue recvqueue.
【Java】
//WorkerReceiver
public void run() {
    Message response;
    while (!stop) {
        // Sleeps on receive
        try {
            // Poll the second-layer transport queue recvQueue
            response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
            if (response == null) continue;
            // The current protocol and two previous generations all send at least 28 bytes
            if (response.buffer.capacity() < 28) {
                LOG.error("Got a short response: " + response.buffer.capacity());
                continue;
            }
            //...
            if (self.getPeerState() == QuorumPeer.ServerState.LOOKING) {
                // Move the element into the first-layer queue recvqueue
                recvqueue.offer(n);
                //...
            }
        }
        //...
}
Start the QuorumPeer thread;
Begin Leader election voting with makeLEStrategy().lookForLeader();
sendNotifications() sends the vote to the other nodes by placing it in sendqueue, which is drained by the WorkerSender thread.
【Java】
//QuorumPeer.run
//...
try {
    reconfigFlagClear();
    if (shuttingDownLE) {
        shuttingDownLE = false;
        startLeaderElection();
    }
    // makeLEStrategy().lookForLeader() sends the votes
    setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
    LOG.warn("Unexpected exception", e);
    setPeerState(ServerState.LOOKING);
}
//...

//FastLeaderElection.lookForLeader
public Vote lookForLeader() throws InterruptedException {
    //...
    // Send the vote to the other nodes
    sendNotifications();
    //...
}

private void sendNotifications() {
    // Iterate over the voting nodes
    for (long sid : self.getCurrentAndNextConfigVoters()) {
        QuorumVerifier qv = self.getQuorumVerifier();
        ToSend notmsg = new ToSend(ToSend.mType.notification,
                proposedLeader,
                proposedZxid,
                logicalclock.get(),
                QuorumPeer.ServerState.LOOKING,
                sid,
                proposedEpoch, qv.toString().getBytes());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
                    Long.toHexString(proposedZxid) + " (n.zxid), 0x" +
                    Long.toHexString(logicalclock.get()) +
                    " (n.round), " + sid + " (recipient), " + self.getId() +
                    " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
        }
        // Store the vote
        sendqueue.offer(notmsg);
    }
}

class WorkerSender extends ZooKeeperThread {
    //...
    public void run() {
        while (!stop) {
            try {
                // Take a stored vote
                ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                if (m == null) continue;
                process(m);
            } catch (InterruptedException e) {
                break;
            }
        }
        LOG.info("WorkerSender is down");
    }
    //...
}
Poll recvqueue in a loop to fetch the votes sent by other nodes:
【Java】
public Vote lookForLeader() throws InterruptedException {
    //...
    /*
     * Loop in which we exchange notifications until we find a leader
     */
    while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
        /*
         * Remove next notification from queue, times out after 2 times
         * the termination time
         */
        // Take the next vote that was delivered to this node
        Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
        /*
         * Sends more notifications if haven't received enough.
         * Otherwise processes new notification.
         */
        if (n == null) {
            if (manager.haveDelivered()) {
                // All connections are up and the previous votes were delivered, so vote again
                sendNotifications();
            } else {
                // No vote received yet: manager.connectAll() connects to the other nodes
                manager.connectAll();
            }
            /*
             * Exponential backoff
             */
            int tmpTimeOut = notTimeout * 2;
            notTimeout = (tmpTimeOut < maxNotificationInterval ?
                    tmpTimeOut : maxNotificationInterval);
            LOG.info("Notification time out: " + notTimeout);
        }
        //....
    }
    //...
}
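The timeout handling in the loop above doubles the wait after each empty poll and caps it at a maximum. A minimal sketch of just that calculation follows; `BackoffSketch` and `nextTimeout` are hypothetical names, and the starting value of 200 ms and the cap of 60000 ms are assumptions picked for the example rather than values taken from the code shown here.

```java
// Hypothetical helper showing the capped doubling used for the poll timeout.
class BackoffSketch {
    // Doubles the current timeout but never returns more than max.
    static int nextTimeout(int current, int max) {
        int doubled = current * 2;
        return doubled < max ? doubled : max;
    }

    public static void main(String[] args) {
        int timeout = 200;            // assumed starting timeout in milliseconds
        final int max = 60000;        // assumed upper bound in milliseconds
        for (int i = 0; i < 10; i++) {
            timeout = nextTimeout(timeout, max);
            System.out.println("Next timeout: " + timeout);
        }
    }
}
```

Doubling keeps a node responsive while the cluster is forming, and the cap stops a long-isolated node from waiting arbitrarily long between attempts.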
【Java】
//manager.connectAll() -> connectOne(sid) -> initiateConnection(...) -> startConnection(...)
private boolean startConnection(Socket sock, Long sid) throws IOException {
    DataOutputStream dout = null;
    DataInputStream din = null;
    try {
        // Use BufferedOutputStream to reduce the number of IP packets. This is
        // important for x-DC scenarios.
        BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
        dout = new DataOutputStream(buf);
        // Sending id and challenge
        // represents protocol version (in other words - message type)
        dout.writeLong(PROTOCOL_VERSION);
        dout.writeLong(self.getId());
        String addr = self.getElectionAddress().getHostString() + ":"
                + self.getElectionAddress().getPort();
        byte[] addr_bytes = addr.getBytes();
        dout.writeInt(addr_bytes.length);
        dout.write(addr_bytes);
        dout.flush();
        din = new DataInputStream(
                new BufferedInputStream(sock.getInputStream()));
    } catch (IOException e) {
        LOG.warn("Ignoring exception reading or writing challenge: ", e);
        closeSocket(sock);
        return false;
    }
    // authenticate learner
    QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
    if (qps != null) {
        // TODO - investigate why reconfig makes qps null.
        authLearner.authenticate(sock, qps.hostname);
    }
    // If lost the challenge, then drop the new connection
    // Ensures there is only one channel between any two nodes in the cluster
    if (sid > self.getId()) {
        LOG.info("Have smaller server identifier, so dropping the " +
                "connection: (" + sid + ", " + self.getId() + ")");
        closeSocket(sock);
        // Otherwise proceed with the connection
    } else {
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);
        SendWorker vsw = senderWorkerMap.get(sid);
        if (vsw != null)
            vsw.finish();
        senderWorkerMap.put(sid, sw);
        queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(
                SEND_CAPACITY));
        sw.start();
        rw.start();
        return true;
    }
    return false;
}
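As the code above shows, a node that opens a connection keeps the Socket and creates the SendWorker and RecvWorker threads (stored in senderWorkerMap<sid, SendWorker>) only when the target sid is smaller than its own. If the target sid is larger (sid > self.sid), it closes the socket. This mirrors the sid < self.sid branch of the listener logic in step 2, where an incoming connection from a smaller sid is closed and replaced by a connection in the opposite direction. Together the two checks guarantee that any two nodes in the cluster share exactly one channel.
How nodes connect to each other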
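The direction rule can be condensed into a small predicate. This is a sketch of the rule only, with hypothetical names (`ConnectionRuleSketch`, `keepsSocket`, `channelsBetween`); it ignores sockets, retries and authentication.

```java
// Hypothetical condensation of the "larger sid connects to smaller sid" rule.
class ConnectionRuleSketch {
    // True when a socket opened by initiatorSid towards acceptorSid is kept.
    // Only the node with the larger sid may keep a connection it initiated.
    static boolean keepsSocket(long initiatorSid, long acceptorSid) {
        return initiatorSid > acceptorSid;
    }

    // Number of channels that survive when both nodes try to connect to each other.
    static int channelsBetween(long a, long b) {
        int kept = 0;
        if (keepsSocket(a, b)) {
            kept++;
        }
        if (keepsSocket(b, a)) {
            kept++;
        }
        return kept;
    }

    public static void main(String[] args) {
        System.out.println("1 -> 2 kept: " + keepsSocket(1, 2));
        System.out.println("2 -> 1 kept: " + keepsSocket(2, 1));
        System.out.println("Channels between 1 and 2: " + channelsBetween(1, 2));
    }
}
```

Because exactly one of the two directions passes the check for any pair of distinct sids, both nodes can attempt to connect at the same time and still end up with a single channel.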
【Java】
public Vote lookForLeader() throws InterruptedException {
    //...
    if (n.electionEpoch > logicalclock.get()) {
        // The local election round is behind the vote's round: reset the recvset vote pool,
        // adopt the newer round, update the current vote and send votes again
        logicalclock.set(n.electionEpoch);
        recvset.clear();
        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
            updateProposal(n.leader, n.zxid, n.peerEpoch);
        } else {
            updateProposal(getInitId(),
                    getInitLastLoggedZxid(),
                    getPeerEpoch());
        }
        sendNotifications();
    } else if (n.electionEpoch < logicalclock.get()) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                    + Long.toHexString(n.electionEpoch)
                    + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
        }
        break;
    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
            proposedLeader, proposedZxid, proposedEpoch)) { // Same election round
        // The received vote beat the current vote, so it replaces the current vote
        updateProposal(n.leader, n.zxid, n.peerEpoch);
        sendNotifications();
    }
    //...
}
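In the code above, votes are fetched from recvqueue in a loop and the election proceeds as follows:
Check whether the election round of the received vote matches the current round;
If the received round is larger than the current one, adopt it, update the current vote and send the vote again. If it is smaller, the received vote is ignored;
If the rounds are equal, the current vote and the received vote are compared against each other (PK).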
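The three cases can be written down as a tiny classification. This is an illustrative sketch only; `RoundCheckSketch`, `Action` and `classify` are hypothetical names that do not exist in ZooKeeper.

```java
// Hypothetical classification of a received vote by its election round.
class RoundCheckSketch {
    enum Action {
        ADOPT_ROUND_AND_REVOTE, // received round is newer than the local one
        IGNORE,                 // received round is older than the local one
        COMPARE_VOTES           // same round: compare the two votes
    }

    // Decides what to do with a notification, given its round and the local round.
    static Action classify(long notificationRound, long localRound) {
        if (notificationRound > localRound) {
            return Action.ADOPT_ROUND_AND_REVOTE;
        } else if (notificationRound < localRound) {
            return Action.IGNORE;
        }
        return Action.COMPARE_VOTES;
    }

    public static void main(String[] args) {
        System.out.println(classify(2, 1));
        System.out.println(classify(1, 2));
        System.out.println(classify(1, 1));
    }
}
```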
【Java】
// Compare the received vote with the current vote
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch,
                                      long curId, long curZxid, long curEpoch) {
    LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
            Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
    if (self.getQuorumVerifier().getWeight(newId) == 0) {
        return false;
    }
    /*
     * We return true if one of the following three cases hold:
     * 1- New epoch is higher
     * 2- New epoch is the same as current epoch, but new zxid is higher
     * 3- New epoch is the same as current epoch, new zxid is the same
     *  as current zxid, but server id is higher.
     */
    return ((newEpoch > curEpoch) ||
            ((newEpoch == curEpoch) &&
            ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
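The totalOrderPredicate method in the code above works as follows:
It returns true if the candidate's epoch is larger than the current epoch;
It returns true if the epochs are equal and the candidate's zxid is larger than the current zxid;
It returns true if the epochs are equal, the zxids are equal and the candidate's sid is larger than the current sid.
When the result is true, the current vote is replaced by the winning vote and the new vote is sent out again.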
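The ordering can be tried out in isolation with the sketch below. It deliberately leaves out the weight check that the real method performs first, and `VoteOrderSketch` and `newVoteWins` are hypothetical names.

```java
// Hypothetical stand-alone version of the epoch, zxid, sid ordering (weight check omitted).
class VoteOrderSketch {
    // True when the new vote should replace the current one.
    static boolean newVoteWins(long newId, long newZxid, long newEpoch,
                               long curId, long curZxid, long curEpoch) {
        if (newEpoch != curEpoch) {
            return newEpoch > curEpoch;   // a higher epoch always wins
        }
        if (newZxid != curZxid) {
            return newZxid > curZxid;     // same epoch: more recent data wins
        }
        return newId > curId;             // same epoch and zxid: larger sid wins
    }

    public static void main(String[] args) {
        // Same epoch and zxid, so the larger sid decides.
        System.out.println(newVoteWins(2, 0, 1, 1, 0, 1));
        // Same epoch, larger zxid beats a larger sid.
        System.out.println(newVoteWins(1, 5, 1, 2, 0, 1));
        // A lower epoch loses even with a larger zxid and sid.
        System.out.println(newVoteWins(3, 9, 1, 1, 0, 2));
    }
}
```

The first call corresponds to round 1 of the walkthrough, where vote(2,0) beats vote(1,0) purely on sid.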
【Java】
public Vote lookForLeader() throws InterruptedException {
    //...
    // Store the vote cast by each node
    // key: sid of the node that sent the vote, value: the vote (the Leader it proposes)
    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
    // Majority check starts here
    if (termPredicate(recvset,
            new Vote(proposedLeader, proposedZxid,
                    logicalclock.get(), proposedEpoch))) {
        // Verify if there is any change in the proposed leader
        while ((n = recvqueue.poll(finalizeWait,
                TimeUnit.MILLISECONDS)) != null) {
            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                    proposedLeader, proposedZxid, proposedEpoch)) {
                recvqueue.put(n);
                break;
            }
        }
        /*
         * This predicate is true once we don't read any new
         * relevant message from the reception queue
         */
        if (n == null) {
            // A leader has been elected: set this node's state accordingly
            self.setPeerState((proposedLeader == self.getId()) ?
                    ServerState.LEADING : learningState());
            Vote endVote = new Vote(proposedLeader,
                    proposedZxid, proposedEpoch);
            leaveInstance(endVote);
            return endVote;
        }
    }
    //...
}

/**
 * Termination predicate. Given a set of votes, determines if have
 * sufficient to declare the end of the election round.
 *
 * @param votes
 *            Set of votes
 * @param vote
 *            Identifier of the vote received last (the vote that won the comparison)
 */
private boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) {
    SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
    voteSet.addQuorumVerifier(self.getQuorumVerifier());
    if (self.getLastSeenQuorumVerifier() != null
            && self.getLastSeenQuorumVerifier().getVersion() > self
                    .getQuorumVerifier().getVersion()) {
        voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
    }
    /*
     * First make the views consistent. Sometimes peers will have different
     * zxids for a server depending on timing.
     */
    // votes comes from recvset and holds the vote cast by each node
    for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
        // If a node's vote equals the proposed vote, record it in voteSet
        if (vote.equals(entry.getValue())) {
            // Record the sid of the node that backs the proposal
            voteSet.addAck(entry.getKey());
        }
    }
    // Check whether the proposal is backed by more than half of the voters
    return voteSet.hasAllQuorums();
}

//QuorumMaj#containsQuorum
public boolean containsQuorum(Set<Long> ackSet) {
    return (ackSet.size() > half);
}
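In the code above, recvset stores the vote cast by each sid.
Round 1: sid1: vote(1,0,1), sid2: vote(2,0,1);
Round 2: sid1: vote(2,0,1), sid2: vote(2,0,1).
After the comparison, vote(2,0,1) is the proposed Leader. Matching it against the recvset vote pool finds 2 identical votes. Since 3 nodes take part in the election and both sid1 and sid2 vote for sid2, the majority requirement is met and sid2 is confirmed as Leader.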
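The counting step can be reproduced with plain collections. This is a simplified sketch, assuming a single majority rule and no weights; `TallySketch`, its nested `Vote` class and `hasQuorum` are hypothetical stand-ins for the real recvset handling.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical tally over a recvset-like map; not ZooKeeper code.
class TallySketch {
    // Minimal vote value: proposed leader sid, zxid and epoch.
    static final class Vote {
        final long leader;
        final long zxid;
        final long epoch;

        Vote(long leader, long zxid, long epoch) {
            this.leader = leader;
            this.zxid = zxid;
            this.epoch = epoch;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Vote)) {
                return false;
            }
            Vote v = (Vote) o;
            return leader == v.leader && zxid == v.zxid && epoch == v.epoch;
        }

        @Override
        public int hashCode() {
            return Objects.hash(leader, zxid, epoch);
        }
    }

    // True when strictly more than half of the voting members cast exactly this proposal.
    static boolean hasQuorum(Map<Long, Vote> recvset, Vote proposal, int votingMembers) {
        int half = votingMembers / 2;
        int acks = 0;
        for (Vote v : recvset.values()) {
            if (v.equals(proposal)) {
                acks++;
            }
        }
        return acks > half;
    }

    public static void main(String[] args) {
        Map<Long, Vote> recvset = new HashMap<>();
        recvset.put(1L, new Vote(1, 0, 1));
        recvset.put(2L, new Vote(2, 0, 1));
        System.out.println("Round 1 quorum: " + hasQuorum(recvset, new Vote(2, 0, 1), 3));
        recvset.put(1L, new Vote(2, 0, 1));
        System.out.println("Round 2 quorum: " + hasQuorum(recvset, new Vote(2, 0, 1), 3));
    }
}
```

With three voting members half is 1, so the single matching vote in round 1 is not enough, while the two matching votes in round 2 are.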
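setPeerState updates the current node's role;
If proposedLeader, the elected sid, equals the node's own sid, the node becomes the Leader;
Otherwise the node becomes a Follower or an Observer;
currentVote is updated to the Leader's vote, vote(2,0,1).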
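The role assignment above boils down to one comparison plus the node's configured role. A minimal sketch, with hypothetical names (`RoleSketch`, `State`, `roleAfterElection`) standing in for the state update shown in the code:

```java
// Hypothetical role decision after an election has finished.
class RoleSketch {
    enum State { LEADING, FOLLOWING, OBSERVING }

    // electedSid: sid of the elected Leader; mySid: this node's sid;
    // participant: whether this node is a voting member (false for observers).
    static State roleAfterElection(long electedSid, long mySid, boolean participant) {
        if (electedSid == mySid) {
            return State.LEADING;
        }
        return participant ? State.FOLLOWING : State.OBSERVING;
    }

    public static void main(String[] args) {
        System.out.println("sid=2: " + roleAfterElection(2, 2, true));
        System.out.println("sid=1: " + roleAfterElection(2, 1, true));
        System.out.println("sid=4: " + roleAfterElection(2, 4, false));
    }
}
```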