This article explains how the Raft distributed consensus algorithm works and walks through an implementation of it.
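CAP theorem
Consistency
Availability
Partition tolerance
The CAP theorem states that, in an asynchronous network model, no system can satisfy all three of these properties at once. In other words, a distributed system must give up one of them. For any system that runs in a distributed setting, the first question to settle is how to trade off consistency, availability and partition tolerance, that is, which property to weaken.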
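Highly available systems usually give up strong consistency. Strongly consistent systems, on the other hand, rely on a family of algorithms designed specifically for this problem: consensus algorithms. "Consensus" means guaranteeing that all participants share the same view of the data (which can be understood as strong consistency). Consensus algorithms fall into two classes depending on whether malicious nodes are tolerated. Most of the time, "consensus algorithm" refers to the class without malicious nodes, where no node sends malicious requests, such as forged requests, to other nodes. The best-known consensus algorithm is Paxos, followed by Raft and ZAB (the algorithm implemented in ZooKeeper).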
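Raft core algorithm
The core of Raft consists of leader election and log replication.
When several servers serve clients at the same time, keeping their changes in sync becomes a problem. The usual approach is a primary/replica model: one primary server (the Leader) and several replica servers (Followers). All requests go through the Leader, and the Followers only keep backup copies of the data. But if the Leader goes down, which Follower should become the new Leader? In theory every Follower's copy matches the Leader's, but because of replication lag, out-of-order delivery and similar issues, the Followers may hold different data at any given moment. These problems are addressed in two ways:
Changes are written to a log instead of being applied to the state directly. This keeps the synchronization requests sent to the Followers ordered and makes it possible to recompute the current state at any time. This is the log state machine model.
A write succeeds only after more than half of the servers have written it. This is the Quorum mechanism.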
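Log state machine model
Log index | Operation | Current state |
---|---|---|
1 | X = 1 | {X:1} |
2 | Y = 2 | {X:1,Y:2} |
3 | X = 3 | {X:3,Y:2} |
4 | Z = 4 | {X:3,Y:2,Z:4} |
In the state machine model, log entries are only ever appended at the end, and the current state at any point in time can be recomputed by replaying the log from index 1. With a state machine model, the distributed consistency problem reduces to ensuring that every participating node writes the same entries in the same order.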
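The replay rule can be sketched in a few lines of Java. This is a minimal illustration, assuming every entry is a simple `key = value` assignment as in the table; `LogStateMachine`, `append` and `stateAt` are hypothetical names and are not part of the implementation later in this article.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: an append-only log of "key = value" commands and a replay function.
class LogStateMachine {
    // One log entry: assign value to key. Its log index is its 1-based position in the list.
    static final class Entry {
        final String key;
        final int value;

        Entry(String key, int value) {
            this.key = key;
            this.value = value;
        }
    }

    private final List<Entry> log = new ArrayList<>();

    // Entries are only ever appended; existing entries are never modified.
    void append(String key, int value) {
        log.add(new Entry(key, value));
    }

    // Recompute the state after entries 1..lastIndex by replaying from index 1.
    Map<String, Integer> stateAt(int lastIndex) {
        Map<String, Integer> state = new LinkedHashMap<>();
        for (int i = 0; i < lastIndex; i++) {
            Entry e = log.get(i);
            state.put(e.key, e.value);
        }
        return state;
    }

    public static void main(String[] args) {
        LogStateMachine sm = new LogStateMachine();
        sm.append("X", 1);
        sm.append("Y", 2);
        sm.append("X", 3);
        sm.append("Z", 4);
        System.out.println(sm.stateAt(2)); // {X=1, Y=2}
        System.out.println(sm.stateAt(4)); // {X=3, Y=2, Z=4}
    }
}
```

Because any node that appends the same entries in the same order computes the same state, replicating the log is enough to replicate the state.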
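Quorum-based writes
In some master/slave setups, the master does not track the slaves' replication progress at all. It simply keeps writing its own log and pushes the changes to the slaves over some transport. At the other extreme, strict full replication makes the master wait until every slave has caught up before it continues writing. With the first approach, data is lost when the master goes down; with full replication, performance is very poor. A majority-write Quorum mechanism sits in between: it reduces the risk of data loss while keeping performance acceptable.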
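Suppose there are three servers, nodes A, B and C, and a new value is being written to them. At this moment node A holds the new value 2, while B and C still hold the old value 1. If a client reads from any two nodes of the cluster, it can see the following versions:
Nodes A and B: 2 and 1
Nodes A and C: 2 and 1
Nodes B and C: 1 and 1
So when the client reads from B and C, it does not see the latest data.
Now node B also writes the new value 2. At this point the write has reached a majority.
When the client reads from any two nodes of the cluster:
Nodes A and B: 2 and 2
Nodes A and C: 2 and 1
Nodes B and C: 2 and 1
These results show that once a majority write has completed, the client sees the latest value in every case. In a master/slave or leader/follower system, clients cannot access every node directly, but the servers inside the system can compare the logs they hold to decide which of them becomes the new Leader, and in that process the data written to a majority is the data that counts as valid.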
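The example generalizes: in an n-node cluster, a write acknowledged by w nodes and a read from r nodes are guaranteed to share at least one node exactly when w + r > n, and two majorities always satisfy this. The brute-force check below illustrates the claim; `QuorumOverlap` and its methods are hypothetical helpers written only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: brute-force check of which write/read set sizes always overlap in an n-node cluster.
class QuorumOverlap {
    // Smallest number of nodes that is more than half of n.
    static int majority(int n) {
        return n / 2 + 1;
    }

    // All subsets of nodes {0..n-1} with exactly k members, encoded as bit masks.
    static List<Integer> subsetsOfSize(int n, int k) {
        List<Integer> result = new ArrayList<>();
        for (int mask = 0; mask < (1 << n); mask++) {
            if (Integer.bitCount(mask) == k) {
                result.add(mask);
            }
        }
        return result;
    }

    // True if every set of w written nodes shares a node with every set of r read nodes.
    static boolean alwaysOverlap(int n, int w, int r) {
        for (int written : subsetsOfSize(n, w)) {
            for (int read : subsetsOfSize(n, r)) {
                if ((written & read) == 0) {
                    return false; // this read could miss the new value entirely
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // The example above: 3 nodes, reads from 2 nodes.
        System.out.println(alwaysOverlap(3, 1, 2)); // false: only A has the new value
        System.out.println(alwaysOverlap(3, 2, 2)); // true: A and B have the new value
        for (int n = 1; n <= 7; n++) {
            int q = majority(n);
            System.out.println(n + " nodes: majority " + q + ", overlap " + alwaysOverlap(n, q, q));
        }
    }
}
```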
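Election based on log comparison
If the Leader goes down, how is a new Leader elected from the remaining servers? Naturally, we want the node that holds the most recent data.
In theory, this node should be supported by a majority: more than half of the nodes in the cluster (including the node itself) must hold data that is no newer than its own. If this condition is not enforced, the cluster can suffer from "split brain", for example some nodes backing one Leader while others back a different one.
To decide whose data is newer, a node compares the log index carried in another node's vote request with its own local log index. If its own index is larger, it refuses to support the requester; otherwise it supports it.
Under this rule, if all three nodes have log index 2, A would support B, C and itself, and so would every other node, giving each node three votes. To reduce the chance of several nodes becoming Leader at once, a node may not vote more than once: each node casts exactly one vote.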
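The rule above compares log indexes only. The Raft paper refines it: a voter first compares the term of the last log entry and falls back to the index only when those terms are equal, and it grants at most one vote per term. Below is a minimal sketch that assumes the voter's log does not change during the example. `VoteRule` and its methods are hypothetical names, separate from the `RequestVoteRpc` handling implemented later.

```java
// Hypothetical sketch of a voter: one vote per term, only for a candidate whose log is at least as up to date.
class VoteRule {
    private int currentTerm = 0;
    private String votedFor = null; // candidate voted for in currentTerm, or null
    private final int lastLogIndex; // this voter's last log entry
    private final int lastLogTerm;

    VoteRule(int lastLogIndex, int lastLogTerm) {
        this.lastLogIndex = lastLogIndex;
        this.lastLogTerm = lastLogTerm;
    }

    // Raft's comparison: a later last term wins; with equal last terms, the longer (or equal) log wins.
    static boolean isAtLeastAsUpToDate(int candLastTerm, int candLastIndex, int myLastTerm, int myLastIndex) {
        if (candLastTerm != myLastTerm) {
            return candLastTerm > myLastTerm;
        }
        return candLastIndex >= myLastIndex;
    }

    boolean handleRequestVote(int term, String candidateId, int candLastIndex, int candLastTerm) {
        if (term < currentTerm) {
            return false; // request from an older term
        }
        if (term > currentTerm) {
            currentTerm = term; // a newer term starts with no vote cast
            votedFor = null;
        }
        boolean voteAvailable = votedFor == null || votedFor.equals(candidateId);
        if (voteAvailable && isAtLeastAsUpToDate(candLastTerm, candLastIndex, lastLogTerm, lastLogIndex)) {
            votedFor = candidateId;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        VoteRule a = new VoteRule(2, 1);                       // A: last entry index 2, term 1
        System.out.println(a.handleRequestVote(1, "B", 2, 1)); // true: B's log is as new as A's
        System.out.println(a.handleRequestVote(1, "C", 2, 1)); // false: A already voted in term 1
        System.out.println(a.handleRequestVote(2, "C", 1, 1)); // false: C's log is shorter
        System.out.println(a.handleRequestVote(2, "C", 2, 1)); // true: term 2 vote is still free
    }
}
```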
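No. | A | A's votes | B | B's votes | C | C's votes | Leader elected |
---|---|---|---|---|---|---|---|
1 | nominates itself | 3 | votes for A | 0 | votes for A | 0 | A |
2 | votes for B | 0 | nominates itself | 3 | votes for B | 0 | B |
3 | votes for C | 0 | votes for C | 0 | nominates itself | 3 | C |
4 | nominates itself | 2 | nominates itself | 1 | votes for A | 0 | A |
5 | nominates itself | 1 | nominates itself | 2 | votes for B | 0 | B |
6 | votes for B | 0 | nominates itself | 2 | nominates itself | 1 | B |
7 | votes for C | 0 | nominates itself | 1 | nominates itself | 2 | C |
8 | nominates itself | 2 | votes for A | 0 | nominates itself | 1 | A |
9 | nominates itself | 1 | votes for C | 0 | nominates itself | 2 | C |
10 | nominates itself | 1 | nominates itself | 1 | nominates itself | 1 | none |
The results show that, unless every node nominates itself, exactly one node is always elected Leader.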
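For a cluster of N nodes (N > 0), suppose M nodes become Leader. Each of them needs votes from a majority, so together they need
$M \cdot Q$ votes, where $Q$ is the majority size:
when N is odd, $Q = (N + 1) / 2$;
when N is even, $Q = N / 2 + 1$.
Since each node casts exactly one vote, only N votes exist in total, so $M \cdot Q \le N$ must hold.
With $M = 1$, the condition becomes $Q \le N$, which holds. With $M = 2$:
when N is odd, $2 \cdot (N + 1) / 2 = N + 1$, and $N + 1 \le N$ is false;
when N is even, $2 \cdot (N / 2 + 1) = N + 2$, and $N + 2 \le N$ is also false.
Likewise, for every $M \ge 2$ we have $M \cdot Q \ge 2Q > N$, so $M \cdot Q \le N$ cannot hold.
Therefore at most one Leader can be elected.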
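The counting argument can also be checked by brute force. The sketch below enumerates every vote assignment in which each node either nominates itself or votes for a node that nominated itself. This is the setting of the three-node table above. For each assignment it counts how many nodes reach a majority. `OneLeaderCheck` is a hypothetical helper written for this illustration.

```java
// Hypothetical brute-force check: with one vote per node, at most one node can reach a majority.
class OneLeaderCheck {
    static int majority(int n) {
        return n / 2 + 1;
    }

    // votes[i] == i: node i nominates itself; otherwise node i votes for node votes[i].
    static int countLeaders(int[] votes) {
        int n = votes.length;
        int[] tally = new int[n];
        for (int v : votes) {
            tally[v]++;
        }
        int leaders = 0;
        for (int t : tally) {
            if (t >= majority(n)) {
                leaders++;
            }
        }
        return leaders;
    }

    // Returns {valid assignments, assignments with no leader, most leaders in any assignment}.
    static int[] enumerate(int n) {
        int[] stats = new int[3];
        enumerate(new int[n], 0, stats);
        return stats;
    }

    private static void enumerate(int[] votes, int pos, int[] stats) {
        if (pos == votes.length) {
            for (int v : votes) {
                if (votes[v] != v) {
                    return; // someone voted for a node that did not nominate itself
                }
            }
            int leaders = countLeaders(votes);
            stats[0]++;
            if (leaders == 0) {
                stats[1]++;
            }
            stats[2] = Math.max(stats[2], leaders);
            return;
        }
        for (int target = 0; target < votes.length; target++) {
            votes[pos] = target;
            enumerate(votes, pos + 1, stats);
        }
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 6; n++) {
            int[] s = enumerate(n);
            System.out.println(n + " nodes: " + s[0] + " assignments, " + s[1]
                    + " without a leader, at most " + s[2] + " leader(s)");
        }
    }
}
```

For three nodes it finds 10 valid assignments, matching the ten rows of the table. Exactly one of them, the one where every node nominates itself, produces no Leader.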
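Elections in Raft
In Raft, a node plays one of three roles:
Leader
Candidate (a candidate for Leader)
Follower
When the cluster is stable there is exactly one Leader, which keeps in touch with every Follower through heartbeat messages.
Raft uses the following two message types, heartbeats included:
RequestVote: asks other nodes to vote for the sender; normally sent by a Candidate.
AppendEntries: used for log replication to append entries, and, when it carries zero entries, as a heartbeat; normally sent only by the Leader.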
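The logical clock: term
Because servers' physical clocks may disagree, Raft elections use an integer term parameter instead, which lets the system advance time safely. The term is a logical clock value that only increases across the cluster; it is a variant of the Lamport timestamp algorithm.
When several processes need to maintain a global time, each process first keeps a local copy of that time. The Lamport timestamp algorithm works as follows:
Each process increments its local copy (by 1) whenever an event occurs.
When a process sends a message, it attaches its local time.
When a process receives a message, it takes the larger of the message's time and its local time, adds 1, and stores the result as its local time.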
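The three rules map directly onto a small class. Here is a minimal sketch. It assumes that sending a message counts as an event, so the clock ticks before the timestamp is attached. `LamportClock` is a hypothetical name.

```java
// Hypothetical sketch of a Lamport logical clock.
class LamportClock {
    private long time = 0;

    // Rule 1: every local event increments the clock by 1.
    long tick() {
        return ++time;
    }

    // Rule 2: sending is an event; the returned value is attached to the message.
    long send() {
        return tick();
    }

    // Rule 3: on receipt, take the larger of local and message time, then add 1.
    long receive(long messageTime) {
        time = Math.max(time, messageTime) + 1;
        return time;
    }

    long now() {
        return time;
    }

    public static void main(String[] args) {
        LamportClock p1 = new LamportClock();
        LamportClock p2 = new LamportClock();
        p1.tick();              // p1 = 1
        long stamp = p1.send(); // p1 = 2, message carries 2
        p2.receive(stamp);      // p2 = max(0, 2) + 1 = 3
        long reply = p2.send(); // p2 = 4
        p1.receive(reply);      // p1 = max(2, 4) + 1 = 5
        System.out.println(p1.now() + " " + p2.now()); // 5 4
    }
}
```

Raft's term uses the same "take the larger value" idea but is coarser. A node increments its term only when it starts an election. When it sees a larger term in any message, it adopts that term as is (without adding 1) and becomes a Follower.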
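Terms and role transitions during elections
Raft uses the term as the Leader's term number; it only ever increases.
During an election, nodes change roles; the figure above shows Raft's role transitions. The steps are:
When the system starts, every node is a Follower.
If a Follower receives no heartbeat from a Leader before its timeout expires, it becomes a Candidate: it increments its term and nominates itself in an election.
A Candidate that wins the support of a majority becomes the Leader.
Once a Leader exists, it sends heartbeats to the other nodes, which keeps them from turning from Followers into Candidates.
If, in the third step above, no Candidate wins a majority, no Leader is elected; the Candidates' election timeouts expire and the next round of elections begins.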
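The transitions can be sketched as a small state machine. This is a simplified illustration, not the `FollowerNodeRole`/`CandidateNodeRole`/`LeaderNodeRole` classes implemented later. It has no timers or networking, and the method names (`onElectionTimeout`, `onVoteGranted`, `onHeartbeat`) are hypothetical. Following the Raft paper, a Candidate steps down when it hears from a Leader whose term is at least its own.

```java
// Hypothetical sketch of Raft role transitions, driven by explicit events instead of timers and RPCs.
class RoleTransitions {
    enum RoleName { FOLLOWER, CANDIDATE, LEADER }

    private final int clusterSize;
    private RoleName role = RoleName.FOLLOWER; // step 1: every node starts as a Follower
    private int term = 0;
    private int votes = 0;

    RoleTransitions(int clusterSize) {
        this.clusterSize = clusterSize;
    }

    // Step 2, and the retry after a failed round: no heartbeat arrived before the timeout.
    void onElectionTimeout() {
        if (role == RoleName.LEADER) {
            return; // a Leader sends heartbeats instead of waiting for them
        }
        role = RoleName.CANDIDATE;
        term++;    // each election round uses a new term
        votes = 1; // the Candidate votes for itself
        becomeLeaderIfMajority();
    }

    // A peer granted its vote in the given term; votes from older rounds are ignored.
    void onVoteGranted(int voteTerm) {
        if (role == RoleName.CANDIDATE && voteTerm == term) {
            votes++;
            becomeLeaderIfMajority();
        }
    }

    // Step 4: a heartbeat from a Leader whose term is at least ours makes this node a Follower.
    void onHeartbeat(int leaderTerm) {
        if (leaderTerm >= term) {
            term = leaderTerm;
            role = RoleName.FOLLOWER;
            votes = 0;
        }
    }

    // Step 3: a majority of votes turns the Candidate into the Leader.
    private void becomeLeaderIfMajority() {
        if (votes >= clusterSize / 2 + 1) {
            role = RoleName.LEADER;
        }
    }

    RoleName role() {
        return role;
    }

    int term() {
        return term;
    }

    public static void main(String[] args) {
        RoleTransitions node = new RoleTransitions(3);
        node.onElectionTimeout(); // FOLLOWER -> CANDIDATE in term 1, 1 vote
        node.onVoteGranted(1);    // 2 of 3 votes -> LEADER
        System.out.println(node.role() + " in term " + node.term()); // LEADER in term 1
    }
}
```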
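Raft calls a round in which the votes are divided so that no Candidate obtains a majority (for example an even 2-to-2 split) a split vote. It is especially likely in clusters with an even number of nodes, and it also happens whenever every node nominates itself, as in row 10 of the table above. Raft uses randomized election timeouts to make split votes unlikely.
Election timeout
If a cluster with an even number of nodes kept hitting split votes (for example a 4-node cluster that, in the worst case, split 2 to 2 forever), no Leader would ever be elected. Raft staggers the moments at which Followers become Candidates, which makes it much more likely that a Leader gets elected.
After a Leader is elected, every node must learn about it as quickly as possible; otherwise another election timeout fires and a new round of elections starts. This means the heartbeat interval must be much shorter than the minimum election timeout.
A node's election timeout is reset whenever it receives a heartbeat. Without the reset, nodes would start elections over and over and the system would have trouble settling into a stable state.
Suppose the election timeout ranges from 3 to 4 seconds and the heartbeat interval is 1 second. A node then keeps adjusting its actual election time roughly as follows:
The node starts as a Follower and picks a random election timeout of 3.3 seconds, so it will start an election 3.3 seconds later.
1 second after startup, it receives a heartbeat from the Leader, picks a new random timeout (say 3.4 seconds), and moves its next election time to 3.4 seconds from now.
2 seconds after startup, it receives another heartbeat, picks another random timeout (say 4 seconds), and moves its next election time to 4 seconds from now.
As long as the Leader keeps sending heartbeats, the Follower never becomes a Candidate and never starts an election.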
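The reset behavior can be simulated without real timers. The minimal sketch below makes two assumptions: time is passed in explicitly in milliseconds, and each random timeout is drawn from [min, max). `ElectionTimer` is a hypothetical class, separate from the `ElectionTimeout` and `DefaultScheduler` classes later in the article.

```java
import java.util.Random;

// Hypothetical sketch: an election deadline that is re-randomized every time a heartbeat arrives.
class ElectionTimer {
    private final int minTimeoutMs;
    private final int maxTimeoutMs;
    private final Random random;
    private long deadlineMs;

    ElectionTimer(int minTimeoutMs, int maxTimeoutMs, Random random, long nowMs) {
        if (minTimeoutMs <= 0 || minTimeoutMs >= maxTimeoutMs) {
            throw new IllegalArgumentException("require 0 < min < max");
        }
        this.minTimeoutMs = minTimeoutMs;
        this.maxTimeoutMs = maxTimeoutMs;
        this.random = random;
        reset(nowMs);
    }

    // Called at startup and on every heartbeat: pick a new timeout in [min, max) counted from now.
    void reset(long nowMs) {
        deadlineMs = nowMs + minTimeoutMs + random.nextInt(maxTimeoutMs - minTimeoutMs);
    }

    // True once the deadline passes without a reset, i.e. the node should become a Candidate.
    boolean shouldStartElection(long nowMs) {
        return nowMs >= deadlineMs;
    }

    long deadlineMs() {
        return deadlineMs;
    }

    public static void main(String[] args) {
        ElectionTimer timer = new ElectionTimer(3000, 4000, new Random(), 0);
        for (long now = 1000; now <= 5000; now += 1000) { // a heartbeat every second
            timer.reset(now);
            System.out.println("heartbeat at " + now + " ms, next election at " + timer.deadlineMs() + " ms");
        }
        // Heartbeats stop after 5000 ms, so the election fires between 8000 and 8999 ms.
        System.out.println(timer.shouldStartElection(9000)); // true
    }
}
```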
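Log replication in Raft
Every data-change request from a client is appended to the node's log as a log entry. A log entry is in one of two states:
appended but not yet persisted
persisted
Each Raft node maintains the index of the highest persisted log entry, commitIndex. Entries whose index is less than or equal to commitIndex are considered committed, that is, valid (persisted) entries; all other entries are not yet persisted. At startup commitIndex is 0.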
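Replication progress
To track each node's replication progress, the Leader records for every node a nextIndex (the index of the next log entry to replicate to it) and a matchIndex (the index of the highest log entry known to match on it).
After a Leader is elected, it resets every node's nextIndex and matchIndex: matchIndex is set to 0, and nextIndex to the index of the Leader's next log entry. Both values are then updated through AppendEntries exchanges with each node. Once the system is stable, each node's matchIndex as tracked by the Leader equals the Leader's commitIndex, and each nextIndex equals the index of the Leader's next log entry.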
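The Raft paper describes a consistency check that shows how the Leader uses nextIndex. Each AppendEntries request carries the index and term of the entry just before the new ones; these are the prevLogIndex and prevLogTerm fields of `AppendEntriesRpc` later in this article. The Follower rejects the request if its own log does not contain that entry, and the Leader then backs nextIndex off by one and retries. This is a simplified sketch that assumes logs are lists of entry terms and that a Follower truncates everything after prevLogIndex on success. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the AppendEntries consistency check and nextIndex back-off.
// A log is a list of entry terms; log index i is list position i - 1.
class ReplicationProgressDemo {
    // Follower side: accept only if our entry at prevLogIndex exists and has term prevLogTerm.
    static boolean appendEntries(List<Integer> followerLog, int prevLogIndex, int prevLogTerm, List<Integer> entries) {
        if (prevLogIndex > followerLog.size()) {
            return false; // we are missing entries before the new ones
        }
        if (prevLogIndex > 0 && followerLog.get(prevLogIndex - 1) != prevLogTerm) {
            return false; // our entry at prevLogIndex comes from a different term
        }
        // Simplification: drop everything after prevLogIndex, then append the Leader's entries.
        followerLog.subList(prevLogIndex, followerLog.size()).clear();
        followerLog.addAll(entries);
        return true;
    }

    // Leader side: start with nextIndex = last index + 1 and back off until the Follower accepts.
    // Returns the Follower's new matchIndex (its nextIndex becomes matchIndex + 1).
    static int replicate(List<Integer> leaderLog, List<Integer> followerLog) {
        int nextIndex = leaderLog.size() + 1;
        while (true) {
            int prevLogIndex = nextIndex - 1;
            int prevLogTerm = prevLogIndex == 0 ? 0 : leaderLog.get(prevLogIndex - 1);
            List<Integer> entries = new ArrayList<>(leaderLog.subList(prevLogIndex, leaderLog.size()));
            if (appendEntries(followerLog, prevLogIndex, prevLogTerm, entries)) {
                return leaderLog.size();
            }
            nextIndex--; // the same step as ReplicatingState.backOffNextIndex()
        }
    }

    public static void main(String[] args) {
        List<Integer> leader = List.of(1, 1, 2, 3);
        List<Integer> follower = new ArrayList<>(List.of(1, 1, 2, 2, 2)); // diverged after index 3
        int matchIndex = replicate(leader, follower);
        System.out.println(matchIndex + " " + follower); // 4 [1, 1, 2, 3]
    }
}
```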
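When a client sends a data-change request to the Leader, the Leader first appends an entry to its own log without committing it (commitIndex does not move). The Leader then replicates the data to the other nodes with AppendEntries messages that carry the newly appended entry. Once more than half of the nodes (the Leader included) have appended the new entry, the Leader persists it and advances its commitIndex, then tells the other nodes to persist it through the next AppendEntries message. Besides the entries to replicate, every AppendEntries message carries the Leader's latest commitIndex, and each Follower advances its own commitIndex, that is, persists its entries, based on it. If fewer than half of the nodes have appended the entry, the Leader neither advances its own commitIndex nor asks the other nodes to advance theirs.

As the Leader advances commitIndex, its state machine executes the commands in the log and returns the computed result to the client. Although in the figure above the result is computed only after all Followers have finished persisting, Raft actually allows a Follower to persist entries and apply them to its state machine at the same time. In other words, as soon as a node's commitIndex advances, lastApplied, which records the last entry applied to the state machine, can advance with it.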
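The majority rule for advancing commitIndex can be computed from the matchIndex values the Leader already tracks. Sort them, counting the Leader's own last log index as its match, and take the value that a majority of nodes has reached. The Raft paper adds one condition not mentioned above: by counting replicas, the Leader commits only entries from its current term. Below is a minimal sketch under those assumptions. `CommitIndexCalculator` is a hypothetical name.

```java
import java.util.Arrays;

// Hypothetical sketch: derive the Leader's commitIndex from all nodes' matchIndex values.
class CommitIndexCalculator {
    // matchIndexes: one value per node, the Leader's being its own last log index.
    // entryTerms[i - 1] is the term of log entry i.
    static int advanceCommitIndex(int[] matchIndexes, int[] entryTerms, int currentTerm, int commitIndex) {
        int[] sorted = matchIndexes.clone();
        Arrays.sort(sorted); // ascending
        int n = sorted.length;
        int majority = n / 2 + 1;
        // sorted[n - majority] is reached by at least `majority` nodes (itself and everything after it).
        int candidate = sorted[n - majority];
        if (candidate > commitIndex && entryTerms[candidate - 1] == currentTerm) {
            return candidate;
        }
        return commitIndex; // never move backwards; wait for a current-term entry to reach a majority
    }

    public static void main(String[] args) {
        int[] terms = {1, 1, 2, 2, 2}; // terms of entries 1..5
        System.out.println(advanceCommitIndex(new int[]{5, 4, 3, 2, 1}, terms, 2, 0)); // 3
        System.out.println(advanceCommitIndex(new int[]{5, 5, 1}, terms, 2, 0));       // 5
    }
}
```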
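Without the majority-append check, "split brain" or a network partition would cause serious inconsistency.
Take a system of 5 server nodes, A, B, C, D and E, as an example:
Initially A is the Leader and all other nodes are Followers.
At some point a network partition separates nodes A and B from nodes C, D and E. During the partition, A can communicate only with B.
B still receives A's heartbeats, so it does not become a Candidate.
C, D and E receive no heartbeats from A and hold an election; suppose C becomes the new Leader.
Clients connect to A and C and write to each. Because in this scenario a Leader does not wait for a majority append, A and C each add different log entries.
When the partition heals, the logs on A and B conflict with those on C, D and E and cannot be merged.
If instead the Leader advances commitIndex only after a majority has appended, A never persists its entries, because it can reach only 2 of the 5 nodes. Once the partition heals, the log on C, D and E can be correctly replicated to A and B, keeping the data consistent.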
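The scenario can be replayed in a few lines. This is a deliberately simplified simulation with three assumptions. Each Leader copies an entry to every node it can reach. A Leader commits only with a majority of acknowledgements. After the partition heals, the Leader with the higher term overwrites the other logs wholesale; real Raft instead repairs logs entry by entry through the AppendEntries consistency check. `PartitionDemo` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical simulation of the 5-node partition example with majority commit.
class PartitionDemo {
    static final List<String> NODES = List.of("A", "B", "C", "D", "E");

    final Map<String, List<String>> logs = new HashMap<>(); // entries written as "term:command"
    final Map<String, Integer> commitIndex = new HashMap<>();

    PartitionDemo() {
        for (String node : NODES) {
            logs.put(node, new ArrayList<>());
            commitIndex.put(node, 0);
        }
    }

    // The Leader copies the entry to every node it can reach and commits only with a majority.
    boolean write(String leader, Set<String> reachable, int term, String command) {
        String entry = term + ":" + command;
        for (String node : reachable) { // reachable includes the Leader itself
            logs.get(node).add(entry);
        }
        boolean majority = reachable.size() >= NODES.size() / 2 + 1;
        if (majority) {
            commitIndex.put(leader, logs.get(leader).size());
        }
        return majority;
    }

    // After the partition heals, the higher-term Leader's log replaces every other log (simplified).
    void healFrom(String leader) {
        for (String node : NODES) {
            logs.put(node, new ArrayList<>(logs.get(leader)));
        }
    }

    public static void main(String[] args) {
        PartitionDemo d = new PartitionDemo();
        System.out.println(d.write("A", Set.of("A", "B"), 1, "x=1"));      // false: 2 of 5 is no majority
        System.out.println(d.write("C", Set.of("C", "D", "E"), 2, "x=2")); // true: 3 of 5
        d.healFrom("C");
        System.out.println(d.logs); // every node now holds [2:x=2]
    }
}
```

A's uncommitted entry is discarded when the partition heals, but no committed data is lost, because A's commitIndex never advanced.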
Now let's design the classes.
To identify each node uniquely, every node has a node ID.
/**
 * Node ID
 */
@AllArgsConstructor
@Getter
public class NodeId implements Serializable {
    // The node's ID value; it never changes once assigned.
    // It can be as simple as A, B, C...
    @NonNull
    private final String value;

    public static NodeId of(String value) {
        return new NodeId(value);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        // instanceof is false for null, so no separate null check is needed
        if (!(o instanceof NodeId)) return false;
        NodeId nodeId = (NodeId) o;
        return value.equals(nodeId.value);
    }

    @Override
    public int hashCode() {
        return value.hashCode();
    }

    @Override
    public String toString() {
        return value;
    }
}
Cluster member table
/**
 * Cluster member table
 */
public class NodeGroup {
    // ID of the current node
    private final NodeId selfId;
    // Member map keyed by node ID
    private final Map<NodeId, GroupMember> memberMap;

    /**
     * Single-node constructor
     * @param endpoint the only node in the cluster
     */
    public NodeGroup(NodeEndpoint endpoint) {
        this(Collections.singleton(endpoint), endpoint.getId());
    }

    /**
     * Multi-node constructor
     * @param endpoints all nodes in the cluster
     * @param selfId    ID of the current node
     */
    public NodeGroup(Collection<NodeEndpoint> endpoints, NodeId selfId) {
        this.memberMap = buildMemberMap(endpoints);
        this.selfId = selfId;
    }

    /**
     * Build the member map from the endpoint list
     */
    private Map<NodeId, GroupMember> buildMemberMap(Collection<NodeEndpoint> endpoints) {
        Map<NodeId, GroupMember> map = new HashMap<>();
        endpoints.forEach(endpoint -> map.put(endpoint.getId(), new GroupMember(endpoint)));
        if (map.isEmpty()) {
            throw new IllegalArgumentException("endpoints is empty");
        }
        return map;
    }

    /**
     * Reset the replication progress of every node except this one
     * @param nextLogIndex index of the Leader's next log entry
     */
    public void resetReplicatingStates(int nextLogIndex) {
        memberMap.values().stream()
                .filter(member -> !member.idEquals(selfId))
                .forEach(member -> member.setReplicatingState(new ReplicatingState(nextLogIndex)));
    }

    /**
     * Find a member by node ID; returns null if there is none
     */
    public GroupMember getMember(NodeId id) {
        return memberMap.get(id);
    }

    /**
     * Find a member by node ID; throws if there is none
     */
    public GroupMember findMember(NodeId id) {
        GroupMember member = getMember(id);
        if (member == null) {
            throw new IllegalArgumentException("no such node " + id);
        }
        return member;
    }

    /**
     * Number of major members
     */
    public int getCountOfMajor() {
        return (int) memberMap.values().stream().filter(GroupMember::isMajor).count();
    }

    /**
     * Log replication targets: every node except this one
     */
    public Collection<GroupMember> listReplicationTarget() {
        return memberMap.values().stream()
                .filter(m -> !m.idEquals(selfId))
                .collect(Collectors.toList());
    }

    /**
     * Endpoints of every node except this one
     */
    public Set<NodeEndpoint> listEndpointExceptSelf() {
        return memberMap.values().stream()
                .filter(member -> !member.idEquals(selfId))
                .map(GroupMember::getEndpoint)
                .collect(Collectors.toSet());
    }
}
Cluster members
Server address
/**
 * Server address
 */
@AllArgsConstructor
@Getter
@ToString
public class Address {
    @NonNull
    private final String host; // IP address or host name
    private final int port;    // port
}
/**
 * Node endpoint: a node ID plus its address
 */
@AllArgsConstructor
@Getter
public class NodeEndpoint {
    @NonNull
    private final NodeId id;
    @NonNull
    private final Address address;

    public NodeEndpoint(NodeId id, String host, int port) {
        this(id, new Address(host, port));
    }
}
/**
 * Cluster member
 */
@AllArgsConstructor
public class GroupMember {
    // Endpoint of this member
    @Getter
    private final NodeEndpoint endpoint;
    // Replication progress, tracked by the Leader
    @Setter
    private ReplicatingState replicatingState;
    // Whether this is a major member
    @Getter
    @Setter
    private boolean major;

    public GroupMember(NodeEndpoint endpoint) {
        this(endpoint, null, true);
    }

    // Index of the next log entry to replicate to this member
    public int getNextIndex() {
        return ensureReplicatingState().getNextIndex();
    }

    // Index of the highest log entry known to match on this member
    public int getMatchIndex() {
        return ensureReplicatingState().getMatchIndex();
    }

    /**
     * Get the replication progress, failing if it has not been set yet
     */
    private ReplicatingState ensureReplicatingState() {
        if (replicatingState == null) {
            throw new IllegalStateException("replicating state not set");
        }
        return replicatingState;
    }

    /**
     * Whether this member has the given node ID
     */
    public boolean idEquals(NodeId id) {
        return endpoint.getId().equals(id);
    }
}
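/**
 * Log replication progress of one member
 */
@ToString
public class ReplicatingState {
    // Index of the next log entry to replicate
    @Getter
    private int nextIndex;
    // Index of the highest log entry known to match
    @Getter
    private int matchIndex;
    // Whether replication is in progress
    @Getter
    @Setter
    private boolean replicating = false;
    // Time of the last replication (timestamp)
    @Getter
    @Setter
    private long lastReplicatedAt = 0;

    public ReplicatingState(int nextIndex, int matchIndex) {
        this.nextIndex = nextIndex;
        this.matchIndex = matchIndex;
    }

    public ReplicatingState(int nextIndex) {
        this(nextIndex, 0);
    }

    /**
     * Back off nextIndex by one, e.g. after the Follower rejects an AppendEntries request
     * @return false if nextIndex is already 1 and cannot go lower
     */
    public boolean backOffNextIndex() {
        if (nextIndex > 1) {
            nextIndex--;
            return true;
        }
        return false;
    }

    /**
     * Record a successful replication up to lastEntryIndex
     * @param lastEntryIndex index of the last entry the Follower now holds
     * @return true if matchIndex or nextIndex changed
     */
    public boolean advice(int lastEntryIndex) {
        boolean result = matchIndex != lastEntryIndex || nextIndex != lastEntryIndex + 1;
        matchIndex = lastEntryIndex;
        nextIndex = lastEntryIndex + 1;
        return result;
    }
}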
Election implementation
Node roles
public enum RoleName {
    FOLLOWER,  // Follower
    CANDIDATE, // Candidate
    LEADER     // Leader
}
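Common abstract class for all node roles
@AllArgsConstructor
@Getter
public abstract class AbstractNodeRole {
    // Role name
    private final RoleName name;
    // Current term
    protected final int term;

    /**
     * Cancel the election timeout or log replication task owned by this role
     */
    public abstract void cancelTimeoutOrTask();

    /**
     * ID of the current Leader as seen by this role (may be null)
     */
    public abstract NodeId getLeaderId(NodeId selfId);
}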
Election timeout class
/**
 * Election timeout
 */
@RequiredArgsConstructor
public class ElectionTimeout {
    // No-op timeout, returned by the test scheduler
    public static final ElectionTimeout NONE = new ElectionTimeout(new NullScheduledFuture());

    // Handle of the scheduled task
    private final ScheduledFuture<?> scheduledFuture;

    /**
     * Cancel the election timeout
     */
    public void cancel() {
        scheduledFuture.cancel(false);
    }

    @Override
    public String toString() {
        if (scheduledFuture.isCancelled()) {
            return "ElectionTimeout(state=cancelled)";
        }
        if (scheduledFuture.isDone()) {
            return "ElectionTimeout(state=done)";
        }
        return "ElectionTimeout(delay=" + scheduledFuture.getDelay(TimeUnit.MILLISECONDS) + "ms)";
    }
}
Follower role
/**
 * Follower role
 */
@ToString
public class FollowerNodeRole extends AbstractNodeRole {
    // Node this Follower voted for
    @Getter
    private final NodeId votedFor;
    // Current Leader
    @Getter
    private final NodeId leaderId;
    // Election timeout
    private final ElectionTimeout electionTimeout;

    public FollowerNodeRole(int term, NodeId votedFor, NodeId leaderId, ElectionTimeout electionTimeout) {
        super(RoleName.FOLLOWER, term);
        this.votedFor = votedFor;
        this.leaderId = leaderId;
        this.electionTimeout = electionTimeout;
    }

    @Override
    public void cancelTimeoutOrTask() {
        electionTimeout.cancel();
    }

    @Override
    public NodeId getLeaderId(NodeId selfId) {
        return leaderId;
    }
}
Candidate role
/**
 * Candidate role
 */
@ToString
public class CandidateNodeRole extends AbstractNodeRole {
    // Votes received so far
    @Getter
    private final int votesCount;
    // Election timeout
    private final ElectionTimeout electionTimeout;

    /**
     * Construct with an arbitrary vote count
     */
    public CandidateNodeRole(int term, int votesCount, ElectionTimeout electionTimeout) {
        super(RoleName.CANDIDATE, term);
        this.votesCount = votesCount;
        this.electionTimeout = electionTimeout;
    }

    /**
     * Construct with one vote (the Candidate's own)
     */
    public CandidateNodeRole(int term, ElectionTimeout electionTimeout) {
        this(term, 1, electionTimeout);
    }

    /**
     * Add one vote: cancel the current election timeout and return a new role with the given one
     */
    public CandidateNodeRole increaseVotesCount(ElectionTimeout electionTimeout) {
        this.electionTimeout.cancel();
        return new CandidateNodeRole(term, votesCount + 1, electionTimeout);
    }

    @Override
    public void cancelTimeoutOrTask() {
        electionTimeout.cancel();
    }

    @Override
    public NodeId getLeaderId(NodeId selfId) {
        return null;
    }
}
Leader role
/**
 * Log replication task
 */
@Slf4j
@RequiredArgsConstructor
public class LogReplicationTask {
    // No-op task, returned by the test scheduler
    public static final LogReplicationTask NONE = new LogReplicationTask(new NullScheduledFuture());

    private final ScheduledFuture<?> scheduledFuture;

    public void cancel() {
        log.debug("cancel log replication task");
        scheduledFuture.cancel(false);
    }

    @Override
    public String toString() {
        return "LogReplicationTask{delay=" + scheduledFuture.getDelay(TimeUnit.MILLISECONDS) + "}";
    }
}

/**
 * Leader role
 */
@ToString
public class LeaderNodeRole extends AbstractNodeRole {
    // Log replication task
    private final LogReplicationTask logReplicationTask;

    public LeaderNodeRole(int term, LogReplicationTask logReplicationTask) {
        super(RoleName.LEADER, term);
        this.logReplicationTask = logReplicationTask;
    }

    @Override
    public void cancelTimeoutOrTask() {
        logReplicationTask.cancel();
    }

    @Override
    public NodeId getLeaderId(NodeId selfId) {
        return selfId;
    }
}
Scheduler interface
/**
 * Scheduler
 */
public interface Scheduler {
    /**
     * Schedule the periodic log replication task
     */
    LogReplicationTask scheduleLogReplicationTask(Runnable task);

    /**
     * Schedule an election timeout
     */
    ElectionTimeout scheduleElectionTimeout(Runnable task);

    /**
     * Stop the scheduler
     */
    void stop() throws InterruptedException;
}
Scheduler implementation
/**
 * Node configuration
 */
@Data
public class NodeConfig {
    // Minimum election timeout (ms)
    private int minElectionTimeout = 3000;
    // Maximum election timeout (ms)
    private int maxElectionTimeout = 4000;
    // Delay before the first log replication (ms)
    private int logReplicationDelay = 0;
    // Interval between log replications (ms)
    private int logReplicationInterval = 1000;
}

@Slf4j
public class DefaultScheduler implements Scheduler {
    // Minimum election timeout (ms)
    private final int minElectionTimeout;
    // Maximum election timeout (ms)
    private final int maxElectionTimeout;
    // Delay before the first log replication (ms)
    private final int logReplicationDelay;
    // Interval between log replications (ms)
    private final int logReplicationInterval;
    // Random source for election timeouts
    private final Random electionTimeoutRandom;
    // Single-threaded scheduled executor that runs all timers
    private final ScheduledExecutorService scheduledExecutorService;

    public DefaultScheduler(int minElectionTimeout, int maxElectionTimeout,
                            int logReplicationDelay, int logReplicationInterval) {
        if (minElectionTimeout <= 0 || maxElectionTimeout <= 0 || minElectionTimeout > maxElectionTimeout) {
            throw new IllegalArgumentException("election timeout should not be 0 or min > max");
        }
        if (logReplicationDelay < 0 || logReplicationInterval <= 0) {
            throw new IllegalArgumentException("log replication delay < 0 or log replication interval <= 0");
        }
        this.minElectionTimeout = minElectionTimeout;
        this.maxElectionTimeout = maxElectionTimeout;
        this.logReplicationDelay = logReplicationDelay;
        this.logReplicationInterval = logReplicationInterval;
        this.electionTimeoutRandom = new Random();
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "scheduler"));
    }

    public DefaultScheduler(NodeConfig config) {
        this(config.getMinElectionTimeout(), config.getMaxElectionTimeout(),
                config.getLogReplicationDelay(), config.getLogReplicationInterval());
    }

    @Override
    public LogReplicationTask scheduleLogReplicationTask(Runnable task) {
        ScheduledFuture<?> scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(
                task, logReplicationDelay, logReplicationInterval, TimeUnit.MILLISECONDS);
        return new LogReplicationTask(scheduledFuture);
    }

    @Override
    public ElectionTimeout scheduleElectionTimeout(Runnable task) {
        // Random timeout in [min, max); when min == max use min directly, since nextInt(0) would throw
        int range = maxElectionTimeout - minElectionTimeout;
        int timeout = range == 0 ? minElectionTimeout : electionTimeoutRandom.nextInt(range) + minElectionTimeout;
        ScheduledFuture<?> scheduledFuture = scheduledExecutorService.schedule(task, timeout, TimeUnit.MILLISECONDS);
        return new ElectionTimeout(scheduledFuture);
    }

    @Override
    public void stop() throws InterruptedException {
        log.debug("stop scheduler");
        scheduledExecutorService.shutdown();
        scheduledExecutorService.awaitTermination(1, TimeUnit.SECONDS);
    }
}
/**
 * No-op ScheduledFuture
 */
public class NullScheduledFuture implements ScheduledFuture<Object> {
    @Override
    public long getDelay(TimeUnit unit) {
        return 0;
    }

    @Override
    public int compareTo(Delayed o) {
        return 0;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        return false;
    }

    @Override
    public Object get() throws InterruptedException, ExecutionException {
        return null;
    }

    @Override
    public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return null;
    }
}
/**
 * Scheduler for tests: schedules nothing
 */
@Slf4j
public class NullScheduler implements Scheduler {
    @Override
    public LogReplicationTask scheduleLogReplicationTask(Runnable task) {
        log.debug("schedule log replication task");
        return LogReplicationTask.NONE;
    }

    @Override
    public ElectionTimeout scheduleElectionTimeout(Runnable task) {
        log.debug("schedule election timeout");
        return ElectionTimeout.NONE;
    }

    @Override
    public void stop() throws InterruptedException {
    }
}
For more on ScheduledExecutorService, see the author's notes on thread pools ("線程池整理").
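Messages exchanged between nodes
Election messages
/**
 * RequestVote request message
 */
@Data
public class RequestVoteRpc {
    // Candidate's term
    private int term;
    // Candidate node ID, normally the sender itself
    private NodeId candidateId;
    // Index of the candidate's last log entry
    private int lastLogIndex = 0;
    // Term of the candidate's last log entry
    private int lastLogTerm = 0;
}

/**
 * RequestVote response message
 */
@AllArgsConstructor
@Getter
@ToString
public class RequestVoteResult {
    // Term of the responding node
    private final int term;
    // Whether the vote was granted
    private final boolean voteGranted;
}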
Log replication messages
/**
 * AppendEntries request message
 */
@Data
public class AppendEntriesRpc {
    // Message ID, used to match the response to this request
    private String messageId;
    // Leader's term
    private int term;
    // Leader node ID
    private NodeId leaderId;
    // Index of the log entry immediately preceding the new ones
    private int prevLogIndex = 0;
    // Term of that preceding entry
    private int prevLogTerm;
    // Log entries to replicate (empty for a heartbeat)
    private List<Entry> entries = Collections.emptyList();
    // Leader's commitIndex
    private int leaderCommit;

    @Override
    public String toString() {
        return "AppendEntriesRpc{" +
                "term=" + term +
                ", leaderId=" + leaderId +
                ", prevLogIndex=" + prevLogIndex +
                ", prevLogTerm=" + prevLogTerm +
                ", entries.size=" + entries.size() +
                ", leaderCommit=" + leaderCommit +
                '}';
    }
}

/**
 * AppendEntries response message
 */
@AllArgsConstructor
@Getter
@ToString
public class AppendEntriesResult {
    // ID of the request message this result answers
    private final String rpcMessageId;
    // Term of the responding node
    private final int term;
    // Whether the entries were appended
    private final boolean success;
}
/**
 * Abstract RPC message
 * @param <T> type of the message body
 */
@AllArgsConstructor
@Getter
public abstract class AbstractRpcMessage<T> {
    // Message body
    private final T rpc;
    // ID of the sending node
    private final NodeId sourceNodeId;
    // Channel the message arrived on
    private final Channel channel;
}

/**
 * RequestVote message
 */
public class RequestVoteRpcMessage extends AbstractRpcMessage<RequestVoteRpc> {
    public RequestVoteRpcMessage(RequestVoteRpc rpc, NodeId sourceNodeId, Channel channel) {
        super(rpc, sourceNodeId, channel);
    }
}

/**
 * AppendEntries message
 */
public class AppendEntriesRpcMessage extends AbstractRpcMessage<AppendEntriesRpc> {
    public AppendEntriesRpcMessage(AppendEntriesRpc rpc, NodeId sourceNodeId, Channel channel) {
        super(rpc, sourceNodeId, channel);
    }
}

/**
 * AppendEntries response message
 */
@AllArgsConstructor
@Getter
public class AppendEntriesResultMessage {
    private final AppendEntriesResult result;
    private final NodeId sourceNodeId;
    // The request this result answers
    @NonNull
    private final AppendEntriesRpc rpc;
}
Connector interface
/**
 * Connector: sends messages to other nodes and replies to them
 */
public interface Connector {
    /**
     * Initialize
     */
    void initialize();

    /**
     * Send a RequestVote request
     * @param rpc                  RequestVote request
     * @param destinationEndpoints target nodes
     */
    void sendRequestVote(RequestVoteRpc rpc, Collection<NodeEndpoint> destinationEndpoints);

    /**
     * Reply to a RequestVote request
     * @param result              RequestVote response
     * @param destinationEndpoint target node
     */
    void replyRequestVote(RequestVoteResult result, NodeEndpoint destinationEndpoint);

    /**
     * Send an AppendEntries request
     * @param rpc                 AppendEntries request
     * @param destinationEndpoint target node
     */
    void sendAppendEntries(AppendEntriesRpc rpc, NodeEndpoint destinationEndpoint);

    /**
     * Reply to an AppendEntries request
     * @param result              AppendEntries response
     * @param destinationEndpoint target node
     */
    void replyAppendEntries(AppendEntriesResult result, NodeEndpoint destinationEndpoint);

    /**
     * Reset channels
     */
    void resetChannels();

    /**
     * Close the connector
     */
    void close();
}
Interface adapter
/**
 * Connector adapter: empty implementations of every method
 */
public abstract class ConnectorAdapter implements Connector {
    @Override
    public void initialize() {
    }

    @Override
    public void sendRequestVote(RequestVoteRpc rpc, Collection<NodeEndpoint> destinationEndpoints) {
    }

    @Override
    public void replyRequestVote(RequestVoteResult result, NodeEndpoint destinationEndpoint) {
    }

    @Override
    public void sendAppendEntries(AppendEntriesRpc rpc, NodeEndpoint destinationEndpoint) {
    }

    @Override
    public void replyAppendEntries(AppendEntriesResult result, NodeEndpoint destinationEndpoint) {
    }

    @Override
    public void resetChannels() {
    }

    @Override
    public void close() {
    }
}
Mock connector
/**
 * Mock connector: records outgoing messages instead of sending them
 */
public class MockConnector extends ConnectorAdapter {
    @ToString
    @Getter
    public static class Message {
        private Object rpc;
        private NodeId destinationNodeId;
        private Object result;
    }

    private final LinkedList<Message> messages = new LinkedList<>();

    @Override
    public void sendRequestVote(RequestVoteRpc rpc, Collection<NodeEndpoint> destinationEndpoints) {
        Message m = new Message();
        m.rpc = rpc;
        messages.add(m);
    }

    @Override
    public void replyRequestVote(RequestVoteResult result, NodeEndpoint destinationEndpoint) {
        Message m = new Message();
        m.result = result;
        m.destinationNodeId = destinationEndpoint.getId();
        messages.add(m);
    }

    @Override
    public void sendAppendEntries(AppendEntriesRpc rpc, NodeEndpoint destinationEndpoint) {
        Message m = new Message();
        m.rpc = rpc;
        m.destinationNodeId = destinationEndpoint.getId();
        messages.add(m);
    }

    @Override
    public void replyAppendEntries(AppendEntriesResult result, NodeEndpoint destinationEndpoint) {
        Message m = new Message();
        m.result = result;
        m.destinationNodeId = destinationEndpoint.getId();
        messages.add(m);
    }

    public Message getLastMessage() {
        return messages.isEmpty() ? null : messages.getLast();
    }

    private Message getLastMessageOrDefault() {
        return messages.isEmpty() ? new Message() : messages.getLast();
    }

    public Object getRpc() {
        return getLastMessageOrDefault().rpc;
    }

    public Object getResult() {
        return getLastMessageOrDefault().result;
    }

    public NodeId getDestinationNodeId() {
        return getLastMessageOrDefault().destinationNodeId;
    }

    public int getMessageCount() {
        return messages.size();
    }

    // Return a copy so callers cannot modify the recorded messages
    public List<Message> getMessages() {
        return new ArrayList<>(messages);
    }

    public void clearMessage() {
        messages.clear();
    }
}
Netty-based connector
/**
 * Message channel
 */
public interface Channel {
    /**
     * Write a RequestVote request
     */
    void writeRequestVoteRpc(RequestVoteRpc rpc);

    /**
     * Write a RequestVote response
     */
    void writeRequestVoteResult(RequestVoteResult result);

    /**
     * Write an AppendEntries request
     */
    void writeAppendEntriesRpc(AppendEntriesRpc rpc);

    /**
     * Write an AppendEntries response
     */
    void writeAppendEntriesResult(AppendEntriesResult result);

    /**
     * Close the channel
     */
    void close();
}

/**
 * Channel exception
 */
public class ChannelException extends RuntimeException {
    public ChannelException(Throwable cause) {
        super(cause);
    }

    public ChannelException(String message, Throwable cause) {
        super(message, cause);
    }
}

/**
 * Channel connection exception
 */
public class ChannelConnectException extends ChannelException {
    public ChannelConnectException(Throwable cause) {
        super(cause);
    }

    public ChannelConnectException(String message, Throwable cause) {
        super(message, cause);
    }
}

/**
 * Channel backed by a Netty channel
 */
@AllArgsConstructor
@Getter
public class NioChannel implements Channel {
    private final io.netty.channel.Channel nettyChannel;

    @Override
    public void writeRequestVoteRpc(RequestVoteRpc rpc) {
        nettyChannel.writeAndFlush(rpc);
    }

    @Override
    public void writeRequestVoteResult(RequestVoteResult result) {
        nettyChannel.writeAndFlush(result);
    }

    @Override
    public void writeAppendEntriesRpc(AppendEntriesRpc rpc) {
        nettyChannel.writeAndFlush(rpc);
    }

    @Override
    public void writeAppendEntriesResult(AppendEntriesResult result) {
        nettyChannel.writeAndFlush(result);
    }

    @Override
    public void close() {
        try {
            nettyChannel.close().sync();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new ChannelException("failed to close", e);
        }
    }
}
/**
 * Netty-based connector
 */
@Slf4j
public class NioConnector implements Connector {
    // Boss thread group (accepts incoming connections)
    private final NioEventLoopGroup bossNioEventLoopGroup = new NioEventLoopGroup(1);
    // Worker thread group
    private final NioEventLoopGroup workerNioEventLoopGroup;
    // Whether the worker group is shared between the Netty server and client sides
    private final boolean workerGroupShared;
    // Publish/subscribe message bus (observer pattern)
    private final EventBus eventBus;
    // Port to listen on
    private final int port;
    // Inbound channel group
    private final InboundChannelGroup inboundChannelGroup = new InboundChannelGroup();
    // Outbound channel group
    private final OutboundChannelGroup outboundChannelGroup;

    public NioConnector(NodeId selfNodeId, EventBus eventBus, int port) {
        this(new NioEventLoopGroup(), false, selfNodeId, eventBus, port);
    }

    public NioConnector(NioEventLoopGroup workerNioEventLoopGroup, NodeId selfNodeId, EventBus eventBus, int port) {
        this(workerNioEventLoopGroup, true, selfNodeId, eventBus, port);
    }

    public NioConnector(NioEventLoopGroup workerNioEventLoopGroup, boolean workerGroupShared,
                        NodeId selfNodeId, EventBus eventBus, int port) {
        this.workerNioEventLoopGroup = workerNioEventLoopGroup;
        this.workerGroupShared = workerGroupShared;
        this.eventBus = eventBus;
        this.port = port;
        this.outboundChannelGroup = new OutboundChannelGroup(workerNioEventLoopGroup, eventBus, selfNodeId);
    }

    /**
     * Start the Netty server
     */
    @Override
    public void initialize() {
        ServerBootstrap serverBootstrap = new ServerBootstrap()
                .group(bossNioEventLoopGroup, workerNioEventLoopGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new Decoder());
                        pipeline.addLast(new Encoder());
                        pipeline.addLast(new FromRemoteHandler(eventBus, inboundChannelGroup));
                    }
                });
        log.debug("node listen on port {}", port);
        try {
            serverBootstrap.bind(port).sync();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new ConnectorException("failed to bind port", e);
        }
    }

    @Override
    public void sendRequestVote(RequestVoteRpc rpc, Collection<NodeEndpoint> destinationEndpoints) {
        destinationEndpoints.forEach(endpoint -> {
            log.debug("send {} to node {}", rpc, endpoint.getId());
            try {
                getChannel(endpoint).writeRequestVoteRpc(rpc);
            } catch (Exception e) {
                logException(e);
            }
        });
    }

    private void logException(Exception e) {
        if (e instanceof ChannelConnectException) {
            log.warn(e.getMessage());
        } else {
            log.warn("failed to process channel", e);
        }
    }

    @Override
    public void replyRequestVote(RequestVoteResult result, NodeEndpoint destinationEndpoint) {
        log.debug("reply {} to node {}", result, destinationEndpoint.getId());
        try {
            getChannel(destinationEndpoint).writeRequestVoteResult(result);
        } catch (Exception e) {
            logException(e);
        }
    }

    @Override
    public void sendAppendEntries(AppendEntriesRpc rpc, NodeEndpoint destinationEndpoint) {
        log.debug("send {} to node {}", rpc, destinationEndpoint.getId());
        try {
            getChannel(destinationEndpoint).writeAppendEntriesRpc(rpc);
        } catch (Exception e) {
            logException(e);
        }
    }

    @Override
    public void replyAppendEntries(AppendEntriesResult result, NodeEndpoint destinationEndpoint) {
        log.debug("reply {} to node {}", result, destinationEndpoint.getId());
        try {
            getChannel(destinationEndpoint).writeAppendEntriesResult(result);
        } catch (Exception e) {
            logException(e);
        }
    }

    /**
     * Get the outbound (Netty client) channel to the endpoint, connecting first if necessary
     */
    private Channel getChannel(NodeEndpoint endpoint) {
        return outboundChannelGroup.getOrConnect(endpoint.getId(), endpoint.getAddress());
    }

    @Override
    public void resetChannels() {
        inboundChannelGroup.closeAll();
    }

    @Override
    public void close() {
        log.debug("close connector");
        inboundChannelGroup.closeAll();
        outboundChannelGroup.closeAll();
        bossNioEventLoopGroup.shutdownGracefully();
        if (!workerGroupShared) {
            workerNioEventLoopGroup.shutdownGracefully();
        }
    }
}
For background on Netty, see the author's posts "Netty整理", "Netty整理(二)" and "Netty整理(三)" (Netty notes, parts 1 to 3).
/**
 * Custom decoder
 */
public class Decoder extends ByteToMessageDecoder {
    // Factory that creates log entries
    private final EntryFactory entryFactory = new EntryFactory();

    /**
     * Decode one frame. A frame has at least 8 bytes: the first two ints are the message type
     * and the length of the remaining bytes, which hold the Protocol Buffers payload.
     * @param ctx channel handler context
     * @param in  read buffer
     * @param out objects deserialized from Protocol Buffers
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int availableBytes = in.readableBytes();
        if (availableBytes < 8) return;
        // Remember the read position so an incomplete frame can be rolled back
        in.markReaderIndex();
        int messageType = in.readInt();
        int payloadLength = in.readInt();
        if (in.readableBytes() < payloadLength) {
            in.resetReaderIndex();
            return;
        }
        byte[] payload = new byte[payloadLength];
        in.readBytes(payload);
        switch (messageType) {
            case MessageConstants.MSG_TYPE_NODE_ID:
                out.add(new NodeId(new String(payload)));
                break;
            case MessageConstants.MSG_TYPE_REQUEST_VOTE_RPC:
                Protos.RequestVoteRpc protoRVRpc = Protos.RequestVoteRpc.parseFrom(payload);
                RequestVoteRpc rpc = new RequestVoteRpc();
                rpc.setTerm(protoRVRpc.getTerm());
                rpc.setCandidateId(new NodeId(protoRVRpc.getCandidateId()));
                rpc.setLastLogIndex(protoRVRpc.getLastLogIndex());
                rpc.setLastLogTerm(protoRVRpc.getLastLogTerm());
                out.add(rpc);
                break;
            case MessageConstants.MSG_TYPE_REQUEST_VOTE_RESULT:
                Protos.RequestVoteResult protoRVResult = Protos.RequestVoteResult.parseFrom(payload);
                out.add(new RequestVoteResult(protoRVResult.getTerm(), protoRVResult.getVoteGranted()));
                break;
            case MessageConstants.MSG_TYPE_APPEND_ENTRIES_RPC:
                Protos.AppendEntriesRpc protoAERpc = Protos.AppendEntriesRpc.parseFrom(payload);
                AppendEntriesRpc aeRpc = new AppendEntriesRpc();
                aeRpc.setMessageId(protoAERpc.getMessageId());
                aeRpc.setTerm(protoAERpc.getTerm());
                aeRpc.setLeaderId(new NodeId(protoAERpc.getLeaderId()));
                aeRpc.setLeaderCommit(protoAERpc.getLeaderCommit());
                aeRpc.setPrevLogIndex(protoAERpc.getPrevLogIndex());
                aeRpc.setPrevLogTerm(protoAERpc.getPrevLogTerm());
                aeRpc.setEntries(protoAERpc.getEntriesList().stream()
                        .map(e -> entryFactory.create(e.getKind(), e.getIndex(), e.getTerm(),
                                e.getCommand().toByteArray()))
                        .collect(Collectors.toList()));
                out.add(aeRpc);
                break;
            case MessageConstants.MSG_TYPE_APPEND_ENTRIES_RESULT:
                Protos.AppendEntriesResult protoAEResult = Protos.AppendEntriesResult.parseFrom(payload);
                out.add(new AppendEntriesResult(protoAEResult.getRpcMessageId(),
                        protoAEResult.getTerm(), protoAEResult.getSuccess()));
                break;
        }
    }
}
/**
 * Custom encoder
 */
public class Encoder extends MessageToByteEncoder<Object> {

    /**
     * Serializes the message with Protocol Buffers and writes it into the ByteBuf
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        if (msg instanceof NodeId) {
            this.writeMessage(out, MessageConstants.MSG_TYPE_NODE_ID, ((NodeId) msg).getValue().getBytes());
        } else if (msg instanceof RequestVoteRpc) {
            RequestVoteRpc rpc = (RequestVoteRpc) msg;
            Protos.RequestVoteRpc protoRpc = Protos.RequestVoteRpc.newBuilder()
                    .setTerm(rpc.getTerm())
                    .setCandidateId(rpc.getCandidateId().getValue())
                    .setLastLogIndex(rpc.getLastLogIndex())
                    .setLastLogTerm(rpc.getLastLogTerm())
                    .build();
            this.writeMessage(out, MessageConstants.MSG_TYPE_REQUEST_VOTE_RPC, protoRpc);
        } else if (msg instanceof RequestVoteResult) {
            RequestVoteResult result = (RequestVoteResult) msg;
            Protos.RequestVoteResult protoResult = Protos.RequestVoteResult.newBuilder()
                    .setTerm(result.getTerm())
                    .setVoteGranted(result.isVoteGranted())
                    .build();
            this.writeMessage(out, MessageConstants.MSG_TYPE_REQUEST_VOTE_RESULT, protoResult);
        } else if (msg instanceof AppendEntriesRpc) {
            AppendEntriesRpc rpc = (AppendEntriesRpc) msg;
            Protos.AppendEntriesRpc protoRpc = Protos.AppendEntriesRpc.newBuilder()
                    .setMessageId(rpc.getMessageId())
                    .setTerm(rpc.getTerm())
                    .setLeaderId(rpc.getLeaderId().getValue())
                    .setLeaderCommit(rpc.getLeaderCommit())
                    .setPrevLogIndex(rpc.getPrevLogIndex())
                    .setPrevLogTerm(rpc.getPrevLogTerm())
                    .addAllEntries(
                            rpc.getEntries().stream().map(e -> Protos.AppendEntriesRpc.Entry.newBuilder()
                                    .setKind(e.getKind())
                                    .setIndex(e.getIndex())
                                    .setTerm(e.getTerm())
                                    .setCommand(ByteString.copyFrom(e.getCommandBytes()))
                                    .build()
                            ).collect(Collectors.toList())
                    ).build();
            this.writeMessage(out, MessageConstants.MSG_TYPE_APPEND_ENTRIES_RPC, protoRpc);
        } else if (msg instanceof AppendEntriesResult) {
            AppendEntriesResult result = (AppendEntriesResult) msg;
            Protos.AppendEntriesResult protoResult = Protos.AppendEntriesResult.newBuilder()
                    .setRpcMessageId(result.getRpcMessageId())
                    .setTerm(result.getTerm())
                    .setSuccess(result.isSuccess())
                    .build();
            this.writeMessage(out, MessageConstants.MSG_TYPE_APPEND_ENTRIES_RESULT, protoResult);
        }
    }

    private void writeMessage(ByteBuf out, int messageType, MessageLite message) throws IOException {
        ByteArrayOutputStream byteOutput = new ByteArrayOutputStream();
        message.writeTo(byteOutput);
        out.writeInt(messageType);
        this.writeBytes(out, byteOutput.toByteArray());
    }

    private void writeMessage(ByteBuf out, int messageType, byte[] bytes) {
        // frame layout: 4-byte message type + 4-byte payload length + variable-length payload
        out.writeInt(messageType);
        this.writeBytes(out, bytes);
    }

    private void writeBytes(ByteBuf out, byte[] bytes) {
        out.writeInt(bytes.length);
        out.writeBytes(bytes);
    }
}
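Every message the encoder writes is a frame made of a 4-byte message type, a 4-byte payload length and the payload bytes. A matching decoder has to read these back in the same order. The sketch below reproduces only that framing, using `java.nio.ByteBuffer` in place of Netty's `ByteBuf`. Both write ints in big-endian order. `FrameSketch` and the type value used in the example are hypothetical and are not the project's `MessageConstants`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of the [type:int][length:int][payload] frame written by Encoder.writeMessage
final class FrameSketch {

    // Encodes one frame; ByteBuffer writes ints big-endian, like Netty's ByteBuf.writeInt
    static byte[] encode(int messageType, byte[] payload) {
        ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + payload.length);
        buffer.putInt(messageType);
        buffer.putInt(payload.length);
        buffer.put(payload);
        return buffer.array();
    }

    // Decodes one complete frame; returns null when not enough bytes have arrived yet
    static Frame decode(ByteBuffer in) {
        if (in.remaining() < 8) {
            return null;
        }
        in.mark();
        int messageType = in.getInt();
        int length = in.getInt();
        if (in.remaining() < length) {
            in.reset(); // rewind so the header is read again once more bytes arrive
            return null;
        }
        byte[] payload = new byte[length];
        in.get(payload);
        return new Frame(messageType, payload);
    }

    static final class Frame {
        final int messageType;
        final byte[] payload;

        Frame(int messageType, byte[] payload) {
            this.messageType = messageType;
            this.payload = payload;
        }
    }

    public static void main(String[] args) {
        byte[] frame = encode(1, "A".getBytes(StandardCharsets.UTF_8));
        System.out.println(frame.length); // 9 = 4 (type) + 4 (length) + 1 (payload)
        Frame decoded = decode(ByteBuffer.wrap(frame));
        System.out.println(decoded.messageType + " " + new String(decoded.payload, StandardCharsets.UTF_8));
        // a truncated frame is not decoded
        System.out.println(decode(ByteBuffer.wrap(Arrays.copyOf(frame, 6))));
    }
}
```

Returning `null` for an incomplete frame mirrors what a Netty `ByteToMessageDecoder` does when it adds nothing to `out` and waits for more bytes.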
/**
 * Abstract message handler.
 * ChannelDuplexHandler implements both the ChannelInboundHandler
 * and the ChannelOutboundHandler interfaces.
 */
@Slf4j
@RequiredArgsConstructor
public abstract class AbstractHandler extends ChannelDuplexHandler {
    // publish/subscribe utility based on the observer pattern
    protected final EventBus eventBus;
    // ID of the remote node
    protected NodeId remoteId;
    // message channel
    protected Channel channel;
    // the most recent AppendEntries (log replication) request sent on this channel
    private AppendEntriesRpc lastAppendEntriesRpc;

    /**
     * Posts messages read from the channel to the EventBus
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // assertions only take effect when the JVM is started with -ea
        assert remoteId != null;
        assert channel != null;
        if (msg instanceof RequestVoteRpc) {
            RequestVoteRpc rpc = (RequestVoteRpc) msg;
            eventBus.post(new RequestVoteRpcMessage(rpc, remoteId, channel));
        } else if (msg instanceof RequestVoteResult) {
            eventBus.post(msg);
        } else if (msg instanceof AppendEntriesRpc) {
            AppendEntriesRpc rpc = (AppendEntriesRpc) msg;
            eventBus.post(new AppendEntriesRpcMessage(rpc, remoteId, channel));
        } else if (msg instanceof AppendEntriesResult) {
            AppendEntriesResult result = (AppendEntriesResult) msg;
            if (lastAppendEntriesRpc == null) {
                log.warn("no last append entries rpc");
            } else {
                if (!Objects.equals(result.getRpcMessageId(), lastAppendEntriesRpc.getMessageId())) {
                    log.warn("incorrect append entries rpc message id {}, expected {}",
                            result.getRpcMessageId(), lastAppendEntriesRpc.getMessageId());
                } else {
                    eventBus.post(new AppendEntriesResultMessage(result, remoteId, lastAppendEntriesRpc));
                    lastAppendEntriesRpc = null;
                }
            }
        }
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        // remember the outgoing AppendEntries request so that its result can be matched by message ID
        if (msg instanceof AppendEntriesRpc) {
            lastAppendEntriesRpc = (AppendEntriesRpc) msg;
        }
        super.write(ctx, msg, promise);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.warn(cause.getMessage(), cause);
        ctx.close();
    }
}
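AbstractHandler forwards an AppendEntriesResult only when its rpcMessageId equals the message ID of the last AppendEntriesRpc written on the same channel. A result for an older request is logged and dropped. The sketch below isolates that bookkeeping with String IDs. `ReplyMatcherSketch` and its methods are hypothetical names, and the real handler also posts the matched result to the EventBus.

```java
import java.util.Objects;

// Hypothetical per-channel tracker mirroring how AbstractHandler pairs results with requests
final class ReplyMatcherSketch {
    private String lastRequestId; // message ID of the last AppendEntries request written, or null

    // corresponds to write(): remember the outgoing request's message ID
    void onWrite(String requestId) {
        lastRequestId = requestId; // a newer request replaces an older unanswered one
    }

    // corresponds to channelRead(): returns true if the reply should be dispatched
    boolean onReply(String replyId) {
        if (lastRequestId == null) {
            return false; // nothing outstanding ("no last append entries rpc")
        }
        if (!Objects.equals(replyId, lastRequestId)) {
            return false; // reply to an older request, dropped with a warning in the original
        }
        lastRequestId = null; // each request is matched at most once
        return true;
    }

    public static void main(String[] args) {
        ReplyMatcherSketch matcher = new ReplyMatcherSketch();
        matcher.onWrite("m1");
        matcher.onWrite("m2");                     // m1 is superseded before its reply arrives
        System.out.println(matcher.onReply("m1")); // false
        System.out.println(matcher.onReply("m2")); // true
        System.out.println(matcher.onReply("m2")); // false: already matched
    }
}
```

Keeping only the last request means that, while one request is outstanding, replies to superseded requests are simply discarded. The next replication round resends whatever is still missing.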
/**
 * Server-side (inbound) message handler
 */
@Slf4j
public class FromRemoteHandler extends AbstractHandler {
    // group of inbound channels
    private final InboundChannelGroup channelGroup;

    public FromRemoteHandler(EventBus eventBus, InboundChannelGroup channelGroup) {
        super(eventBus);
        this.channelGroup = channelGroup;
    }

    /**
     * Overrides channelRead to handle the NodeId that the remote node sends first
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof NodeId) {
            remoteId = (NodeId) msg;
            NioChannel nioChannel = new NioChannel(ctx.channel());
            channel = nioChannel;
            channelGroup.add(remoteId, nioChannel);
            return;
        }
        log.debug("receive {} from {}", msg, remoteId);
        super.channelRead(ctx, msg);
    }
}
/**
 * Group of inbound channels
 */
@Slf4j
public class InboundChannelGroup {
    // all inbound channels
    private List<NioChannel> channels = new CopyOnWriteArrayList<>();

    /**
     * Registers a newly connected inbound channel
     */
    public void add(NodeId remoteId, NioChannel channel) {
        log.debug("channel INBOUND-{} connected", remoteId);
        // track the channel so that closeAll() can close it
        channels.add(channel);
        // closeFuture() returns a ChannelFuture, which represents an I/O operation that has not happened yet
        // (every Netty operation is asynchronous); the listener removes the channel once it has closed
        channel.getNettyChannel().closeFuture().addListener((ChannelFutureListener) future -> {
            log.debug("channel INBOUND-{} disconnected", remoteId);
            remove(channel);
        });
    }

    private void remove(NioChannel channel) {
        channels.remove(channel);
    }

    /**
     * Closes all inbound channels
     */
    public void closeAll() {
        log.debug("close all inbound channels");
        channels.forEach(NioChannel::close);
    }
}
/**
 * Group of outbound channels
 */
@Slf4j
@RequiredArgsConstructor
public class OutboundChannelGroup {
    // worker thread group
    private final EventLoopGroup workerGroup;
    // publish/subscribe utility based on the observer pattern
    private final EventBus eventBus;
    // ID of this node
    private final NodeId selfNodeId;
    // maps node IDs to the asynchronous result of connecting a Netty channel
    private Map<NodeId, Future<NioChannel>> channelMap = new ConcurrentHashMap<>();

    /**
     * Returns the Netty channel to the given node, connecting first if necessary
     */
    public NioChannel getOrConnect(NodeId nodeId, Address address) {
        Future<NioChannel> future = channelMap.get(nodeId);
        if (future == null) {
            FutureTask<NioChannel> newFuture = new FutureTask<>(() -> connect(nodeId, address));
            future = channelMap.putIfAbsent(nodeId, newFuture);
            // only the thread whose task was installed runs the connection
            if (future == null) {
                future = newFuture;
                newFuture.run();
            }
        }
        try {
            return future.get();
        } catch (Exception e) {
            channelMap.remove(nodeId);
            if (e instanceof ExecutionException) {
                Throwable cause = e.getCause();
                if (cause instanceof ConnectException) {
                    throw new ChannelConnectException("failed to get channel to node " + nodeId
                            + ", cause " + cause.getMessage(), cause);
                }
            }
            throw new ChannelException("failed to get channel to node " + nodeId, e);
        }
    }

    /**
     * Builds a client bootstrap and connects to the remote server
     */
    private NioChannel connect(NodeId nodeId, Address address) throws InterruptedException {
        Bootstrap bootstrap = new Bootstrap()
                .group(workerGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new Decoder());
                        pipeline.addLast(new Encoder());
                        pipeline.addLast(new ToRemoteHandler(eventBus, nodeId, selfNodeId));
                    }
                });
        ChannelFuture future = bootstrap.connect(address.getHost(), address.getPort()).sync();
        if (!future.isSuccess()) {
            throw new ChannelException("failed to connect", future.cause());
        }
        log.debug("channel OUTBOUND-{} connected", nodeId);
        Channel nettyChannel = future.channel();
        // remove the node's channel mapping when the connection closes
        nettyChannel.closeFuture().addListener((ChannelFutureListener) cf -> {
            log.debug("channel OUTBOUND-{} disconnected", nodeId);
            channelMap.remove(nodeId);
        });
        return new NioChannel(nettyChannel);
    }

    /**
     * Closes all outbound channels
     */
    public void closeAll() {
        log.debug("close all outbound channels");
        channelMap.forEach((nodeId, nioChannelFuture) -> {
            try {
                nioChannelFuture.get().close();
            } catch (Exception e) {
                log.warn("failed to close", e);
            }
        });
    }
}
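getOrConnect relies on a common idiom. A FutureTask is placed into a ConcurrentHashMap with putIfAbsent, only the thread whose task was installed runs it, and every caller waits on the same Future. On failure the entry is removed so that a later call can retry. The sketch below isolates that idiom without Netty. `ConnectOnceCache` and its connector function are hypothetical stand-ins for the channel map and `connect()`. Unlike the original, it uses `remove(key, future)` so that it never removes a newer entry.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical stand-in for OutboundChannelGroup's channelMap/getOrConnect idiom
final class ConnectOnceCache<K, V> {
    private final Map<K, Future<V>> map = new ConcurrentHashMap<>();
    private final Function<K, V> connector; // plays the role of connect(nodeId, address)

    ConnectOnceCache(Function<K, V> connector) {
        this.connector = connector;
    }

    V getOrConnect(K key) {
        Future<V> future = map.get(key);
        if (future == null) {
            FutureTask<V> newFuture = new FutureTask<>(() -> connector.apply(key));
            future = map.putIfAbsent(key, newFuture);
            if (future == null) {
                // this thread won the race, so it performs the single connection attempt
                future = newFuture;
                newFuture.run();
            }
        }
        try {
            // threads that lost the race block here until the winner's attempt completes
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        } catch (ExecutionException e) {
            // forget the failed attempt (only if it is still the mapped one) so a later call can retry
            map.remove(key, future);
            throw new IllegalStateException("failed to connect to " + key, e.getCause());
        }
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        ConnectOnceCache<String, String> cache =
                new ConnectOnceCache<>(node -> "channel-" + node + "-" + attempts.incrementAndGet());
        System.out.println(cache.getOrConnect("B")); // channel-B-1
        System.out.println(cache.getOrConnect("B")); // channel-B-1 (cached, no second attempt)
        System.out.println(attempts.get());          // 1
    }
}
```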
/**
 * Client-side (outbound) message handler
 */
@Slf4j
class ToRemoteHandler extends AbstractHandler {
    // ID of this node
    private final NodeId selfNodeId;

    ToRemoteHandler(EventBus eventBus, NodeId remoteId, NodeId selfNodeId) {
        super(eventBus);
        this.remoteId = remoteId;
        this.selfNodeId = selfNodeId;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // the first message on a new connection identifies this node to the server
        ctx.writeAndFlush(selfNodeId);
        channel = new NioChannel(ctx.channel());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.debug("receive {} from {}", msg, remoteId);
        super.channelRead(ctx, msg);
    }
}
Task execution
/**
 * Task executor
 */
public interface TaskExecutor {
    /** Submits a task */
    Future<?> submit(Runnable task);

    /** Submits a task that returns a value */
    <V> Future<V> submit(Callable<V> task);

    /** Shuts down the task executor */
    void shutdown() throws InterruptedException;
}
Asynchronous single-threaded task executor implementation
/**
 * Asynchronous single-threaded task executor
 */
public class SingleThreadTaskExecutor implements TaskExecutor {
    private final ExecutorService executorService;

    public SingleThreadTaskExecutor() {
        this(Executors.defaultThreadFactory());
    }

    public SingleThreadTaskExecutor(String name) {
        this(r -> new Thread(r, name));
    }

    public SingleThreadTaskExecutor(ThreadFactory threadFactory) {
        executorService = Executors.newSingleThreadExecutor(threadFactory);
    }

    @Override
    public Future<?> submit(Runnable task) {
        return executorService.submit(task);
    }

    @Override
    public <V> Future<V> submit(Callable<V> task) {
        return executorService.submit(task);
    }

    @Override
    public void shutdown() throws InterruptedException {
        executorService.shutdown();
        // wait up to one second for queued tasks to finish
        executorService.awaitTermination(1, TimeUnit.SECONDS);
    }
}
Direct task executor implementation
/**
 * Direct task executor: runs each task synchronously on the calling thread
 */
public class DirectTaskExecutor implements TaskExecutor {
    @Override
    public Future<?> submit(Runnable task) {
        FutureTask<?> futureTask = new FutureTask<>(task, null);
        futureTask.run();
        return futureTask;
    }

    @Override
    public <V> Future<V> submit(Callable<V> task) {
        FutureTask<V> futureTask = new FutureTask<>(task);
        futureTask.run();
        return futureTask;
    }

    @Override
    public void shutdown() throws InterruptedException {
    }
}
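The two executors differ in where a task runs. SingleThreadTaskExecutor queues every task onto one background thread, so node state is only touched by that thread. DirectTaskExecutor runs the task immediately on the caller's thread, which is why the unit tests later in this post can assert on the node's role right after calling a handler. The sketch below checks this with plain `java.util.concurrent` classes. `DirectSubmitSketch.submitDirect` is a hypothetical re-implementation of the direct executor's `submit`, not the project's class.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

final class DirectSubmitSketch {

    // Same idea as DirectTaskExecutor.submit: run the task right away on the calling thread
    static <V> Future<V> submitDirect(Callable<V> task) {
        FutureTask<V> futureTask = new FutureTask<>(task);
        futureTask.run();
        return futureTask; // already completed when returned
    }

    public static void main(String[] args) throws Exception {
        String caller = Thread.currentThread().getName();

        Future<String> direct = submitDirect(() -> Thread.currentThread().getName());
        System.out.println(direct.isDone());              // true
        System.out.println(direct.get().equals(caller));  // true: ran on the caller's thread

        // Same construction as new SingleThreadTaskExecutor("node")
        ExecutorService single = Executors.newSingleThreadExecutor(r -> new Thread(r, "node"));
        Future<String> queued = single.submit(() -> Thread.currentThread().getName());
        System.out.println(queued.get());                 // node
        single.shutdown();
        single.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```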
Persisting part of the role state
/**
 * Node store
 */
public interface NodeStore {
    /** Gets currentTerm */
    int getTerm();

    /** Sets currentTerm */
    void setTerm(int term);

    /** Gets votedFor */
    NodeId getVotedFor();

    /** Sets votedFor */
    void setVotedFor(NodeId votedFor);

    /** Closes the store's file */
    void close();
}
In-memory node store
/**
 * In-memory node store, convenient for tests
 */
@AllArgsConstructor
public class MemoryNodeStore implements NodeStore {
    private int term;
    private NodeId votedFor;

    public MemoryNodeStore() {
        this(0, null);
    }

    @Override
    public int getTerm() {
        return term;
    }

    @Override
    public void setTerm(int term) {
        this.term = term;
    }

    @Override
    public NodeId getVotedFor() {
        return votedFor;
    }

    @Override
    public void setVotedFor(NodeId votedFor) {
        this.votedFor = votedFor;
    }

    @Override
    public void close() {
    }
}
File-based node store
Node store exception
/**
 * Node store exception
 */
public class NodeStoreException extends RuntimeException {
    public NodeStoreException(Throwable cause) {
        super(cause);
    }

    public NodeStoreException(String message, Throwable cause) {
        super(message, cause);
    }
}
/**
 * Seekable file interface
 */
public interface SeekableFile {
    /** Gets the current position in the file */
    long position() throws IOException;

    /** Moves the file pointer to the given position */
    void seek(long position) throws IOException;

    /** Writes an int to the file */
    void writeInt(int i) throws IOException;

    /** Writes a long to the file */
    void writeLong(long l) throws IOException;

    /** Writes a byte array to the file */
    void write(byte[] b) throws IOException;

    /** Reads an int from the file */
    int readInt() throws IOException;

    /** Reads a long from the file */
    long readLong() throws IOException;

    /** Reads bytes into the given array and returns the number of bytes read */
    int read(byte[] b) throws IOException;

    /** Gets the length of the file */
    long size() throws IOException;

    /** Sets the length of the file */
    void truncate(long size) throws IOException;

    /** Gets an input stream that starts at the given position */
    InputStream inputStream(long start) throws IOException;

    /** Flushes pending writes */
    void flush() throws IOException;

    /** Closes the file */
    void close() throws IOException;
}
Seekable file interface: adapter implementation
public class RandomAccessFileAdapter implements SeekableFile {
    private final File file;
    private final RandomAccessFile randomAccessFile;

    public RandomAccessFileAdapter(File file) throws FileNotFoundException {
        this(file, "rw");
    }

    public RandomAccessFileAdapter(File file, String mode) throws FileNotFoundException {
        this.file = file;
        randomAccessFile = new RandomAccessFile(file, mode);
    }

    @Override
    public void seek(long position) throws IOException {
        randomAccessFile.seek(position);
    }

    @Override
    public void writeInt(int i) throws IOException {
        randomAccessFile.writeInt(i);
    }

    @Override
    public void writeLong(long l) throws IOException {
        randomAccessFile.writeLong(l);
    }

    @Override
    public void write(byte[] b) throws IOException {
        randomAccessFile.write(b);
    }

    @Override
    public int readInt() throws IOException {
        return randomAccessFile.readInt();
    }

    @Override
    public long readLong() throws IOException {
        return randomAccessFile.readLong();
    }

    @Override
    public int read(byte[] b) throws IOException {
        return randomAccessFile.read(b);
    }

    @Override
    public long size() throws IOException {
        return randomAccessFile.length();
    }

    @Override
    public void truncate(long size) throws IOException {
        randomAccessFile.setLength(size);
    }

    @Override
    public InputStream inputStream(long start) throws IOException {
        FileInputStream input = new FileInputStream(file);
        if (start > 0) {
            input.skip(start);
        }
        return input;
    }

    @Override
    public long position() throws IOException {
        return randomAccessFile.getFilePointer();
    }

    @Override
    public void flush() throws IOException {
        // RandomAccessFile has no user-space buffer, so there is nothing to flush here
    }

    @Override
    public void close() throws IOException {
        randomAccessFile.close();
    }
}
/**
 * File-based node store.
 * File layout: currentTerm (4 bytes), length of votedFor (4 bytes, 0 = no vote), votedFor bytes
 */
public class FileNodeStore implements NodeStore {
    // file name
    private static final String FILE_NAME = "node.bin";
    // position of currentTerm in the file
    private static final long OFFSET_TERM = 0;
    // position of the voted-for node ID (length prefix, then the bytes)
    private static final long OFFSET_VOTED_FOR = 4;
    // file access interface
    private final SeekableFile seekableFile;
    // currentTerm
    private int term = 0;
    // ID of the node voted for
    private NodeId votedFor = null;

    public FileNodeStore(File file) {
        try {
            if (!file.exists()) {
                Files.touch(file);
            }
            seekableFile = new RandomAccessFileAdapter(file);
            initializeOrLoad();
        } catch (IOException e) {
            throw new NodeStoreException(e);
        }
    }

    public FileNodeStore(SeekableFile seekableFile) {
        this.seekableFile = seekableFile;
        try {
            initializeOrLoad();
        } catch (IOException e) {
            throw new NodeStoreException(e);
        }
    }

    /**
     * Initializes the file, or loads term and votedFor from it
     */
    private void initializeOrLoad() throws IOException {
        if (seekableFile.size() == 0) {
            // new file: term = 0 and votedFor length = 0
            seekableFile.truncate(8);
            seekableFile.seek(0);
            seekableFile.writeInt(0);
            seekableFile.writeInt(0);
        } else {
            // existing file: read the term, then votedFor if one was recorded
            seekableFile.seek(0);
            term = seekableFile.readInt();
            int length = seekableFile.readInt();
            if (length > 0) {
                byte[] bytes = new byte[length];
                seekableFile.read(bytes);
                votedFor = new NodeId(new String(bytes));
            }
        }
    }

    @Override
    public int getTerm() {
        return term;
    }

    @Override
    public void setTerm(int term) {
        try {
            seekableFile.seek(OFFSET_TERM);
            seekableFile.writeInt(term);
        } catch (IOException e) {
            throw new NodeStoreException(e);
        }
        this.term = term;
    }

    @Override
    public NodeId getVotedFor() {
        return votedFor;
    }

    @Override
    public void setVotedFor(NodeId votedFor) {
        try {
            seekableFile.seek(OFFSET_VOTED_FOR);
            if (votedFor == null) {
                seekableFile.writeInt(0);
                seekableFile.truncate(8);
            } else {
                byte[] bytes = votedFor.getValue().getBytes();
                seekableFile.writeInt(bytes.length);
                seekableFile.write(bytes);
            }
        } catch (IOException e) {
            throw new NodeStoreException(e);
        }
        this.votedFor = votedFor;
    }

    @Override
    public void close() {
        try {
            seekableFile.close();
        } catch (IOException e) {
            throw new NodeStoreException(e);
        }
    }
}
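FileNodeStore keeps a small fixed header in `node.bin`. Bytes 0-3 hold currentTerm, bytes 4-7 hold the length of votedFor, and the votedFor bytes follow. A length of 0 means no vote. The following sketch encodes and decodes that layout in memory with `ByteBuffer`, which uses the same big-endian order as `RandomAccessFile.writeInt`. `NodeFileLayoutSketch` and its methods are hypothetical. Encoding votedFor as UTF-8 is an assumption; the original calls `getBytes()` with the platform default charset.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical in-memory model of the node.bin layout used by FileNodeStore
final class NodeFileLayoutSketch {
    static final int OFFSET_TERM = 0;       // currentTerm: 4-byte int
    static final int OFFSET_VOTED_FOR = 4;  // votedFor: 4-byte length, then the bytes

    static byte[] encode(int term, String votedFor) {
        byte[] id = votedFor == null ? new byte[0] : votedFor.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(8 + id.length);
        buffer.putInt(OFFSET_TERM, term);
        buffer.putInt(OFFSET_VOTED_FOR, id.length); // 0 means "has not voted"
        buffer.position(8);
        buffer.put(id);
        return buffer.array();
    }

    static int decodeTerm(byte[] file) {
        return ByteBuffer.wrap(file).getInt(OFFSET_TERM);
    }

    static String decodeVotedFor(byte[] file) {
        ByteBuffer buffer = ByteBuffer.wrap(file);
        int length = buffer.getInt(OFFSET_VOTED_FOR);
        if (length == 0) {
            return null;
        }
        byte[] id = new byte[length];
        buffer.position(8);
        buffer.get(id);
        return new String(id, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] file = encode(3, "B");
        System.out.println(file.length);            // 9
        System.out.println(decodeTerm(file));       // 3
        System.out.println(decodeVotedFor(file));   // B
        System.out.println(encode(0, null).length); // 8, the same size as truncate(8) in FileNodeStore
    }
}
```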
/**
 * File utilities
 */
public class Files {
    /**
     * Creates the file if it does not exist, otherwise updates its last-modified time (like Unix touch)
     */
    public static void touch(File file) throws IOException {
        if (!file.createNewFile() && !file.setLastModified(System.currentTimeMillis())) {
            throw new IOException("failed to touch file " + file);
        }
    }
}
Core election algorithm
/**
 * Node context
 */
@Data
@AllArgsConstructor
@Builder
public class NodeContext {
    // ID of this node
    private NodeId selfId;
    // cluster member group
    private NodeGroup group;
    // connector that sends and receives messages
    private Connector connector;
    // task scheduler (timers)
    private Scheduler scheduler;
    // publish/subscribe utility based on the observer pattern (com.google.common.eventbus.EventBus)
    private EventBus eventBus;
    // task executor
    private TaskExecutor taskExecutor;
    // log
    private Log log;
    // node store
    private NodeStore store;
}
/**
 * Consensus (core) component
 */
public interface Node {
    /** Starts the node */
    void start();

    /** Stops the node */
    void stop() throws InterruptedException;
}
Consensus (core) component implementation
/**
 * Consensus (core) component implementation
 */
@Slf4j
@RequiredArgsConstructor
public class NodeImpl implements Node {
    // context of the core component
    @Getter
    private final NodeContext context;
    // whether the node has been started
    private boolean started;
    // current role of this node
    @Getter
    private AbstractNodeRole role;

    @Override
    public synchronized void start() {
        if (started) {
            return;
        }
        // register this node with the EventBus
        context.getEventBus().register(this);
        context.getConnector().initialize();
        NodeStore store = context.getStore();
        // every node starts as a Follower, restoring term and votedFor from the store
        changeToRole(new FollowerNodeRole(store.getTerm(), store.getVotedFor(), null, scheduleElectionTimeout()));
        started = true;
    }

    @Override
    public synchronized void stop() throws InterruptedException {
        if (!started) {
            throw new IllegalStateException("node not started");
        }
        context.getScheduler().stop();
        context.getConnector().close();
        context.getTaskExecutor().shutdown();
        started = false;
    }

    /**
     * Changes the role of this node
     */
    private void changeToRole(AbstractNodeRole newRole) {
        log.debug("node {}, role state changed -> {}", context.getSelfId(), newRole);
        // persistent node store
        NodeStore store = context.getStore();
        // persist the term of the new role
        store.setTerm(newRole.getTerm());
        // if the new role is Follower, also persist the ID of the node it voted for
        if (newRole.getName() == RoleName.FOLLOWER) {
            store.setVotedFor(((FollowerNodeRole) newRole).getVotedFor());
        }
        // switch to the new role
        role = newRole;
    }

    /**
     * Schedules an election timeout
     */
    private ElectionTimeout scheduleElectionTimeout() {
        return context.getScheduler().scheduleElectionTimeout(this::electionTimeout);
    }

    /**
     * Called when the election timeout fires
     */
    public void electionTimeout() {
        context.getTaskExecutor().submit(this::doProcessElectionTimeout);
    }

    /**
     * Schedules the periodic log replication task
     */
    public LogReplicationTask scheduleLogReplicationTask() {
        return context.getScheduler().scheduleLogReplicationTask(this::replicateLog);
    }

    /**
     * Log replication task
     */
    public void replicateLog() {
        context.getTaskExecutor().submit(this::doReplicateLog);
    }

    /**
     * Performs log replication: sends AppendEntries messages to every node except this one
     */
    private void doReplicateLog() {
        log.debug("replicate log");
        context.getGroup().listReplicationTarget()
                .forEach(this::doMemberReplicateLog);
    }

    /**
     * Sends an AppendEntries message to one cluster member
     * (the log fields are still placeholders, so for now it acts as a heartbeat)
     */
    private void doMemberReplicateLog(GroupMember member) {
        AppendEntriesRpc rpc = new AppendEntriesRpc();
        rpc.setTerm(role.getTerm());
        rpc.setLeaderId(context.getSelfId());
        rpc.setPrevLogIndex(0);
        rpc.setPrevLogTerm(0);
        rpc.setLeaderCommit(0);
        context.getConnector().sendAppendEntries(rpc, member.getEndpoint());
    }

    /**
     * Handles an election timeout
     */
    private void doProcessElectionTimeout() {
        // a Leader does not start elections
        if (role.getName() == RoleName.LEADER) {
            log.warn("node {}, current role is leader, ignore election timeout", context.getSelfId());
            return;
        }
        // the new election uses the next term
        int newTerm = role.getTerm() + 1;
        role.cancelTimeoutOrTask();
        log.info("start election");
        // become a Candidate
        changeToRole(new CandidateNodeRole(newTerm, scheduleElectionTimeout()));
        // build the RequestVote message
        RequestVoteRpc rpc = new RequestVoteRpc();
        rpc.setTerm(newTerm);
        rpc.setCandidateId(context.getSelfId());
        rpc.setLastLogIndex(0);
        rpc.setLastLogTerm(0);
        // send it over the network to every node except this one
        context.getConnector().sendRequestVote(rpc, context.getGroup().listEndpointExceptSelf());
    }

    /**
     * Handles an incoming RequestVote request
     */
    @Subscribe
    public void onReceiveRequestVoteRpc(RequestVoteRpcMessage rpcMessage) {
        // process on the task executor, then reply to the candidate over the network
        context.getTaskExecutor().submit(() -> context.getConnector().replyRequestVote(
                doProcessRequestVoteRpc(rpcMessage),
                context.getGroup().getMember(rpcMessage.getSourceNodeId()).getEndpoint()
        ));
    }

    /**
     * Processes a RequestVote request
     */
    private RequestVoteResult doProcessRequestVoteRpc(RequestVoteRpcMessage rpcMessage) {
        // message body
        RequestVoteRpc rpc = rpcMessage.getRpc();
        // reject if the message's term is smaller than the current term
        if (rpc.getTerm() < role.getTerm()) {
            log.debug("term from rpc < current term, don't vote ({} < {})", rpc.getTerm(), role.getTerm());
            return new RequestVoteResult(role.getTerm(), false);
        }
        // whether to vote for the candidate (a placeholder until the log comparison is in place)
        boolean voteForCandidate = true;
        // the message's term is larger than the current term
        if (rpc.getTerm() > role.getTerm()) {
            // step down to Follower and vote
            becomeFollower(rpc.getTerm(), voteForCandidate ? rpc.getCandidateId() : null, null, true);
            return new RequestVoteResult(rpc.getTerm(), voteForCandidate);
        }
        // the terms are equal: decide based on the current role
        switch (role.getName()) {
            case FOLLOWER:
                FollowerNodeRole follower = (FollowerNodeRole) role;
                // the candidate this node has already voted for in this term, if any
                NodeId votedFor = follower.getVotedFor();
                // vote if this node has not voted yet, or has already voted for this same candidate
                if ((votedFor == null && voteForCandidate) || Objects.equals(votedFor, rpc.getCandidateId())) {
                    // stay Follower, record the vote and grant it
                    becomeFollower(role.getTerm(), rpc.getCandidateId(), null, true);
                    return new RequestVoteResult(rpc.getTerm(), true);
                }
                // otherwise reject
                return new RequestVoteResult(rpc.getTerm(), false);
            case CANDIDATE:
                // Candidates and Leaders do not vote for other nodes
            case LEADER:
                return new RequestVoteResult(role.getTerm(), false);
            default:
                throw new IllegalArgumentException("unexpected node role [" + role.getName() + "]");
        }
    }

    /**
     * Becomes a Follower
     * @param term                    current term
     * @param votedFor                ID of the node voted for
     * @param leaderId                ID of the Leader
     * @param scheduleElectionTimeout whether to schedule an election timeout
     */
    private void becomeFollower(int term, NodeId votedFor, NodeId leaderId, boolean scheduleElectionTimeout) {
        // cancel the current role's timeout or task
        role.cancelTimeoutOrTask();
        // log the Leader and term when a new Leader becomes known
        if (leaderId != null && !leaderId.equals(role.getLeaderId(context.getSelfId()))) {
            log.info("current leader is {}, term {}", leaderId, term);
        }
        // schedule a new election timeout if requested
        ElectionTimeout electionTimeout = scheduleElectionTimeout ? scheduleElectionTimeout() : ElectionTimeout.NONE;
        // switch to the Follower role
        changeToRole(new FollowerNodeRole(term, votedFor, leaderId, electionTimeout));
    }

    /**
     * Handles an incoming RequestVote result
     */
    @Subscribe
    public void onReceiveRequestVoteResult(RequestVoteResult result) {
        context.getTaskExecutor().submit(() -> processRequestVoteResult(result));
    }

    /**
     * Processes a RequestVote result asynchronously
     */
    private void processRequestVoteResult(RequestVoteResult result) {
        context.getTaskExecutor().submit(() -> doProcessRequestVoteResult(result));
    }

    /**
     * Processes a RequestVote result
     */
    private void doProcessRequestVoteResult(RequestVoteResult result) {
        // if the result's term is larger than the current term, step down to Follower
        if (result.getTerm() > role.getTerm()) {
            becomeFollower(result.getTerm(), null, null, true);
            return;
        }
        // ignore the result if this node is no longer a Candidate
        if (role.getName() != RoleName.CANDIDATE) {
            log.debug("receive request vote result and current role is not candidate, ignore");
            return;
        }
        // ignore rejected votes
        if (!result.isVoteGranted()) {
            return;
        }
        // add the granted vote to the Candidate's count
        int currentVotesCount = ((CandidateNodeRole) role).getVotesCount() + 1;
        // number of major (voting) members in the cluster
        int countOfMajor = context.getGroup().getCountOfMajor();
        log.debug("votes count {}, major node count {}", currentVotesCount, countOfMajor);
        // cancel the election timeout
        role.cancelTimeoutOrTask();
        // more than half of the votes wins the election
        if (currentVotesCount > countOfMajor / 2) {
            log.info("become leader, term {}", role.getTerm());
            // reset the replication progress of the other nodes
            resetReplicatingStates();
            // become Leader and start sending log replication (heartbeat) messages
            changeToRole(new LeaderNodeRole(role.getTerm(), scheduleLogReplicationTask()));
            // append the no-op entry that a new Leader writes first
            context.getLog().appendEntry(role.getTerm());
            // reset all inbound channels
            context.getConnector().resetChannels();
        } else {
            // no majority yet: remain Candidate with the updated count and a new election timeout
            changeToRole(new CandidateNodeRole(role.getTerm(), currentVotesCount, scheduleElectionTimeout()));
        }
    }

    /**
     * Resets the replication progress of the other nodes
     */
    private void resetReplicatingStates() {
        context.getGroup().resetReplicatingStates(context.getLog().getNextIndex());
    }

    /**
     * Handles an incoming AppendEntries (heartbeat) request
     */
    @Subscribe
    public void onReceiveAppendEntriesRpc(AppendEntriesRpcMessage rpcMessage) {
        context.getTaskExecutor().submit(() -> context.getConnector().replyAppendEntries(
                doProcessAppendEntriesRpc(rpcMessage),
                context.getGroup().getMember(rpcMessage.getSourceNodeId()).getEndpoint()));
    }

    /**
     * Processes an AppendEntries (heartbeat) request
     */
    private AppendEntriesResult doProcessAppendEntriesRpc(AppendEntriesRpcMessage rpcMessage) {
        AppendEntriesRpc rpc = rpcMessage.getRpc();
        // reject requests from a Leader with a stale term
        if (rpc.getTerm() < role.getTerm()) {
            return new AppendEntriesResult(rpc.getMessageId(), role.getTerm(), false);
        }
        // a newer term: become a Follower of the sender
        if (rpc.getTerm() > role.getTerm()) {
            becomeFollower(rpc.getTerm(), null, rpc.getLeaderId(), true);
            return new AppendEntriesResult(rpc.getMessageId(), rpc.getTerm(), appendEntries(rpc));
        }
        assert rpc.getTerm() == role.getTerm();
        switch (role.getName()) {
            case FOLLOWER:
                // keep the existing vote, record the Leader and reset the election timeout
                becomeFollower(rpc.getTerm(), ((FollowerNodeRole) role).getVotedFor(), rpc.getLeaderId(), true);
                return new AppendEntriesResult(rpc.getMessageId(), rpc.getTerm(), appendEntries(rpc));
            case CANDIDATE:
                // another node has already won this term's election: step down
                becomeFollower(rpc.getTerm(), null, rpc.getLeaderId(), true);
                return new AppendEntriesResult(rpc.getMessageId(), rpc.getTerm(), appendEntries(rpc));
            case LEADER:
                log.warn("receive append entries rpc from another leader {}, ignore", rpc.getLeaderId());
                return new AppendEntriesResult(rpc.getMessageId(), rpc.getTerm(), false);
            default:
                throw new IllegalArgumentException("unexpected node role [" + role.getName() + "]");
        }
    }

    /**
     * Appends log entries (a placeholder that always succeeds for now)
     */
    private boolean appendEntries(AppendEntriesRpc rpc) {
        return true;
    }

    /**
     * Handles a response to an AppendEntries request
     */
    @Subscribe
    public void onReceiveAppendEntriesResult(AppendEntriesResultMessage resultMessage) {
        context.getTaskExecutor().submit(() -> doProcessAppendEntriesResult(resultMessage));
    }

    /**
     * Processes a response to an AppendEntries request
     */
    private void doProcessAppendEntriesResult(AppendEntriesResultMessage resultMessage) {
        AppendEntriesResult result = resultMessage.getResult();
        // a newer term in the response means this node must step down
        if (result.getTerm() > role.getTerm()) {
            becomeFollower(result.getTerm(), null, null, true);
            return;
        }
        if (role.getName() != RoleName.LEADER) {
            log.warn("receive append entries result from node {} but current node is not leader, ignore",
                    resultMessage.getSourceNodeId());
        }
    }
}
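Without roles, timers and networking, the vote handling in doProcessRequestVoteRpc and the majority check in doProcessRequestVoteResult reduce to two small pure functions. The sketch below restates them under hypothetical names (`VoteRulesSketch`, `shouldGrantVote`, `wonElection`). The role is reduced to a boolean `isFollower`, and `candidateLogOk` stands for the log comparison that the code above still hard-codes as `true`.

```java
// Hypothetical restatement of NodeImpl's voting rules as pure functions
final class VoteRulesSketch {

    /**
     * Mirrors doProcessRequestVoteRpc: returns whether the vote is granted.
     * votedFor is the candidate this node already voted for in currentTerm (null if none).
     */
    static boolean shouldGrantVote(int currentTerm, String votedFor, boolean isFollower,
                                   int rpcTerm, String candidateId, boolean candidateLogOk) {
        if (rpcTerm < currentTerm) {
            return false;          // stale candidate
        }
        if (rpcTerm > currentTerm) {
            return candidateLogOk; // newer term: step down, vote if the log check passes
        }
        if (!isFollower) {
            return false;          // same-term Candidates and Leaders never vote for others
        }
        // same term: vote at most once, but repeat a vote already given to this candidate
        return (votedFor == null && candidateLogOk) || candidateId.equals(votedFor);
    }

    /** Mirrors doProcessRequestVoteResult: votesCount includes the candidate's own vote. */
    static boolean wonElection(int votesCount, int countOfMajor) {
        return votesCount > countOfMajor / 2;
    }

    public static void main(String[] args) {
        System.out.println(shouldGrantVote(1, null, true, 1, "A", true)); // true: first vote in term 1
        System.out.println(shouldGrantVote(1, "B", true, 1, "A", true));  // false: already voted for B
        System.out.println(shouldGrantVote(2, null, true, 1, "A", true)); // false: stale term
        System.out.println(wonElection(2, 3));                            // true: 2 of 3
        System.out.println(wonElection(2, 4));                            // false: 2 of 4 is not a majority
    }
}
```

The integer division in `votesCount > countOfMajor / 2` is what turns "more than half" into a strict majority for both odd and even cluster sizes.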
Consensus (core) component builder
/**
 * Builder for the consensus (core) component
 */
public class NodeBuilder {
    private final NodeGroup group;
    private final NodeId selfId;
    private final EventBus eventBus;
    private Scheduler scheduler = null;
    private Connector connector = null;
    private TaskExecutor taskExecutor = null;
    private NodeStore nodeStore = null;
    private Log log = null;
    private NodeConfig config = new NodeConfig();

    public NodeBuilder(Collection<NodeEndpoint> endpoints, NodeId selfId) {
        group = new NodeGroup(endpoints, selfId);
        this.selfId = selfId;
        eventBus = new EventBus(selfId.getValue());
    }

    public NodeBuilder(NodeEndpoint endpoint) {
        this(Collections.singletonList(endpoint), endpoint.getId());
    }

    /** Sets the communication component */
    public NodeBuilder setConnector(Connector connector) {
        this.connector = connector;
        return this;
    }

    /** Sets the scheduler */
    public NodeBuilder setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
        return this;
    }

    /** Sets the task executor */
    public NodeBuilder setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
        return this;
    }

    /** Sets the node store */
    public NodeBuilder setNodeStore(NodeStore nodeStore) {
        this.nodeStore = nodeStore;
        return this;
    }

    /** Sets the log */
    public NodeBuilder setLog(Log log) {
        this.log = log;
        return this;
    }

    /** Builds the Node instance */
    public Node build() {
        return new NodeImpl(buildContext());
    }

    /**
     * Builds the context; components that were not set fall back to defaults
     * (the Connector has no default and must be set explicitly)
     */
    private NodeContext buildContext() {
        return NodeContext.builder()
                .group(group)
                .selfId(selfId)
                .eventBus(eventBus)
                .scheduler(scheduler != null ? scheduler : new DefaultScheduler(config))
                .connector(connector)
                .taskExecutor(taskExecutor != null ? taskExecutor : new SingleThreadTaskExecutor("node"))
                .store(nodeStore != null ? nodeStore : new FileNodeStore(new File("./example/node.bin")))
                .log(log != null ? log : new MemoryLog())
                .build();
    }
}
The Log component is covered later.
Unit tests
public class NodeImplTest {

    private NodeBuilder newNodeBuilder(NodeId selfId, NodeEndpoint... endpoints) {
        return new NodeBuilder(Arrays.asList(endpoints), selfId)
                .setScheduler(new NullScheduler())
                .setConnector(new MockConnector())
                .setTaskExecutor(new DirectTaskExecutor())
                .setNodeStore(new MemoryNodeStore());
    }

    /**
     * Start-up
     */
    @Test
    public void testStart() {
        NodeImpl node = (NodeImpl) newNodeBuilder(NodeId.of("A"),
                new NodeEndpoint(new NodeId("A"), "localhost", 2333))
                .build();
        node.start();
        FollowerNodeRole role = (FollowerNodeRole) node.getRole();
        assertEquals(0, role.getTerm());
        assertNull(role.getVotedFor());
    }

    /**
     * Receiving a RequestVote result
     */
    @Test
    public void testOnReceiveRequestVoteResult() {
        NodeImpl node = (NodeImpl) newNodeBuilder(NodeId.of("A"),
                new NodeEndpoint(new NodeId("A"), "localhost", 2333),
                new NodeEndpoint(new NodeId("B"), "localhost", 2334),
                new NodeEndpoint(new NodeId("C"), "localhost", 2335))
                .build();
        node.start();
        node.electionTimeout();
        // one granted vote plus A's own vote is a majority of 3
        node.onReceiveRequestVoteResult(new RequestVoteResult(1, true));
        LeaderNodeRole role = (LeaderNodeRole) node.getRole();
        assertEquals(1, role.getTerm());
    }

    /**
     * Log replication
     */
    @Test
    public void testReplicateLog() {
        NodeImpl node = (NodeImpl) newNodeBuilder(NodeId.of("A"),
                new NodeEndpoint(new NodeId("A"), "localhost", 2333),
                new NodeEndpoint(new NodeId("B"), "localhost", 2334),
                new NodeEndpoint(new NodeId("C"), "localhost", 2335))
                .build();
        node.start();
        node.electionTimeout();
        node.onReceiveRequestVoteResult(new RequestVoteResult(1, true));
        node.replicateLog();
        MockConnector mockConnector = (MockConnector) node.getContext().getConnector();
        // the first recorded message is the RequestVote; the other two are the AppendEntries requests
        assertEquals(3, mockConnector.getMessageCount());
        List<MockConnector.Message> messages = mockConnector.getMessages();
        Set<NodeId> destinationNodeIds = messages.subList(1, 3).stream()
                .map(MockConnector.Message::getDestinationNodeId)
                .collect(Collectors.toSet());
        assertEquals(2, destinationNodeIds.size());
        assertTrue(destinationNodeIds.contains(NodeId.of("B")));
        assertTrue(destinationNodeIds.contains(NodeId.of("C")));
        AppendEntriesRpc rpc = (AppendEntriesRpc) messages.get(2).getRpc();
        assertEquals(1, rpc.getTerm());
    }

    /**
     * Handling an AppendEntries request as a Follower
     */
    @Test
    public void testOnReceiveAppendEntriesRpcFollower() {
        NodeImpl node = (NodeImpl) newNodeBuilder(NodeId.of("A"),
                new NodeEndpoint(new NodeId("A"), "localhost", 2333),
                new NodeEndpoint(new NodeId("B"), "localhost", 2334),
                new NodeEndpoint(new NodeId("C"), "localhost", 2335))
                .build();
        node.start();
        AppendEntriesRpc rpc = new AppendEntriesRpc();
        rpc.setTerm(1);
        rpc.setLeaderId(NodeId.of("B"));
        node.onReceiveAppendEntriesRpc(new AppendEntriesRpcMessage(rpc, NodeId.of("B"), null));
        MockConnector connector = (MockConnector) node.getContext().getConnector();
        AppendEntriesResult result = (AppendEntriesResult) connector.getResult();
        assertEquals(1, result.getTerm());
        assertTrue(result.isSuccess());
        FollowerNodeRole role = (FollowerNodeRole) node.getRole();
        assertEquals(1, role.getTerm());
        assertEquals(NodeId.of("B"), role.getLeaderId());
    }

    /**
     * Handling a response to an AppendEntries request
     */
    @Test
    public void testOnReceiveAppendEntriesNormal() {
        NodeImpl node = (NodeImpl) newNodeBuilder(NodeId.of("A"),
                new NodeEndpoint(new NodeId("A"), "localhost", 2333),
                new NodeEndpoint(new NodeId("B"), "localhost", 2334),
                new NodeEndpoint(new NodeId("C"), "localhost", 2335))
                .build();
        node.start();
        node.electionTimeout();
        node.onReceiveRequestVoteResult(new RequestVoteResult(1, true));
        node.replicateLog();
        node.onReceiveAppendEntriesResult(new AppendEntriesResultMessage(
                new AppendEntriesResult("", 1, true), NodeId.of("B"), new AppendEntriesRpc()));
    }
}
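Log implementation
The log has always been a fundamental building block of distributed consensus algorithms, whether in Paxos (the algorithm Raft is usually compared with) or in its many variants. The log these algorithms need differs from an ordinary database WAL (Write-Ahead Log), which is append-only: an entry written at run time may later be discarded, or overwritten, because it conflicts with the Leader's log. The log does not care what the upper-layer service is, and what it stores is independent of that service. A service request only needs to be converted into a generic storage format, for example serialized into binary, before it is stored.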
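The difference from an append-only WAL comes from the rule a Follower applies to entries from the Leader. First, its log must contain an entry at prevLogIndex whose term is prevLogTerm. Then, if an existing entry at the same index as a new one has a different term, that entry and everything after it are deleted, and any entries not already present are appended. The sketch below applies that rule, as described in the Raft paper, to an in-memory list of terms. `FollowerLogSketch` and its methods are hypothetical. Log indices start at 1, and index 0 with term 0 stands for "before the first entry".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory follower log; list position i holds the term of log index i + 1
final class FollowerLogSketch {
    private final List<Integer> terms = new ArrayList<>();

    FollowerLogSketch(int... initialTerms) {
        for (int t : initialTerms) {
            terms.add(t);
        }
    }

    private int termAt(int index) {
        return index == 0 ? 0 : terms.get(index - 1);
    }

    /** Applies entries (given by their terms) that start right after prevLogIndex. */
    boolean appendEntriesFromLeader(int prevLogIndex, int prevLogTerm, List<Integer> entryTerms) {
        // consistency check: the entry before the new ones must exist and match the Leader's term
        if (prevLogIndex > terms.size() || termAt(prevLogIndex) != prevLogTerm) {
            return false;
        }
        for (int i = 0; i < entryTerms.size(); i++) {
            int index = prevLogIndex + 1 + i;
            int term = entryTerms.get(i);
            if (index <= terms.size()) {
                if (termAt(index) == term) {
                    continue; // already present, keep it
                }
                // conflict: discard this entry and everything after it
                terms.subList(index - 1, terms.size()).clear();
            }
            terms.add(term);
        }
        return true;
    }

    List<Integer> terms() {
        return new ArrayList<>(terms);
    }

    public static void main(String[] args) {
        FollowerLogSketch log = new FollowerLogSketch(1, 1, 2, 2); // entries 3 and 4 come from an old term 2
        System.out.println(log.appendEntriesFromLeader(2, 1, Arrays.asList(3))); // true
        System.out.println(log.terms());                                        // [1, 1, 3]
        System.out.println(log.appendEntriesFromLeader(5, 3, Arrays.asList(3))); // false: no entry at index 5
    }
}
```

Entries that already match are kept instead of being rewritten. As a result, a delayed or duplicated AppendEntries request cannot truncate entries that a newer request has already added.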
Log entry interface
/**
 * Log entry
 */
public interface Entry {
    // log entry kinds
    int KIND_NO_OP = 0;   // the first, empty entry appended by a newly elected Leader
    int KIND_GENERAL = 1; // general entry produced by the upper-layer service

    /** Gets the kind */
    int getKind();

    /** Gets the index */
    int getIndex();

    /** Gets the term */
    int getTerm();

    /** Gets the meta information (kind, term and index) */
    EntryMeta getMeta();

    /** Gets the payload */
    byte[] getCommandBytes();
}
/**
 * Meta information of a log entry
 */
@AllArgsConstructor
@Getter
public class EntryMeta {
    private final int kind;
    private final int index;
    private final int term;
}
Abstract log entry
/**
 * Abstract log entry
 */
@AllArgsConstructor
public abstract class AbstractEntry implements Entry {
    // entry kind
    private final int kind;
    // entry index
    protected final int index;
    // term in which the entry was created
    protected final int term;

    @Override
    public int getKind() {
        return this.kind;
    }

    @Override
    public int getIndex() {
        return index;
    }

    @Override
    public int getTerm() {
        return term;
    }

    @Override
    public EntryMeta getMeta() {
        return new EntryMeta(kind, index, term);
    }
}
General log entry
/**
 * General log entry
 */
public class GeneralEntry extends AbstractEntry {
    // entry payload
    private final byte[] commandBytes;

    public GeneralEntry(int index, int term, byte[] commandBytes) {
        super(KIND_GENERAL, index, term);
        this.commandBytes = commandBytes;
    }

    @Override
    public byte[] getCommandBytes() {
        return this.commandBytes;
    }

    @Override
    public String toString() {
        return "GeneralEntry{" + "index=" + index + ", term=" + term + '}';
    }
}
No-op log entry
/**
 * No-op (empty) log entry
 */
public class NoOpEntry extends AbstractEntry {
    public NoOpEntry(int index, int term) {
        super(KIND_NO_OP, index, term);
    }

    @Override
    public byte[] getCommandBytes() {
        return new byte[0];
    }

    @Override
    public String toString() {
        return "NoOpEntry{" + "index=" + index + ", term=" + term + '}';
    }
}
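Log
Log interface
import java.util.List;

/**
 * Log
 */
public interface Log {
    // pass as maxEntries to include all remaining entries
    int ALL_ENTRIES = -1;

    /**
     * Returns the metadata of the last entry.
     * Typically used when starting an election and when sending messages.
     */
    EntryMeta getLastEntryMeta();

    /**
     * Creates an AppendEntries message.
     * Used when the Leader sends a log replication message to a Follower.
     *
     * @param term       current term
     * @param selfId     id of this node
     * @param nextIndex  index of the next entry to send
     * @param maxEntries maximum number of entries, or ALL_ENTRIES
     */
    AppendEntriesRpc createAppendEntriesRpc(int term, NodeId selfId, int nextIndex, int maxEntries);

    /** Returns the index of the next entry to be appended. */
    int getNextIndex();

    /** Returns the current commitIndex. */
    int getCommitIndex();

    /**
     * Checks whether this log is newer than the log described by lastLogIndex and
     * lastLogTerm (the candidate's), in which case the vote should be refused.
     *
     * @param lastLogIndex index of the candidate's last entry
     * @param lastLogTerm  term of the candidate's last entry
     */
    boolean isNewerThan(int lastLogIndex, int lastLogTerm);

    /**
     * Appends a no-op entry: the first (empty) entry written after this node becomes Leader.
     *
     * @param term current term
     */
    NoOpEntry appendEntry(int term);

    /**
     * Appends a general entry carrying an upper-layer service command.
     *
     * @param term    current term
     * @param command serialized command
     */
    GeneralEntry appendEntry(int term, byte[] command);

    /**
     * Appends entries received from the Leader.
     * Called when a log replication request arrives from the Leader.
     *
     * @param prevLogIndex index of the entry preceding the new entries
     * @param prevLogTerm  term of the entry preceding the new entries
     * @param entries      entries to append
     * @return true if successful, false if the previous-log check failed
     */
    boolean appendEntriesFromLeader(int prevLogIndex, int prevLogTerm, List<Entry> entries);

    /**
     * Advances commitIndex.
     * Called when a log replication request arrives from the Leader.
     *
     * @param newCommitIndex new commitIndex
     * @param currentTerm    current term
     */
    void advanceCommitIndex(int newCommitIndex, int currentTerm);

    /** Closes the log. */
    void close();
}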
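The `nextIndex`/`maxEntries` contract of `createAppendEntriesRpc` comes down to a little index arithmetic. The sketch below uses a hypothetical standalone helper, `AppendWindow`, which is not part of the article's code. It mirrors what `AbstractLog` computes:

- `prevLogIndex` is `nextIndex - 1`.
- The entries come from the half-open range `[nextIndex, min(nextLogIndex, nextIndex + maxEntries))`.
- With `ALL_ENTRIES`, the range runs up to `nextLogIndex`.

```java
// Hypothetical helper mirroring the batch window computed by createAppendEntriesRpc.
final class AppendWindow {
    static final int ALL_ENTRIES = -1;

    final int prevLogIndex; // entry the follower must already hold
    final int fromIndex;    // first entry to send (inclusive)
    final int toIndex;      // end of the batch (exclusive)

    private AppendWindow(int prevLogIndex, int fromIndex, int toIndex) {
        this.prevLogIndex = prevLogIndex;
        this.fromIndex = fromIndex;
        this.toIndex = toIndex;
    }

    static AppendWindow of(int nextIndex, int nextLogIndex, int maxEntries) {
        if (nextIndex > nextLogIndex) {
            throw new IllegalArgumentException("illegal next index " + nextIndex);
        }
        int to = (maxEntries == ALL_ENTRIES)
                ? nextLogIndex
                : Math.min(nextLogIndex, nextIndex + maxEntries);
        return new AppendWindow(nextIndex - 1, nextIndex, to);
    }

    // Number of entries in the batch; 0 means the message only carries leaderCommit.
    int size() {
        return toIndex - fromIndex;
    }
}
```

When `nextIndex` equals `nextLogIndex`, the window is empty, so the same message serves as a heartbeat.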
Log entry sequence
import java.util.List;

/**
 * Log entry sequence
 */
public interface EntrySequence {
    /** Whether the sequence is empty. */
    boolean isEmpty();

    /** Index of the first entry. */
    int getFirstLogIndex();

    /** Index of the last entry. */
    int getLastLogIndex();

    /** Index of the next entry to be appended. */
    int getNextLogIndex();

    /** Sub-view from fromIndex through the last entry. */
    List<Entry> subView(int fromIndex);

    /** Sub-view of the range [fromIndex, toIndex). */
    List<Entry> subList(int fromIndex, int toIndex);

    /** Whether the entry at index exists. */
    boolean isEntryPresent(int index);

    /** Metadata of the entry at index. */
    EntryMeta getEntryMeta(int index);

    /** Entry at index. */
    Entry getEntry(int index);

    /** Last entry. */
    Entry getLastEntry();

    /** Appends one entry. */
    void append(Entry entry);

    /** Appends several entries. */
    void append(List<Entry> entries);

    /** Advances commitIndex. */
    void commit(int index);

    /** Returns the current commitIndex. */
    int getCommitIndex();

    /** Removes all entries after index. */
    void removeAfter(int index);

    /** Closes the sequence. */
    void close();
}
Abstract log entry sequence
import java.util.Collections;
import java.util.List;

/**
 * Abstract log entry sequence
 */
public abstract class AbstractEntrySequence implements EntrySequence {
    // log index offset: index of the first entry of this sequence
    protected int logIndexOffset;
    // index of the next entry to be appended
    protected int nextLogIndex;

    public AbstractEntrySequence(int logIndexOffset) {
        this.logIndexOffset = logIndexOffset;
        this.nextLogIndex = logIndexOffset;
    }

    /**
     * The offset lets the sequence start at an index other than 1, whether or not
     * earlier entries still exist. Initially logIndexOffset == nextLogIndex == 1.
     */
    @Override
    public boolean isEmpty() {
        return logIndexOffset == nextLogIndex;
    }

    @Override
    public int getFirstLogIndex() {
        if (isEmpty()) {
            throw new EmptySequenceException();
        }
        return doGetFirstLogIndex();
    }

    /** The first index is the log index offset. */
    protected int doGetFirstLogIndex() {
        return logIndexOffset;
    }

    @Override
    public int getLastLogIndex() {
        if (isEmpty()) {
            throw new EmptySequenceException();
        }
        return doGetLastLogIndex();
    }

    /** Index of the last entry. */
    protected int doGetLastLogIndex() {
        return nextLogIndex - 1;
    }

    @Override
    public boolean isEntryPresent(int index) {
        return !isEmpty() && index >= doGetFirstLogIndex() && index <= doGetLastLogIndex();
    }

    @Override
    public Entry getEntry(int index) {
        if (!isEntryPresent(index)) {
            return null;
        }
        return doGetEntry(index);
    }

    @Override
    public EntryMeta getEntryMeta(int index) {
        Entry entry = getEntry(index);
        return entry != null ? entry.getMeta() : null;
    }

    /** Returns the entry at index; the caller guarantees that it exists. */
    protected abstract Entry doGetEntry(int index);

    @Override
    public Entry getLastEntry() {
        return isEmpty() ? null : doGetEntry(doGetLastLogIndex());
    }

    @Override
    public List<Entry> subView(int fromIndex) {
        if (isEmpty() || fromIndex > doGetLastLogIndex()) {
            return Collections.emptyList();
        }
        return subList(Math.max(fromIndex, doGetFirstLogIndex()), nextLogIndex);
    }

    // [fromIndex, toIndex)
    @Override
    public List<Entry> subList(int fromIndex, int toIndex) {
        if (isEmpty()) {
            throw new EmptySequenceException();
        }
        if (fromIndex < doGetFirstLogIndex() || toIndex > doGetLastLogIndex() + 1 || fromIndex > toIndex) {
            throw new IllegalArgumentException("illegal from index " + fromIndex + " or to index " + toIndex);
        }
        return doSubList(fromIndex, toIndex);
    }

    protected abstract List<Entry> doSubList(int fromIndex, int toIndex);

    @Override
    public int getNextLogIndex() {
        return nextLogIndex;
    }

    @Override
    public void append(List<Entry> entries) {
        for (Entry entry : entries) {
            append(entry);
        }
    }

    @Override
    public void append(Entry entry) {
        // entries must arrive with consecutive indices
        if (entry.getIndex() != nextLogIndex) {
            throw new IllegalArgumentException("entry index must be " + nextLogIndex);
        }
        doAppend(entry);
        nextLogIndex++;
    }

    protected abstract void doAppend(Entry entry);

    @Override
    public void removeAfter(int index) {
        if (isEmpty() || index >= doGetLastLogIndex()) {
            return;
        }
        doRemoveAfter(index);
    }

    protected abstract void doRemoveAfter(int index);
}
In-memory log entry sequence
import java.util.ArrayList;
import java.util.List;

/**
 * Log entry sequence kept in memory
 */
public class MemoryEntrySequence extends AbstractEntrySequence {
    private final List<Entry> entries = new ArrayList<>();
    private int commitIndex = 0;

    public MemoryEntrySequence() {
        this(1);
    }

    public MemoryEntrySequence(int logIndexOffset) {
        super(logIndexOffset);
    }

    @Override
    protected List<Entry> doSubList(int fromIndex, int toIndex) {
        // log index minus logIndexOffset gives the list position
        return entries.subList(fromIndex - logIndexOffset, toIndex - logIndexOffset);
    }

    @Override
    protected Entry doGetEntry(int index) {
        return entries.get(index - logIndexOffset);
    }

    @Override
    protected void doAppend(Entry entry) {
        entries.add(entry);
    }

    @Override
    public void commit(int index) {
        commitIndex = index;
    }

    @Override
    public int getCommitIndex() {
        return commitIndex;
    }

    @Override
    protected void doRemoveAfter(int index) {
        if (index < doGetFirstLogIndex()) {
            // index is before the first entry: remove everything
            entries.clear();
            nextLogIndex = logIndexOffset;
        } else {
            entries.subList(index - logIndexOffset + 1, entries.size()).clear();
            nextLogIndex = index + 1;
        }
    }

    @Override
    public void close() {
    }

    @Override
    public String toString() {
        return "MemoryEntrySequence{" + "logIndexOffset=" + logIndexOffset + ", nextLogIndex=" + nextLogIndex + ", entries.size=" + entries.size() + '}';
    }
}
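The only non-obvious part of the in-memory sequence is the translation between Raft log indices and list positions. The following self-contained sketch reproduces that arithmetic. It uses a hypothetical `OffsetTermLog` that stores only terms; it is not the article's class. The sequence starts at index 101, as a later log generation would.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for MemoryEntrySequence that keeps only each entry's term.
final class OffsetTermLog {
    private final int logIndexOffset;             // index of the first slot
    private final List<Integer> terms = new ArrayList<>();

    OffsetTermLog(int logIndexOffset) {
        this.logIndexOffset = logIndexOffset;
    }

    int nextLogIndex() {
        return logIndexOffset + terms.size();
    }

    boolean isEmpty() {
        return terms.isEmpty();                   // same as logIndexOffset == nextLogIndex
    }

    // Appends at nextLogIndex and returns the index the entry received.
    int append(int term) {
        terms.add(term);
        return nextLogIndex() - 1;
    }

    // Term at a log index, or -1 if the index is outside [first, last].
    int termAt(int index) {
        int pos = index - logIndexOffset;         // log index -> list position
        return (pos < 0 || pos >= terms.size()) ? -1 : terms.get(pos);
    }

    // Terms in the half-open range [fromIndex, toIndex), copied out of the backing list.
    List<Integer> subList(int fromIndex, int toIndex) {
        return new ArrayList<>(terms.subList(fromIndex - logIndexOffset, toIndex - logIndexOffset));
    }

    // Keeps entries up to and including index and drops the rest.
    void removeAfter(int index) {
        if (index < logIndexOffset) {
            terms.clear();
        } else if (index < nextLogIndex() - 1) {
            terms.subList(index - logIndexOffset + 1, terms.size()).clear();
        }
    }
}
```

Unlike `MemoryEntrySequence.doSubList`, this `subList` returns a copy. A `subList` view becomes unusable once the backing list is structurally modified, for example by a later `removeAfter`.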
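File-based log entry sequence
Entries file layout
The entries file is a sequence of records stored one after another. Each record consists of:
- the entry kind (4 bytes)
- the log index (4 bytes)
- the term (4 bytes)
- the command length (4 bytes)
- the command itself (variable length)

A record therefore takes 16 bytes plus the command length.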
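A record in this layout can be built and parsed with `java.nio.ByteBuffer`. The sketch below is an in-memory illustration only. The hypothetical `EntryRecord` helper is not the article's `EntriesFile`, which writes through a `SeekableFile`. It assumes big-endian integers, which is `ByteBuffer`'s default order and what `RandomAccessFile.writeInt` produces. Whether the article's `RandomAccessFileAdapter` delegates to it is an assumption based on its name.

```java
import java.nio.ByteBuffer;

// Hypothetical helper illustrating the entries.bin record layout:
// [kind:int][index:int][term:int][length:int][command:length bytes]
final class EntryRecord {
    static final int HEADER_BYTES = 4 * Integer.BYTES; // 16

    final int kind;
    final int index;
    final int term;
    final byte[] command;

    EntryRecord(int kind, int index, int term, byte[] command) {
        this.kind = kind;
        this.index = index;
        this.term = term;
        this.command = command;
    }

    byte[] encode() {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_BYTES + command.length); // big-endian by default
        buf.putInt(kind).putInt(index).putInt(term).putInt(command.length).put(command);
        return buf.array();
    }

    // Reads one record at the buffer's current position and advances past it,
    // so repeated calls walk the file record by record.
    static EntryRecord decode(ByteBuffer buf) {
        int kind = buf.getInt();
        int index = buf.getInt();
        int term = buf.getInt();
        byte[] command = new byte[buf.getInt()];
        buf.get(command);
        return new EntryRecord(kind, index, term, command);
    }
}
```

Every record describes its own length, so a reader only needs the record's starting byte offset to load it. That is why the index file stores exactly these offsets.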
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import lombok.AllArgsConstructor;

/**
 * Entries file
 */
@AllArgsConstructor
public class EntriesFile {
    // seekable file
    private final SeekableFile seekableFile;

    public EntriesFile(File file) throws FileNotFoundException {
        this(new RandomAccessFileAdapter(file));
    }

    /**
     * Appends an entry at the end of the file.
     *
     * @return byte offset at which the record starts
     */
    public long appendEntry(Entry entry) throws IOException {
        long offset = seekableFile.size();
        seekableFile.seek(offset);
        seekableFile.writeInt(entry.getKind());
        seekableFile.writeInt(entry.getIndex());
        seekableFile.writeInt(entry.getTerm());
        byte[] commandBytes = entry.getCommandBytes();
        seekableFile.writeInt(commandBytes.length);
        seekableFile.write(commandBytes);
        return offset;
    }

    /**
     * Loads the entry whose record starts at offset.
     */
    public Entry loadEntry(long offset, EntryFactory factory) throws IOException {
        // offset == size would point past the last record, so it is rejected as well
        if (offset >= seekableFile.size()) {
            throw new IllegalArgumentException("offset >= size");
        }
        seekableFile.seek(offset);
        int kind = seekableFile.readInt();
        int index = seekableFile.readInt();
        int term = seekableFile.readInt();
        int length = seekableFile.readInt();
        byte[] bytes = new byte[length];
        seekableFile.read(bytes);
        return factory.create(kind, index, term, bytes);
    }

    public long size() throws IOException {
        return seekableFile.size();
    }

    public void clear() throws IOException {
        truncate(0L);
    }

    public void truncate(long offset) throws IOException {
        seekableFile.truncate(offset);
    }

    public void close() throws IOException {
        seekableFile.close();
    }
}

/**
 * Log entry factory
 */
public class EntryFactory {
    /**
     * Creates an entry object of the given kind.
     */
    public Entry create(int kind, int index, int term, byte[] commandBytes) {
        switch (kind) {
            case Entry.KIND_NO_OP:
                return new NoOpEntry(index, term);
            case Entry.KIND_GENERAL:
                return new GeneralEntry(index, term, commandBytes);
            default:
                throw new IllegalArgumentException("unexpected entry kind " + kind);
        }
    }
}
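Entry index file
An EntryIndexFile starts with a header holding the minimum and maximum entry index, one 4-byte integer each.

The header is followed by one metadata item per entry, 16 bytes in total:
- the byte offset of the entry's record in the entries file (8 bytes)
- its kind (4 bytes)
- its term (4 bytes)

The log index itself is not stored, because it can be computed from the item's position. The first item belongs to minEntryIndex, the next one to minEntryIndex + 1, and so on up to maxEntryIndex.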
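The offset arithmetic behind `getOffsetOfEntryIndexItem` can be tried on an in-memory buffer. This sketch assumes integers are stored big-endian, as `RandomAccessFile.writeInt`/`writeLong` write them. `IndexLayout` is a hypothetical helper, not the article's `EntryIndexFile`.

```java
import java.nio.ByteBuffer;

// Hypothetical helper for the entries.idx layout:
// [minEntryIndex:int][maxEntryIndex:int] then per entry [offset:long][kind:int][term:int]
final class IndexLayout {
    static final int HEADER_BYTES = 2 * Integer.BYTES;           // 8
    static final int ITEM_BYTES = Long.BYTES + 2 * Integer.BYTES; // 16

    // Byte position of the item for entryIndex; the index itself is implied by position.
    static long itemOffset(int minEntryIndex, int entryIndex) {
        return HEADER_BYTES + (long) (entryIndex - minEntryIndex) * ITEM_BYTES;
    }

    // Builds a whole index file image for entries minEntryIndex .. minEntryIndex + n - 1.
    static ByteBuffer build(int minEntryIndex, long[] entryOffsets, int[] kinds, int[] terms) {
        int n = entryOffsets.length;
        ByteBuffer buf = ByteBuffer.allocate(HEADER_BYTES + n * ITEM_BYTES);
        buf.putInt(minEntryIndex).putInt(minEntryIndex + n - 1);
        for (int i = 0; i < n; i++) {
            buf.putLong(entryOffsets[i]).putInt(kinds[i]).putInt(terms[i]);
        }
        buf.flip();
        return buf;
    }

    // Random access: reads the term of entryIndex without scanning earlier items.
    static int termAt(ByteBuffer file, int entryIndex) {
        int min = file.getInt(0);
        int pos = (int) itemOffset(min, entryIndex);
        return file.getInt(pos + Long.BYTES + Integer.BYTES); // skip offset and kind
    }
}
```

Every item has the same fixed size. Locating an entry's record offset is therefore a single seek, and dropping everything after `newMaxEntryIndex` is a single truncate at `itemOffset(min, newMaxEntryIndex + 1)`, which is what `removeAfter` below does.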
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * Entry index file
 */
public class EntryIndexFile implements Iterable<EntryIndexItem> {
    // position of maxEntryIndex in the header (right after minEntryIndex)
    private static final long OFFSET_MAX_ENTRY_INDEX = Integer.BYTES;
    // length of one item: offset (long) + kind (int) + term (int)
    private static final int LENGTH_ENTRY_INDEX_ITEM = 16;
    // seekable file
    private final SeekableFile seekableFile;
    // number of entries
    @Getter
    private int entryIndexCount;
    // smallest entry index
    private int minEntryIndex;
    // largest entry index
    private int maxEntryIndex;
    // in-memory copy of all items, keyed by entry index
    private Map<Integer, EntryIndexItem> entryIndexMap = new HashMap<>();

    public EntryIndexFile(File file) throws IOException {
        this(new RandomAccessFileAdapter(file));
    }

    public EntryIndexFile(SeekableFile seekableFile) throws IOException {
        this.seekableFile = seekableFile;
        load();
    }

    /**
     * Loads all entry metadata into memory.
     */
    private void load() throws IOException {
        if (seekableFile.size() == 0L) {
            entryIndexCount = 0;
            return;
        }
        minEntryIndex = seekableFile.readInt();
        maxEntryIndex = seekableFile.readInt();
        updateEntryIndexCount();
        // load the items one by one
        long offset;
        int kind;
        int term;
        for (int i = minEntryIndex; i <= maxEntryIndex; i++) {
            offset = seekableFile.readLong();
            kind = seekableFile.readInt();
            term = seekableFile.readInt();
            entryIndexMap.put(i, new EntryIndexItem(i, offset, kind, term));
        }
    }

    /** Recomputes the entry count from min and max. */
    private void updateEntryIndexCount() {
        entryIndexCount = maxEntryIndex - minEntryIndex + 1;
    }

    /** Whether the file holds no entries. */
    public boolean isEmpty() {
        return entryIndexCount == 0;
    }

    public int getMinEntryIndex() {
        checkEmpty();
        return minEntryIndex;
    }

    private void checkEmpty() {
        if (isEmpty()) {
            throw new IllegalStateException("no entry index");
        }
    }

    public int getMaxEntryIndex() {
        checkEmpty();
        return maxEntryIndex;
    }

    /**
     * Appends the metadata of one entry.
     */
    public void appendEntryIndex(int index, long offset, int kind, int term) throws IOException {
        if (seekableFile.size() == 0L) {
            // empty file: write minEntryIndex first
            seekableFile.writeInt(index);
            minEntryIndex = index;
        } else {
            // indices must be consecutive
            if (index != maxEntryIndex + 1) {
                throw new IllegalArgumentException("index must be " + (maxEntryIndex + 1) + ", but was " + index);
            }
            // skip minEntryIndex
            seekableFile.seek(OFFSET_MAX_ENTRY_INDEX);
        }
        // write maxEntryIndex
        seekableFile.writeInt(index);
        maxEntryIndex = index;
        updateEntryIndexCount();
        // move to the item position (the end of the file) and write the item
        seekableFile.seek(getOffsetOfEntryIndexItem(index));
        seekableFile.writeLong(offset);
        seekableFile.writeInt(kind);
        seekableFile.writeInt(term);
        entryIndexMap.put(index, new EntryIndexItem(index, offset, kind, term));
    }

    /**
     * Byte position of the item for the given entry index.
     */
    private long getOffsetOfEntryIndexItem(int index) {
        return (index - minEntryIndex) * LENGTH_ENTRY_INDEX_ITEM + Integer.BYTES * 2;
    }

    /**
     * Removes everything.
     */
    public void clear() throws IOException {
        seekableFile.truncate(0L);
        entryIndexCount = 0;
        entryIndexMap.clear();
    }

    /**
     * Removes the items after newMaxEntryIndex.
     */
    public void removeAfter(int newMaxEntryIndex) throws IOException {
        // nothing to remove
        if (isEmpty() || newMaxEntryIndex >= maxEntryIndex) {
            return;
        }
        // if the new maxEntryIndex is below minEntryIndex, remove everything
        if (newMaxEntryIndex < minEntryIndex) {
            clear();
            return;
        }
        // update maxEntryIndex in the header
        seekableFile.seek(OFFSET_MAX_ENTRY_INDEX);
        seekableFile.writeInt(newMaxEntryIndex);
        // truncate the file
        seekableFile.truncate(getOffsetOfEntryIndexItem(newMaxEntryIndex + 1));
        // drop the removed items from memory
        for (int i = newMaxEntryIndex + 1; i <= maxEntryIndex; i++) {
            entryIndexMap.remove(i);
        }
        maxEntryIndex = newMaxEntryIndex;
        entryIndexCount = newMaxEntryIndex - minEntryIndex + 1;
    }

    public long getOffset(int entryIndex) {
        return get(entryIndex).getOffset();
    }

    public EntryIndexItem get(int entryIndex) {
        checkEmpty();
        if (entryIndex < minEntryIndex || entryIndex > maxEntryIndex) {
            throw new IllegalArgumentException("index < min or index > max");
        }
        return entryIndexMap.get(entryIndex);
    }

    /**
     * Iterates over all entry metadata in the file.
     */
    @Override
    public Iterator<EntryIndexItem> iterator() {
        if (isEmpty()) {
            return Collections.emptyIterator();
        }
        return new EntryIndexIterator(entryIndexCount, minEntryIndex);
    }

    public void close() throws IOException {
        seekableFile.close();
    }

    /**
     * Entry index iterator
     */
    @AllArgsConstructor
    private class EntryIndexIterator implements Iterator<EntryIndexItem> {
        // entry count when the iterator was created
        private final int entryIndexCount;
        // current entry index
        private int currentEntryIndex;

        /** Whether another item exists. */
        @Override
        public boolean hasNext() {
            checkModification();
            return currentEntryIndex <= maxEntryIndex;
        }

        /** Fails fast if the file was modified during iteration. */
        private void checkModification() {
            if (this.entryIndexCount != EntryIndexFile.this.entryIndexCount) {
                throw new IllegalStateException("entry index count changed");
            }
        }

        /** Returns the next item. */
        @Override
        public EntryIndexItem next() {
            checkModification();
            return entryIndexMap.get(currentEntryIndex++);
        }
    }
}
Log directory
import java.io.File;

/**
 * Log directory
 */
public interface LogDir {
    /** Initializes the directory. */
    void initialize();

    /** Whether the directory exists. */
    boolean exists();

    /** File backing the EntriesFile. */
    File getEntriesFile();

    /** File backing the EntryIndexFile. */
    File getEntryOffsetIndexFile();

    /** The directory itself. */
    File get();

    /** Renames this directory to the given one. */
    boolean renameTo(LogDir logDir);
}
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

/**
 * File-based log entry sequence
 */
public class FileEntrySequence extends AbstractEntrySequence {
    // entry factory
    private final EntryFactory entryFactory = new EntryFactory();
    // entries file
    private final EntriesFile entriesFile;
    // entry index file
    private final EntryIndexFile entryIndexFile;
    // buffer of uncommitted entries (not yet written to the files)
    private final LinkedList<Entry> pendingEntries = new LinkedList<>();
    // Raft defines the initial commitIndex as 0, regardless of whether the log is persistent
    private int commitIndex;

    public FileEntrySequence(LogDir logDir, int logIndexOffset) {
        super(logIndexOffset);
        try {
            this.entriesFile = new EntriesFile(logDir.getEntriesFile());
            this.entryIndexFile = new EntryIndexFile(logDir.getEntryOffsetIndexFile());
            initialize();
        } catch (IOException e) {
            throw new LogException("failed to open entries file or entry index file", e);
        }
    }

    public FileEntrySequence(EntriesFile entriesFile, EntryIndexFile entryIndexFile, int logIndexOffset) {
        super(logIndexOffset);
        this.entriesFile = entriesFile;
        this.entryIndexFile = entryIndexFile;
        initialize();
    }

    /**
     * Initializes the sequence from the index file.
     */
    private void initialize() {
        if (entryIndexFile.isEmpty()) {
            commitIndex = logIndexOffset - 1;
            return;
        }
        // use minEntryIndex of the index file as logIndexOffset
        logIndexOffset = entryIndexFile.getMinEntryIndex();
        // use maxEntryIndex + 1 of the index file as nextLogIndex
        nextLogIndex = entryIndexFile.getMaxEntryIndex() + 1;
        // everything in the files has been committed
        commitIndex = entryIndexFile.getMaxEntryIndex();
    }

    @Override
    public int getCommitIndex() {
        return commitIndex;
    }

    /**
     * Returns the entries in [fromIndex, toIndex).
     */
    @Override
    protected List<Entry> doSubList(int fromIndex, int toIndex) {
        // the result is made of entries from the file followed by entries from the buffer
        List<Entry> result = new ArrayList<>();
        // entries from the file
        if (!entryIndexFile.isEmpty() && fromIndex <= entryIndexFile.getMaxEntryIndex()) {
            int maxIndex = Math.min(entryIndexFile.getMaxEntryIndex() + 1, toIndex);
            for (int i = fromIndex; i < maxIndex; i++) {
                result.add(getEntryInFile(i));
            }
        }
        // entries from the buffer
        if (!pendingEntries.isEmpty() && toIndex > pendingEntries.getFirst().getIndex()) {
            Iterator<Entry> iterator = pendingEntries.iterator();
            Entry entry;
            int index;
            while (iterator.hasNext()) {
                entry = iterator.next();
                index = entry.getIndex();
                if (index >= toIndex) {
                    break;
                }
                if (index >= fromIndex) {
                    result.add(entry);
                }
            }
        }
        return result;
    }

    /**
     * Returns the entry at index, from the buffer if it is uncommitted, otherwise from the file.
     */
    @Override
    protected Entry doGetEntry(int index) {
        if (!pendingEntries.isEmpty()) {
            int firstPendingEntryIndex = pendingEntries.getFirst().getIndex();
            if (index >= firstPendingEntryIndex) {
                return pendingEntries.get(index - firstPendingEntryIndex);
            }
        }
        assert !entryIndexFile.isEmpty();
        return getEntryInFile(index);
    }

    /**
     * Returns entry metadata without reading the entries file.
     */
    @Override
    public EntryMeta getEntryMeta(int index) {
        if (!isEntryPresent(index)) {
            return null;
        }
        // uncommitted entries live in the buffer even when the index file is not empty
        if (!pendingEntries.isEmpty()) {
            int firstPendingEntryIndex = pendingEntries.getFirst().getIndex();
            if (index >= firstPendingEntryIndex) {
                return pendingEntries.get(index - firstPendingEntryIndex).getMeta();
            }
        }
        return entryIndexFile.get(index).toEntryMeta();
    }

    /**
     * Loads the entry at index from the entries file.
     */
    private Entry getEntryInFile(int index) {
        long offset = entryIndexFile.getOffset(index);
        try {
            return entriesFile.loadEntry(offset, entryFactory);
        } catch (IOException e) {
            throw new LogException("failed to load entry " + index, e);
        }
    }

    /**
     * Returns the last entry.
     */
    @Override
    public Entry getLastEntry() {
        if (isEmpty()) {
            return null;
        }
        if (!pendingEntries.isEmpty()) {
            return pendingEntries.getLast();
        }
        assert !entryIndexFile.isEmpty();
        return getEntryInFile(entryIndexFile.getMaxEntryIndex());
    }

    /**
     * Appends an entry to the buffer.
     */
    @Override
    protected void doAppend(Entry entry) {
        pendingEntries.add(entry);
    }

    /**
     * Commits up to index by moving the entries from the buffer to the files.
     */
    @Override
    public void commit(int index) {
        // commitIndex must not move backwards
        if (index < commitIndex) {
            throw new IllegalArgumentException("commit index < " + commitIndex);
        }
        if (index == commitIndex) {
            return;
        }
        if (pendingEntries.isEmpty() || pendingEntries.getLast().getIndex() < index) {
            throw new IllegalArgumentException("no entry to commit or commit index exceed");
        }
        long offset;
        Entry entry = null;
        try {
            // write each newly committed entry to the entries file and the index file, in order
            for (int i = commitIndex + 1; i <= index; i++) {
                entry = pendingEntries.removeFirst();
                offset = entriesFile.appendEntry(entry);
                entryIndexFile.appendEntryIndex(i, offset, entry.getKind(), entry.getTerm());
                commitIndex = i;
            }
        } catch (IOException e) {
            throw new LogException("failed to commit entry " + entry, e);
        }
    }

    /**
     * Removes the entries after index.
     */
    @Override
    protected void doRemoveAfter(int index) {
        // only buffered entries are affected
        if (!pendingEntries.isEmpty() && index >= pendingEntries.getFirst().getIndex() - 1) {
            // the loop counts upwards but removes from the tail;
            // in effect it removes (lastLogIndex - index) entries
            for (int i = index + 1; i <= doGetLastLogIndex(); i++) {
                pendingEntries.removeLast();
            }
            nextLogIndex = index + 1;
            return;
        }
        try {
            if (index >= doGetFirstLogIndex()) {
                // index is before the first buffered entry: drop the buffer and truncate the files
                pendingEntries.clear();
                entriesFile.truncate(entryIndexFile.getOffset(index + 1));
                entryIndexFile.removeAfter(index);
                nextLogIndex = index + 1;
                commitIndex = index;
            } else {
                // index is before the first entry: remove all data
                pendingEntries.clear();
                entriesFile.clear();
                entryIndexFile.clear();
                nextLogIndex = logIndexOffset;
                commitIndex = logIndexOffset - 1;
            }
        } catch (IOException e) {
            throw new LogException(e);
        }
    }

    /**
     * Closes the files.
     */
    @Override
    public void close() {
        try {
            entriesFile.close();
            entryIndexFile.close();
        } catch (IOException e) {
            throw new LogException("failed to close", e);
        }
    }
}
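The split between `pendingEntries` and the two files is the central design choice in `FileEntrySequence`. Appends only touch memory, and `commit` moves entries to disk in index order. The sketch below models that split with two in-memory lists. The hypothetical `TwoTierLog` stands in for the buffer and the files; it is not the article's class.

Two consequences follow from the code above. Removing conflicting uncommitted entries is cheap, because they are never on disk. For the same reason, uncommitted entries do not survive a restart.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical model of FileEntrySequence's write path: "durable" stands in for
// entries.bin + entries.idx, "pending" for the in-memory buffer. Only terms are stored.
final class TwoTierLog {
    final List<Integer> durable = new ArrayList<>();        // committed entries
    final LinkedList<Integer> pending = new LinkedList<>(); // uncommitted entries
    int commitIndex = 0;                                     // log starts at index 1

    int nextLogIndex() {
        return durable.size() + pending.size() + 1;
    }

    void append(int term) {
        pending.add(term);                                   // appends never touch "disk"
    }

    // Moves entries (commitIndex, index] from the buffer to durable storage, in order.
    void commit(int index) {
        if (index < commitIndex || index >= nextLogIndex()) {
            throw new IllegalArgumentException("bad commit index " + index);
        }
        while (commitIndex < index) {
            durable.add(pending.removeFirst());
            commitIndex++;
        }
    }

    // Drops uncommitted entries after index; this sketch refuses to touch committed ones.
    void removePendingAfter(int index) {
        if (index < commitIndex) {
            throw new IllegalArgumentException("cannot drop committed entries in this sketch");
        }
        while (nextLogIndex() - 1 > index) {
            pending.removeLast();
        }
    }
}
```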
Log implementations
Abstract log
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

/**
 * Abstract log
 */
@Slf4j
public abstract class AbstractLog implements Log {
    // log entry sequence
    protected EntrySequence entrySequence;

    @Override
    public EntryMeta getLastEntryMeta() {
        if (entrySequence.isEmpty()) {
            // an empty log behaves as if its last entry had index 0 and term 0
            return new EntryMeta(Entry.KIND_NO_OP, 0, 0);
        }
        return entrySequence.getLastEntry().getMeta();
    }

    @Override
    public AppendEntriesRpc createAppendEntriesRpc(int term, NodeId selfId, int nextIndex, int maxEntries) {
        int nextLogIndex = entrySequence.getNextLogIndex();
        if (nextIndex > nextLogIndex) {
            throw new IllegalArgumentException("illegal next index " + nextIndex);
        }
        AppendEntriesRpc rpc = new AppendEntriesRpc();
        rpc.setMessageId(UUID.randomUUID().toString());
        rpc.setTerm(term);
        rpc.setLeaderId(selfId);
        rpc.setLeaderCommit(entrySequence.getCommitIndex());
        // the entry just before nextIndex supplies prevLogIndex/prevLogTerm
        // (they keep their default values when there is no such entry)
        Entry entry = entrySequence.getEntry(nextIndex - 1);
        if (entry != null) {
            rpc.setPrevLogIndex(entry.getIndex());
            rpc.setPrevLogTerm(entry.getTerm());
        }
        if (!entrySequence.isEmpty()) {
            int maxIndex = (maxEntries == ALL_ENTRIES ? nextLogIndex : Math.min(nextLogIndex, nextIndex + maxEntries));
            rpc.setEntries(entrySequence.subList(nextIndex, maxIndex));
        }
        return rpc;
    }

    @Override
    public int getNextIndex() {
        return entrySequence.getNextLogIndex();
    }

    @Override
    public int getCommitIndex() {
        return entrySequence.getCommitIndex();
    }

    @Override
    public boolean isNewerThan(int lastLogIndex, int lastLogTerm) {
        EntryMeta lastEntryMeta = getLastEntryMeta();
        log.debug("last entry ({}, {}), candidate ({}, {})", lastEntryMeta.getIndex(), lastEntryMeta.getTerm(), lastLogIndex, lastLogTerm);
        // compare the last terms first; only when they are equal does the longer log win
        return lastEntryMeta.getTerm() > lastLogTerm
                || (lastEntryMeta.getTerm() == lastLogTerm && lastEntryMeta.getIndex() > lastLogIndex);
    }

    @Override
    public NoOpEntry appendEntry(int term) {
        NoOpEntry entry = new NoOpEntry(entrySequence.getNextLogIndex(), term);
        entrySequence.append(entry);
        return entry;
    }

    @Override
    public GeneralEntry appendEntry(int term, byte[] command) {
        GeneralEntry entry = new GeneralEntry(entrySequence.getNextLogIndex(), term, command);
        entrySequence.append(entry);
        return entry;
    }

    /**
     * Appends entries received from the Leader.
     * Inconsistent entries are removed first: starting after the last matching entry,
     * every conflicting entry is removed.
     *
     * @param prevLogIndex  index of the entry preceding the new entries
     * @param prevLogTerm   term of the entry preceding the new entries
     * @param leaderEntries entries from the Leader
     */
    @Override
    public boolean appendEntriesFromLeader(int prevLogIndex, int prevLogTerm, List<Entry> leaderEntries) {
        // check whether the previous entry matches
        if (!checkIfPreviousLogMatches(prevLogIndex, prevLogTerm)) {
            return false;
        }
        // the Leader sent no entries (heartbeat)
        if (leaderEntries.isEmpty()) {
            return true;
        }
        assert prevLogIndex + 1 == leaderEntries.get(0).getIndex();
        // remove conflicting entries and get the entries still to append (if any)
        EntrySequenceView newEntries = removeUnmatchedLog(new EntrySequenceView(leaderEntries));
        // append only
        appendEntriesFromLeader(newEntries);
        return true;
    }

    /**
     * Appends all the given entries.
     */
    private void appendEntriesFromLeader(EntrySequenceView leaderEntries) {
        if (leaderEntries.isEmpty()) {
            return;
        }
        log.debug("append entries from leader from {} to {}", leaderEntries.getFirstLogIndex(), leaderEntries.getLastLogIndex());
        Iterator<Entry> leaderEntriesIterator = leaderEntries.iterator();
        while (leaderEntriesIterator.hasNext()) {
            entrySequence.append(leaderEntriesIterator.next());
        }
    }

    /**
     * Removes conflicting entries and returns the Leader entries that remain to be appended.
     */
    private EntrySequenceView removeUnmatchedLog(EntrySequenceView leaderEntries) {
        // entries from the Leader must not be empty here
        assert !leaderEntries.isEmpty();
        // find the first mismatching index
        int firstUnmatched = findFirstUnmatchedLog(leaderEntries);
        // no mismatch: nothing to append
        if (firstUnmatched < 0) {
            return new EntrySequenceView(Collections.emptyList());
        }
        // remove everything from the first mismatching index on
        removeEntriesAfter(firstUnmatched - 1);
        // return the entries to append
        return leaderEntries.subView(firstUnmatched);
    }

    /**
     * Finds the index of the first entry that is missing locally or has a different term.
     */
    private int findFirstUnmatchedLog(EntrySequenceView leaderEntries) {
        // entries from the Leader must not be empty here
        assert !leaderEntries.isEmpty();
        int logIndex;
        EntryMeta followerEntryMeta;
        Iterator<Entry> entryIterator = leaderEntries.iterator();
        while (entryIterator.hasNext()) {
            Entry leaderEntry = entryIterator.next();
            logIndex = leaderEntry.getIndex();
            // look up the local entry with the same index
            followerEntryMeta = entrySequence.getEntryMeta(logIndex);
            // entry missing or term different
            if (followerEntryMeta == null || followerEntryMeta.getTerm() != leaderEntry.getTerm()) {
                return logIndex;
            }
        }
        return -1;
    }

    /**
     * Checks whether the previous entry matches.
     */
    private boolean checkIfPreviousLogMatches(int prevLogIndex, int prevLogTerm) {
        // prevLogIndex 0 stands for the empty prefix before entry 1, which always matches;
        // without this check the very first entry could never be replicated
        if (prevLogIndex == 0) {
            return prevLogTerm == 0;
        }
        // look up the entry at prevLogIndex
        Entry entry = entrySequence.getEntry(prevLogIndex);
        // entry missing
        if (entry == null) {
            log.debug("previous log {} not found", prevLogIndex);
            return false;
        }
        int term = entry.getTerm();
        if (term != prevLogTerm) {
            log.debug("different term of previous log, local {}, remote {}", term, prevLogTerm);
            return false;
        }
        return true;
    }

    /**
     * Removes the entries after the given index.
     */
    private void removeEntriesAfter(int index) {
        if (entrySequence.isEmpty() || index >= entrySequence.getLastLogIndex()) {
            return;
        }
        log.debug("remove entries after {}", index);
        entrySequence.removeAfter(index);
    }

    /**
     * Advances commitIndex.
     *
     * @param newCommitIndex new commitIndex
     * @param currentTerm    current term
     */
    @Override
    public void advanceCommitIndex(int newCommitIndex, int currentTerm) {
        if (!validateNewCommitIndex(newCommitIndex, currentTerm)) {
            return;
        }
        log.debug("advance commit index from {} to {}", entrySequence.getCommitIndex(), newCommitIndex);
        entrySequence.commit(newCommitIndex);
    }

    /**
     * Validates a new commitIndex.
     */
    private boolean validateNewCommitIndex(int newCommitIndex, int currentTerm) {
        // not greater than the current commitIndex
        if (newCommitIndex <= entrySequence.getCommitIndex()) {
            return false;
        }
        Entry entry = entrySequence.getEntry(newCommitIndex);
        if (entry == null) {
            log.debug("log of new commit index {} not found", newCommitIndex);
            return false;
        }
        // commitIndex may only be advanced to an entry from the current term
        if (entry.getTerm() != currentTerm) {
            log.debug("log term of new commit index != current term ({} != {})", entry.getTerm(), currentTerm);
            return false;
        }
        return true;
    }

    @Override
    public void close() {
        entrySequence.close();
    }

    /**
     * Read-only view over a list of entries (used for entries from the Leader)
     */
    private static class EntrySequenceView implements Iterable<Entry> {
        private final List<Entry> entries;
        @Getter
        private int firstLogIndex;
        @Getter
        private int lastLogIndex;

        EntrySequenceView(List<Entry> entries) {
            this.entries = entries;
            if (!entries.isEmpty()) {
                firstLogIndex = entries.get(0).getIndex();
                lastLogIndex = entries.get(entries.size() - 1).getIndex();
            }
        }

        Entry get(int index) {
            if (entries.isEmpty() || index < firstLogIndex || index > lastLogIndex) {
                return null;
            }
            return entries.get(index - firstLogIndex);
        }

        boolean isEmpty() {
            return entries.isEmpty();
        }

        EntrySequenceView subView(int fromIndex) {
            if (entries.isEmpty() || fromIndex > lastLogIndex) {
                return new EntrySequenceView(Collections.emptyList());
            }
            return new EntrySequenceView(entries.subList(fromIndex - firstLogIndex, entries.size()));
        }

        @Override
        public Iterator<Entry> iterator() {
            return entries.iterator();
        }
    }
}
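Two rules at the heart of `AbstractLog` can be checked in isolation: the up-to-date comparison behind `isNewerThan`, and the follower's conflict handling in `appendEntriesFromLeader`. The sketch below uses a hypothetical `RaftLogRules` helper that models a log as a plain list of terms, with index `i` stored at position `i - 1` and index 0 standing for the empty prefix. It follows the rules described in the Raft paper and does not use the article's classes.

A candidate whose last entry has a higher term counts as more up to date even when its log is shorter. That is why the index is compared only when the last terms are equal.

```java
import java.util.List;

// Hypothetical, self-contained model of two log rules. A log is a list of terms where
// position i - 1 holds the term of log index i; index 0 is the empty prefix with term 0.
final class RaftLogRules {

    // True if our log is strictly more up to date than the candidate's (vote refused).
    static boolean isNewerThan(int ourLastIndex, int ourLastTerm, int candLastIndex, int candLastTerm) {
        return ourLastTerm > candLastTerm
                || (ourLastTerm == candLastTerm && ourLastIndex > candLastIndex);
    }

    // Term at index, 0 for the empty prefix, or -1 if the entry does not exist.
    static int termAt(List<Integer> log, int index) {
        if (index == 0) {
            return 0;
        }
        return index <= log.size() ? log.get(index - 1) : -1;
    }

    // Follower-side AppendEntries: consistency check, truncation from the first
    // conflicting entry, then append whatever the follower is still missing.
    static boolean appendFromLeader(List<Integer> log, int prevLogIndex, int prevLogTerm, List<Integer> entries) {
        if (prevLogIndex > log.size() || termAt(log, prevLogIndex) != prevLogTerm) {
            return false;                              // previous entry missing or mismatching
        }
        for (int i = 0; i < entries.size(); i++) {
            int index = prevLogIndex + 1 + i;
            int local = termAt(log, index);
            if (local == entries.get(i)) {
                continue;                              // already present with the same term
            }
            if (local != -1) {
                log.subList(index - 1, log.size()).clear(); // conflict: drop it and all after it
            }
            log.addAll(entries.subList(i, entries.size()));
            return true;
        }
        return true;                                   // every entry was already present
    }
}
```

Replaying the same AppendEntries twice leaves the log unchanged. This property is what makes retransmitted messages harmless.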
In-memory log
/**
 * In-memory log
 */
public class MemoryLog extends AbstractLog {
    public MemoryLog(EntrySequence entrySequence) {
        this.entrySequence = entrySequence;
    }

    public MemoryLog() {
        this(new MemoryEntrySequence());
    }
}
File-based log
import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
import lombok.AllArgsConstructor;

/**
 * Abstract log directory
 */
@AllArgsConstructor
public abstract class AbstractLogDir implements LogDir {
    protected final File dir;

    @Override
    public void initialize() {
        // create the directory and the two (empty) files
        if (!dir.exists() && !dir.mkdir()) {
            throw new LogException("failed to create directory " + dir);
        }
        try {
            Files.touch(getEntriesFile());
            Files.touch(getEntryOffsetIndexFile());
        } catch (IOException e) {
            throw new LogException("failed to create file", e);
        }
    }

    @Override
    public boolean exists() {
        return dir.exists();
    }

    @Override
    public File getEntriesFile() {
        return new File(dir, RootDir.FILE_NAME_ENTRIES);
    }

    @Override
    public File getEntryOffsetIndexFile() {
        return new File(dir, RootDir.FILE_NAME_ENTRY_OFFSET_INDEX);
    }

    @Override
    public File get() {
        return dir;
    }

    @Override
    public boolean renameTo(LogDir logDir) {
        return dir.renameTo(logDir.get());
    }
}

import java.io.File;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.Getter;

/**
 * Log generation.
 * The log root directory can contain several log generations:
 * log-root
 *   |-log-1
 *   |   |-entries.bin
 *   |   \-entries.idx
 *   \-log-100
 *       |-entries.bin
 *       \-entries.idx
 * log-1 and log-100 above are two generations; the number is the log index offset, lastIncludedIndex.
 */
public class LogGeneration extends AbstractLogDir implements Comparable<LogGeneration> {
    // directory name pattern: "log-" followed by lastIncludedIndex
    private static final Pattern DIR_NAME_PATTERN = Pattern.compile("log-(\\d+)");
    // index offset (lastIncludedIndex) of this generation
    @Getter
    private final int lastIncludedIndex;

    public LogGeneration(File baseDir, int lastIncludedIndex) {
        super(new File(baseDir, generateDirName(lastIncludedIndex)));
        this.lastIncludedIndex = lastIncludedIndex;
    }

    public LogGeneration(File dir) {
        super(dir);
        Matcher matcher = DIR_NAME_PATTERN.matcher(dir.getName());
        if (!matcher.matches()) {
            throw new IllegalArgumentException("not a directory name of log generation, [" + dir.getName() + "]");
        }
        lastIncludedIndex = Integer.parseInt(matcher.group(1));
    }

    /** Whether dirName is a valid generation directory name. */
    public static boolean isValidDirName(String dirName) {
        return DIR_NAME_PATTERN.matcher(dirName).matches();
    }

    /** Builds the directory name of a generation. */
    private static String generateDirName(int lastIncludedIndex) {
        return "log-" + lastIncludedIndex;
    }

    /** Orders generations numerically by lastIncludedIndex. */
    @Override
    public int compareTo(LogGeneration o) {
        return Integer.compare(lastIncludedIndex, o.lastIncludedIndex);
    }
}
import java.io.File;
import lombok.ToString;

/**
 * Ordinary log directory
 */
@ToString
public class NormalLogDir extends AbstractLogDir {
    public NormalLogDir(File dir) {
        super(dir);
    }
}

import java.io.File;
import lombok.extern.slf4j.Slf4j;

/**
 * Root directory
 */
@Slf4j
public class RootDir {
    // entries file name
    public static final String FILE_NAME_ENTRIES = "entries.bin";
    // entry index file name
    public static final String FILE_NAME_ENTRY_OFFSET_INDEX = "entries.idx";
    // name of the directory in which a new generation is being built
    private static final String DIR_NAME_GENERATING = "generating";
    // root directory
    private final File baseDir;

    public RootDir(File baseDir) {
        if (!baseDir.exists()) {
            throw new IllegalArgumentException("dir " + baseDir + " not exists");
        }
        this.baseDir = baseDir;
    }

    public LogDir getLogDirForGenerating() {
        return getOrCreateNormalLogDir(DIR_NAME_GENERATING);
    }

    /**
     * Gets or creates an ordinary log directory.
     */
    private NormalLogDir getOrCreateNormalLogDir(String name) {
        NormalLogDir logDir = new NormalLogDir(new File(baseDir, name));
        if (!logDir.exists()) {
            logDir.initialize();
        }
        return logDir;
    }

    /**
     * Renames a directory into a log generation.
     */
    public LogDir rename(LogDir dir, int lastIncludedIndex) {
        LogGeneration destDir = new LogGeneration(baseDir, lastIncludedIndex);
        if (destDir.exists()) {
            throw new IllegalStateException("failed to rename, dest dir " + destDir + " exists");
        }
        log.info("rename dir {} to {}", dir, destDir);
        if (!dir.renameTo(destDir)) {
            throw new IllegalStateException("failed to rename " + dir + " to " + destDir);
        }
        return destDir;
    }

    /**
     * Creates the first log generation.
     */
    public LogGeneration createFirstGeneration() {
        LogGeneration generation = new LogGeneration(baseDir, 0);
        generation.initialize();
        return generation;
    }

    /**
     * Returns the latest log generation, or null if there is none.
     */
    public LogGeneration getLatestGeneration() {
        File[] files = baseDir.listFiles();
        if (files == null) {
            return null;
        }
        LogGeneration latest = null;
        String fileName;
        LogGeneration generation;
        for (File file : files) {
            if (!file.isDirectory()) {
                continue;
            }
            fileName = file.getName();
            if (DIR_NAME_GENERATING.equals(fileName) || !LogGeneration.isValidDirName(fileName)) {
                continue;
            }
            generation = new LogGeneration(file);
            if (latest == null || generation.compareTo(latest) > 0) {
                latest = generation;
            }
        }
        return latest;
    }
}
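`getLatestGeneration` picks the generation with the largest `lastIncludedIndex`, which it parses from the directory name. The sketch below shows the same selection with a hypothetical `Generations` helper that works on a list of names instead of `File` objects. The comparison is numeric, as in `LogGeneration.compareTo`. Comparing the names as strings would rank `log-20` above `log-100`.

```java
import java.util.List;
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper mirroring RootDir.getLatestGeneration on plain directory names.
final class Generations {
    private static final Pattern DIR_NAME_PATTERN = Pattern.compile("log-(\\d+)");

    // Largest lastIncludedIndex among names like "log-100"; anything else,
    // including the "generating" work directory, is skipped.
    static OptionalInt latestLastIncludedIndex(List<String> dirNames) {
        OptionalInt latest = OptionalInt.empty();
        for (String name : dirNames) {
            Matcher matcher = DIR_NAME_PATTERN.matcher(name);
            if (!matcher.matches()) {
                continue;
            }
            int lastIncludedIndex = Integer.parseInt(matcher.group(1));
            if (!latest.isPresent() || lastIncludedIndex > latest.getAsInt()) {
                latest = OptionalInt.of(lastIncludedIndex);
            }
        }
        return latest;
    }
}
```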
import java.io.File;

/**
 * File-based log
 */
public class FileLog extends AbstractLog {
    private final RootDir rootDir;

    public FileLog(File baseDir) {
        rootDir = new RootDir(baseDir);
        LogGeneration latestGeneration = rootDir.getLatestGeneration();
        if (latestGeneration != null) {
            // generation log-N holds the entries after lastIncludedIndex N, so its first index is N + 1
            // (this keeps an empty log-0 consistent with the offset of 1 used when it was created)
            entrySequence = new FileEntrySequence(latestGeneration, latestGeneration.getLastIncludedIndex() + 1);
        } else {
            LogGeneration firstGeneration = rootDir.createFirstGeneration();
            entrySequence = new FileEntrySequence(firstGeneration, 1);
        }
    }
}