您好,登錄后才能下訂單哦!
什么是Paxos共識算法
最初的服務往往都是通過單體架構對外提供的,即單Server-單Database模式。隨著業務的不斷擴展,用戶和請求數都在不斷上升,如何應對大量的請求就成了每個服務都需要解決的問題,這也就是我們常說的高并發。為了解決單臺服務器面對高并發的蒼白無力,可以通過增加服務器數量來解決,即多Server-單Database(Master-Slave)模式,此時的壓力就來到了數據庫一方,數據庫的IO效率決定了整個服務的效率,繼續增加Server數量將無法提升服務性能。這就衍生出了當前火熱的微服務架構。當用戶請求經由負載均衡分配到某一服務實例上后,如何保證該服務的其他實例最終能夠得到相同的數據變化呢?這就要用到Paxos分布式共識協議,Paxos解決的就是共識問題,也就是一段時間后,無論get哪一個服務實例,都能獲取到相同的數據。目前國內外的分布式產品很多都使用了Paxos協議,可以說Paxos幾乎就是共識協議的標準和代名詞。
Paxos有兩種協議,我們常常提到的其實是Basic Paxos,另一種叫Multi Paxos,如無特殊說明,本文中提到的Paxos協議均為Basic Paxos。
Paxos協議是由圖靈獎獲得者Leslie Lamport于1998年在其論文《The Part-Time Parliament》中首次提出的,講述了一個希臘小島Paxos是如何通過決議的。但由于該論文晦澀艱深,當時的計算機界大牛們也沒幾個人能理解。于是Lamport2001年再次發表了《Paxos Made Simple》,摘要部分是這么寫的:
The Paxos algorithm, when presented in plain English, is very simple.
翻譯過來就是:不會吧,不會吧,這么簡單的Paxos算法不會真的有人弄不懂吧?然而事實卻是很多人對Paxos都望而卻步,理解Paxos其實并不難,但是Paxos的難點在于工程化,如何利用Paxos協議寫出一個能過夠真正在生產環境中跑起來的服務才是Paxos最難的地方,關于Paxos的工程化可以參考微信后臺團隊撰寫的《微信自研生產級paxos類庫PhxPaxos實現原理介紹》
Paxos如何保證一致性的
Paxos協議一共有兩個階段:Prepare和Propose,兩種角色:Proposer和Acceptor,每一個服務實例既是Proposer,同時也是Acceptor,Proposer負責提議,Acceptor決定是否接收來自Proposer的提議,一旦提議被多數接受,那么我們就可以宣稱對該提議包含的值達成了一致,而且不會再改變。
階段一:Prepare 準備
階段二:Propose 提議
一旦提議被半數以上的服務接受,那么我們就可以宣稱整個服務集群在這一提議上達成了一致。
需要注意的是,在一個服務集群中以上兩個階段是很有可能同時發生的。 例如:實例A已完成Prepare階段,并發送了Propose請求。同時實例B開始了Prepare階段,并生成了更大的ProposalID發送Prepare請求,可能導致實例A的Propose請求被拒絕。 每個服務實例也是同時在扮演Proposer和Acceptor角色,向其他服務發送請求的同時,可能也在處理別的服務發來的請求。
使用GO語言實現Paxos協議
服務注冊與發現
由于每個服務實例都是在執行相同的代碼,那我們要如何知曉其他服務實例的入口呢(IP和端口號)?方法之一就是寫死在代碼中,或者提供一份配置文件。服務啟動后可以讀取該配置文件。但是這種方法不利于維護,一旦我們需要移除或添加服務則需要在每個機器上重新休息配置文件。
除此之外,我們可以通過一個第三方服務:服務的注冊與發現來注冊并獲知當前集群的總服務實例數,即將本地的配置文件改為線上的配置服務。
服務注冊:Register函數,服務實例啟動后通過調用這個RPC方法將自己注冊在服務管理中
func (s *Service) Register(args *RegisterArgs, reply *RegisterReply) error { s.mu.Lock() defer s.mu.Unlock() server := args.ServerInfo for _, server := range s.Servers { if server.IPAddress == args.ServerInfo.IPAddress && server.Port == args.ServerInfo.Port { reply.Succeed = false return nil } } reply.ServerID = len(s.Servers) reply.Succeed = true s.Servers = append(s.Servers, server) fmt.Printf("Current registerd servers:\n%v\n", s.Servers) return nil }
服務發現:GetServers函數,服務通過調用該RPC方法獲取所有服務實例的信息(IP和端口號)
func (s *Service) GetServers(args *GetServersArgs, reply *GetServersReply) error { // return all servers reply.ServerInfos = s.Servers return nil }
Prepare階段
Proposer,向所有的服務發送Prepare請求,并等待直到半數以上的服務返回結果,這里也可以等待所有服務返回后再處理,但是Paxos協議可以容忍小于半數的服務宕機,因此我們只等待大于N/2個返回即可。當返回的結果有任何一個請求被拒絕,那Proposer即認為這次的請求被拒絕,返回重新生成ProposalID并發送新一輪的Prepare請求。
func (s *Server) CallPrepare(allServers []ServerInfo, proposal Proposal) PrepareReply { returnedReplies := make([]PrepareReply, 0) for _, otherS := range allServers { // use a go routine to call every server go func(otherS ServerInfo) { delay := rand.Intn(10) time.Sleep(time.Second * time.Duration(delay)) args := PrepareArgs{s.Info, proposal.ID} reply := PrepareReply{} fmt.Printf("【Prepare】Call Prepare on %v:%v with proposal id %v\n", otherS.IPAddress, otherS.Port, args.ProposalID) if Call(otherS, "Server.Prepare", &args, &reply) { if reply.HasAcceptedProposal { fmt.Printf("【Prepare】%v:%v returns accepted proposal: %v\n", otherS.IPAddress, otherS.Port, reply.AcceptedProposal) } else { fmt.Printf("【Prepare】%v:%v returns empty proposal\n", otherS.IPAddress, otherS.Port) } s.mu.Lock() returnedReplies = append(returnedReplies, reply) s.mu.Unlock() } }(otherS) } for { // wait for responses from majority if len(returnedReplies) > (len(allServers))/2.0 { checkReplies := returnedReplies // three possible response // 1. deny the prepare, and return an empty/accepted proposal // as the proposal id is not higher than minProposalID on server (proposal id <= server.minProposalID) // 2. accept the prepare, and return an empty proposal as the server has not accept any proposal yet // 3. accept the prepare, and return an accepted proposal // check responses from majority // find the response with max proposal id acceptedProposal := NewProposal() for _, r := range checkReplies { // if any response refused the prepare, this server should resend prepare if !r.PrepareAccepted { return r } if r.HasAcceptedProposal && r.AcceptedProposal.ID > acceptedProposal.ID { acceptedProposal = r.AcceptedProposal } } // if some other server has accepted proposal, return that proposal with max proposal id // if no other server has accepted proposal, return an empty proposal return PrepareReply{HasAcceptedProposal: !acceptedProposal.IsEmpty(), AcceptedProposal: acceptedProposal, PrepareAccepted: true} } //fmt.Printf("Waiting for response from majority...\n") time.Sleep(time.Second * 1) } }
Acceptor,通過比較ProposalID和minProposal,如果ProposalID小于等于minProposal,則拒絕該Prepare請求,否則更新minProposal為ProposalID。最后返回已接受的提議
func (s *Server) Prepare(args *PrepareArgs, reply *PrepareReply) error { s.mu.Lock() defer s.mu.Unlock() // 2 promises and 1 response // Promise 1 // do not accept prepare request which ProposalID <= minProposalID // Promise 2 // do not accept propose request which ProposalID < minProposalID // Response 1 // respond with accepted proposal if any if reply.PrepareAccepted = args.ProposalID > s.minProposalID; reply.PrepareAccepted { // ready to accept the proposal with Id s.minProposalID s.minProposalID = args.ProposalID } reply.HasAcceptedProposal = s.readAcceptedProposal() reply.AcceptedProposal = s.Proposal return nil }
Propose階段
Proposer,同樣首先向所有的服務發送Propose請求,并等待知道半數以上的服務返回結果。如果返回的結果有任何一個請求被拒絕,則Proposer認為這次的請求被拒絕,返回重新生成ProposalID并發送新一輪的Prepare請求
func (s *Server) CallPropose(allServers []ServerInfo, proposal Proposal) ProposeReply { returnedReplies := make([]ProposeReply, 0) for _, otherS := range allServers { go func(otherS ServerInfo) { delay := rand.Intn(5000) time.Sleep(time.Millisecond * time.Duration(delay)) args := ProposeArgs{otherS, proposal} reply := ProposeReply{} fmt.Printf("【Propose】Call Propose on %v:%v with proposal: %v\n", otherS.IPAddress, otherS.Port, args.Proposal) if Call(otherS, "Server.Propose", &args, &reply) { fmt.Printf("【Propose】%v:%v returns: %v\n", otherS.IPAddress, otherS.Port, reply) s.mu.Lock() returnedReplies = append(returnedReplies, reply) s.mu.Unlock() } }(otherS) } for { // wait for responses from majority if len(returnedReplies) > (len(allServers))/2.0 { checkReplies := returnedReplies for _, r := range checkReplies { if !r.ProposeAccepted { return r } } return checkReplies[0] } time.Sleep(time.Second * 1) } }
Acceptor,通過比較ProposalID和minProposal,如果ProposalID小于minProposal,則拒絕該Propose請求,否則更新minProposal為ProposalID,并將提議持久化到本地磁盤中。
func (s *Server) Propose(args *ProposeArgs, reply *ProposeReply) error { if s.minProposalID <= args.Proposal.ID { s.mu.Lock() s.minProposalID = args.Proposal.ID s.Proposal = args.Proposal s.SaveAcceptedProposal() s.mu.Unlock() reply.ProposeAccepted = true } reply.ProposalID = s.minProposalID return nil }
運行
運行結果:
這里我一共開啟了3個服務實例,并在每次請求之前加入了隨機的延遲,模擬網絡通信中的延遲,因此每個服務的每個請求并不是同時發出的
動圖一張:
靜態結果一張:
可以看到3個服務盡管一開始會嘗試以他們自己的端口號(5001,5002,5003)作為提議值,在Prepare/Propose失敗后,都會重新生成更大的ProposalID并開啟新一輪的提議過程(Prepare,Propose),且最后都以5003達成一致。
小結
至此,我們就用GO實現了Paxos協議的核心邏輯。但顯而易見的是,這段代碼仍然存在很多問題,完全無法滿足生產環境的需求
到此這篇關于使用GO實現Paxos共識算法的文章就介紹到這了,更多相關GO實現Paxos共識算法內容請搜索億速云以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持億速云!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。