您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關kafka是如何保證消息的可靠性與一致性,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
在kafka中主要通過ISR機制來保證消息的可靠性。下面通過幾個問題來說明kafka如何來保證消息可靠性與一致性
在zk中會保存AR(Assigned Replicas)列表,其中包含了分區所有的副本,其中 AR = ISR+OSR
ISR(in sync replica):是kafka動態維護的一組同步副本,在ISR中有成員存活時,只有這個組的成員才可以成為leader,內部保存的為每次提交信息時必須同步的副本(acks = all時),每當leader掛掉時,在ISR集合中選舉出一個follower作為leader提供服務,當ISR中的副本被認為壞掉的時候,會被踢出ISR,當重新跟上leader的消息數據時,重新進入ISR。
OSR(out sync replica): 保存的副本不必保證必須同步完成才進行確認,OSR內的副本是否同步了leader的數據,不影響數據的提交,OSR內的follower盡力的去同步leader,可能數據版本會落后。
當寫入到kakfa時,生產者可以選擇是否等待0(只需寫入leader),1(只需同步一個副本) 或 -1(全部副本)的消息確認(這里的副本指的是ISR中的副本)。
需要注意的是“所有副本確認”并不能保證全部分配副本已收到消息。默認情況下,當acks=all時,只要當前所有在同步中的副本(ISR中的副本)收到消息,就會進行確認。所以Kafka的交付承諾可以這樣理解:對沒有提交成功的消息不做任何交付保證,而對于ISR中至少有一個存活的完全同步的副本的情況下的“成功提交”的消息保證不會丟失。
第一點:一個節點必須維持和zk的會話,通過zk的心跳檢測實現
第二點:如果節點是一個slave也就是復制節點,那么他必須復制leader節點不能太落后。這里的落后可以指兩種情況
1:數據復制落后,slave節點和leader節點的數據相差較大,這種情況有一個缺點,在生產者突然發送大量消息導致網絡堵塞后,大量的slav復制受阻,導致數據復制落后被大量的踢出ISR。
2:時間相差過大,指的是slave向leader請求復制的時間距離上次請求相隔時間過大。通過配置 replica.lag.time.max
就可以配置這個時間參數。這種方式解決了上述第一種方式導致的問題。
在kafka中有一個partition recovery機制用于恢復掛掉的partition。
每個Partition會在磁盤記錄一個RecoveryPoint(恢復點), 記錄已經flush到磁盤的最大offset。當broker fail 重啟時,會進行loadLogs。首先會讀取該Partition的RecoveryPoint,找到包含RecoveryPoint點上的segment及以后的segment, 這些segment就是可能沒有完全flush到磁盤segments。然后調用segment的recover,重新讀取各個segment的msg,并重建索引。
優點:
以segment為單位管理Partition數據,方便數據生命周期的管理,刪除過期數據簡單
在程序崩潰重啟時,加快recovery速度,只需恢復未完全flush到磁盤的segment即可
慢副本:在一定周期時間內follower不能追趕上leader。最常見的原因之一是I / O瓶頸導致follower追加復制消息速度慢于從leader拉取速度。
卡住副本:在一定周期時間內follower停止從leader拉取請求。follower replica卡住了是由于GC暫停或follower失效或死亡。
新啟動副本:當用戶給主題增加副本因子時,新的follower不在同步副本列表中,直到他們完全趕上了leader日志。
一個partition的follower落后于leader足夠多時,被認為不在同步副本列表或處于滯后狀態。
正如上述所說,現在kafka判定落后有兩種,副本滯后判斷依據是副本落后于leader最大消息數量(replica.lag.max.messages)或replicas響應partition leader的最長等待時間(replica.lag.time.max.ms)。前者是用來檢測緩慢的副本,而后者是用來檢測失效或死亡的副本
1. 兩種選擇:服務直接不可用一段時間等待ISR中副本恢復(祈禱恢復的副本有數據吧) 或者 直接選用第一個副本(這個副本不一定在ISR中)作為leader,這兩種方法也是在可用性和一致性之間的權衡。
2. 服務不可用方式這種適用在不允許消息丟失的情況下使用,適用于一致性大于可用性,可以有兩種做法
2.1 設置ISR最小同步副本數量,如果ISR的當前數量大于設置的最小同步值,那么該分區才會接受寫入,避免了ISR同步副本過少。如果小于最小值那么該分區將不接收寫入。這個最小值設置只有在acks = all的時候才會生效。
2.2 禁用unclean-leader選舉,當isr中的所有副本全部不可用時,不可以使用OSR 中的副本作為leader,直接使服務不可用,直到等到ISR 中副本恢復再進行選舉leader。
3. 直接選擇第一個副本作為leader的方式,適用于可用性大于一致性的場景,這也是kafka在isr中所有副本都死亡了的情況采用的默認處理方式,我們可以通過配置參數 unclean.leader.election.enable
來禁止這種行為,采用第一種方法。
broker的offset大致分為三種:base offset、high watemark(HW)、log end offset(LEO)
base offset:起始位移,replica中第一天消息的offset
HW:replica高水印值,副本中最新一條已提交消息的位移。leader 的HW值也就是實際已提交消息的范圍,每個replica都有HW值,但僅僅leader中的HW才能作為標示信息。什么意思呢,就是說當按照參數標準成功完成消息備份(成功同步給follower replica后)才會更新HW的值,代表消息理論上已經不會丟失,可以認為“已提交”。
LEO:日志末端位移,也就是replica中下一條待寫入消息的offset,注意哈,是下一條并且是待寫入的,并不是最后一條。這個LEO個人感覺也就是用來標示follower的同步進度的。所以HW代表已經完成同步的數據的位置,LEO代表已經寫入的最新位置,只有HW位置之前的才是可以被外界訪問的數據。現在就來看一下之前,broker從收到消息到返回響應這個黑盒子里發生了什么。
broker 收到producer的請求
leader 收到消息,并成功寫入,LEO 值+1
broker 將消息推給follower replica,follower 成功寫入 LEO +1 …
所有LEO 寫入后,leader HW +1
消息可被消費,并成功響應
上述過程從下面的圖便可以看出:
選舉leader常用的方法是多數選舉法,比如Redis等,但是kafka沒有選用多數選舉法,kafka采用的是quorum(法定人數)。
quorum是一種在分布式系統中常用的算法,主要用來通過數據冗余來保證數據一致性的投票算法。在kafka中該算法的實現就是ISR,在ISR中就是可以被選舉為leader的法定人數。
在leader宕機后,只能從ISR列表中選取新的leader,無論ISR中哪個副本被選為新的leader,它都知道HW之前的數據,可以保證在切換了leader后,消費者可以繼續看到HW之前已經提交的數據。
HW的截斷機制:選出了新的leader,而新的leader并不能保證已經完全同步了之前leader的所有數據,只能保證HW之前的數據是同步過的,此時所有的follower都要將數據截斷到HW的位置,再和新的leader同步數據,來保證數據一致。當宕機的leader恢復,發現新的leader中的數據和自己持有的數據不一致,此時宕機的leader會將自己的數據截斷到宕機之前的hw位置,然后同步新leader的數據。宕機的leader活過來也像follower一樣同步數據,來保證數據的一致性。
關于kafka是如何保證消息的可靠性與一致性就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。