您好,登錄后才能下訂單哦!
選舉的父接口為Election,其定義了lookForLeader和shutdown兩個方法,lookForLeader表示尋找Leader,shutdown則表示關閉,如關閉服務端之間的連接。
1、AuthFastLeaderElection,同FastLeaderElection算法基本一致,只是在消息中加入了認證信息,在3.4.0版本后已經被棄用。
2、FastLeaderElection,是標準的fast paxos算法的實現,基于TCP協議進行選舉。
3、LeaderElection,在3.4.0版本后已經被棄用。
Notification表示收到的選舉投票信息(其他服務器發來的選舉投票信息),其包含了投票中被選舉者的服務器sid、zxid、選舉周期epoch,選舉者的服務器sid,狀態,選周期epoch
static public class Notification {
/*
* Format version, introduced in 3.4.6
*/
public final static int CURRENTVERSION = 0x2;
int version;
/*
* Proposed leader 被選舉者的服務器id
*/
long leader;
/*
* zxid of the proposed leader 被選舉者的事務zxid
*/
long zxid;
/*
* Epoch 選舉者的選舉周期
*/
long electionEpoch;
/*
* current state of sender 選舉者的節點狀態
* 總共有4中
* LOOKING 尋找leader狀態
* FOLLOWING 跟隨者
* LEADING leader狀態
*OBSERVING 不參與操作和選舉
*/
QuorumPeer.ServerState state;
/*
* Address of sender 選舉者的服務器id
*/
long sid;
QuorumVerifier qv;
/*
* epoch of the proposed leader 被選舉者的選舉周期
*/
long peerEpoch;
}
ToSend表示發送給其他服務器的選舉投票信息,也包含了被選舉者的sid、zxid、選舉周期等信息。
static public class ToSend {
static enum mType {crequest, challenge, notification, ack}
ToSend(mType type,
long leader,
long zxid,
long electionEpoch,
ServerState state,
long sid,
long peerEpoch,
byte[] configData) {
this.leader = leader;
this.zxid = zxid;
this.electionEpoch = electionEpoch;
this.state = state;
this.sid = sid;
this.peerEpoch = peerEpoch;
this.configData = configData;
}
/*
* Proposed leader in the case of notification 被推舉的leader的sid
*/
long leader;
/*
* id contains the tag for acks, and zxid for notifications
* 被推舉的leader的最大事務id
*/
long zxid;
/*
* Epoch 選舉者的選舉周期
*/
long electionEpoch;
/*
* Current state; 選舉者的節點狀態
*/
QuorumPeer.ServerState state;
/*
* Address of recipient選舉者的服務器sid
*/
long sid;
/*
* Used to send a QuorumVerifier (configuration info)
*/
byte[] configData = dummyData;
/*
* Leader epoch 被選舉者的選舉周期
*/
long peerEpoch;
}
Messenger包含了WorkerReceiver和WorkerSender兩個內部類
1、WorkerReceiver繼承了ZooKeeperThread,是選票接收器。
2、其會不斷地從QuorumCnxManager中的recvQueue獲取其他服務器發來的選舉消息,類型是Message
WorkerReceiver(QuorumCnxManager manager) {
super("WorkerReceiver");
this.stop = false;
this.manager = manager;
}
//從QuorumCnxManager中的recvQueue中獲取投票消息
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
if(response == null) continue;
并將其轉換成一個選票消息Notification,然后保存到recvqueue中,在選票接收過程中,如果發現該外部選票的選舉輪次小于當前服務器的,那么忽略該外部投票,同時立即發送自己的內部投票,把投票信息組裝成ToSend加入到sendqueue隊列。
ToSend notmsg = new ToSend(ToSend.mType.notification,
v.getId(),
v.getZxid(),
logicalclock.get(),
self.getPeerState(),
response.sid,
v.getPeerEpoch(),
qv.toString().getBytes());
sendqueue.offer(notmsg);
3、WorkerSender也繼承了ZooKeeperThread,為選票發送器,其會不斷地從sendqueue中獲取待發送的選票
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
并將其傳遞到底層QuorumCnxManager中,其過程是將FastLeaderElection的ToSend轉化為QuorumCnxManager的Message。
void process(ToSend m) {
ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
m.leader,
m.zxid,
m.electionEpoch,
m.peerEpoch,
m.configData);
manager.toSend(m.sid, requestBuffer);
}
Messenger(QuorumCnxManager manager) {
//創建WorkerSender
this.ws = new WorkerSender(manager);
// 新創建線程
this.wsThread = new Thread(this.ws,
"WorkerSender[myid=" + self.getId() + "]");
// 設置為守護線程
this.wsThread.setDaemon(true);
// 創建WorkerReceiver
this.wr = new WorkerReceiver(manager);
// 新創建線程
this.wrThread = new Thread(this.wr,
"WorkerReceiver[myid=" + self.getId() + "]");
// 設置為守護線程
this.wrThread.setDaemon(true);
}
// 完成Leader選舉之后需要等待時長
final static int finalizeWait = 200;
// 兩個連續通知檢查之間的最大時長
final static int maxNotificationInterval = 60000;
// 管理服務器之間的連接
QuorumCnxManager manager;
// 選票發送隊列,用于保存待發送的選票
LinkedBlockingQueue<ToSend> sendqueue;
// 選票接收隊列,用于保存接收到的外部投票
LinkedBlockingQueue<Notification> recvqueue;
//投票者
QuorumPeer self;
Messenger messenger;
//邏輯始終,當前選舉周期
AtomicLong logicalclock = new AtomicLong(); /* Election instance */
//被選舉者服務器sid
long proposedLeader;
//被選舉者服務器zxid
long proposedZxid;
//被選舉者服務器選舉周期
long proposedEpoch;
其會遍歷所有的參與者投票集合,然后將自己的選票信息發送至上述所有的投票者集合,其并非同步發送,而是將ToSend消息放置于sendqueue中,之后由WorkerSender進行發送
private void sendNotifications() {
for (long sid : self.getCurrentAndNextConfigVoters()) {
QuorumVerifier qv = self.getQuorumVerifier();
// 構造發送消息
ToSend notmsg = new ToSend(ToSend.mType.notification,
proposedLeader,
proposedZxid,
logicalclock.get(),
QuorumPeer.ServerState.LOOKING,
sid,
proposedEpoch, qv.toString().getBytes());
if(LOG.isDebugEnabled()){
LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) +
" (n.round), " + sid + " (recipient), " + self.getId() +
" (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
}
// 將發送消息放置于隊列
sendqueue.offer(notmsg);
}
}
該函數將接收的投票與自身投票進行PK,查看是否消息中包含的服務器id是否更優,其按照epoch、zxid、id的優先級進行PK。
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
if(self.getQuorumVerifier().getWeight(newId) == 0){
return false;
}
/*
* We return true if one of the following three cases hold:
* 1- New epoch is higher
* 2- New epoch is the same as current epoch, but new zxid is higher
* 3- New epoch is the same as current epoch, new zxid is the same
* as current zxid, but server id is higher.
*/
// 1. 判斷消息里的epoch是不是比當前的大,如果大則消息中id對應的服務器就是leader
// 2. 如果epoch相等則判斷zxid,如果消息里的zxid大,則消息中id對應的服務器就是leader
// 3. 如果前面兩個都相等那就比較服務器id,如果大,則其就是leader
return ((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
該函數用于判斷Leader選舉是否結束,即是否有一半以上的服務器選出了相同的Leader,其過程是將收到的選票與當前選票進行對比,選票相同的放入同一個集合,之后判斷選票相同的集合是否超過了半數。
protected boolean termPredicate(Map<Long, Vote> votes, Vote vote) {
SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
voteSet.addQuorumVerifier(self.getQuorumVerifier());
if (self.getLastSeenQuorumVerifier() != null
&& self.getLastSeenQuorumVerifier().getVersion() > self
.getQuorumVerifier().getVersion()) {
voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
/*
* First make the views consistent. Sometimes peers will have different
* zxids for a server depending on timing.
*/
for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
if (vote.equals(entry.getValue())) {
voteSet.addAck(entry.getKey());
}
}
return voteSet.hasAllQuorums();
}
1、該函數用于開始新一輪的Leader選舉,其首先會將邏輯時鐘自增,然后更新本服務器的選票信息(初始化選票),之后將選票信息放入sendqueue等待發送給其他服務器
2、每臺服務器會不斷地從recvqueue隊列中獲取外部選票,處理外部選票。
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
3、判斷選舉輪次,選票PK,更新選票
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
sendNotifications();
4、歸檔選票,統計選票,返回最后的選票
/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) {
//設置leading狀態,否則設置為flowing
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
//最終選票
Vote endVote = new Vote(proposedLeader,
proposedZxid, logicalclock.get(),
proposedEpoch);
// 清空recvqueue隊列的選票
leaveInstance(endVote);
return endVote;
}
FastLeaderElection的算法,其是ZooKeeper的核心部分,比較復雜,梳理了一下大概的流程,好多細節沒有展開。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。