您好,登錄后才能下訂單哦!
引言:來到了新公司,需要對kafka組件有很深的研究,本人之前對老版的kafka有過一定的研究,但是談不上深入,新公司力推kafka,比較kafka作為消息系統在目前的市場上的占有率還是很高的,可以看本人之前kafka的博客中有關kafka的優點和為什么要用kafka。
在眾多優點中,我本人認為最重要的2個優點如下:
1、削峰
數據庫的處理能力是有限的,在峰值期,過多的請求落到后臺,一旦超過系統的處理能力,可能會使系統掛掉。
如上圖所示,系統的處理能力是 2k/s,MQ 處理能力是 8k/s,峰值請求 5k/s,MQ 的處理能力遠遠大于數據庫,在高峰期,請求可以先積壓在 MQ 中,系統可以根據自身的處理能力以 2k/s 的速度消費這些請求。
這樣等高峰期一過,請求可能只有 100/s,系統可以很快的消費掉積壓在 MQ 中的請求。
注意,上面的請求指的是寫請求,查詢請求一般通過緩存解決。
2、解耦
如下場景,S 系統與 A、B、C 系統緊密耦合。由于需求變動,A 系統修改了相關代碼,S 系統也需要調整 A 相關的代碼。
過幾天,C 系統需要刪除,S 緊跟著刪除 C 相關代碼;又過了幾天,需要新增 D 系統,S 系統又要添加與 D 相關的代碼;再過幾天,程序猿瘋了...
這樣各個系統緊密耦合,不利于維護,也不利于擴展。現在引入 MQ,A 系統變動,A 自己修改自己的代碼即可;C 系統刪除,直接取消訂閱;D 系統新增,訂閱相關消息即可。
這樣通過引入消息中間件,使各個系統都與 MQ 交互,從而避免它們之間的錯綜復雜的調用關系。
kafka架構原理:
最經典的圖也就是官方的圖了
找了一些其他博主的圖:這里自己就懶的畫了
詳細復雜的kafka架構
通俗點講:就是producer ----> kafka cluster(brokers) -----> consumer
生產者生產消息 經過 kafka隊列 被消費者消費
相關的組件概念見:
topic and logs
廢話不多說,先見圖
文字解釋如下:
Message 是按照 Topic 來組織的,每個 Topic 可以分成多個 Partition(對server.properties/num.partitions)。 本人習慣性配置文件為num.partitions=broker個數,人為的分配到各個節點上。
Partition 中的每條記錄(Message)包含三個屬性:Offset,messageSize 和 Data。
其中 Offset 表示消息偏移量;messageSize 表示消息的大小;Data 表示消息的具體內容。
Partition 是以文件的形式存儲在文件系統中,位置由 server.properties/log.dirs 指定,其命名規則為 <topic_name>-<partition_id>。
生產配置文件為:log.dirs=/data/kafka/kafka-logs
[hadoop@kafka03-55-13 kafka-logs]$ pwd
/data/kafka/kafka-logs
[hadoop@kafka03-55-13 kafka-logs]$ ls |grep mjh
topic-by-mjh-0
topic-by-mjh-1
topic-by-mjh-10
topic-by-mjh-11
topic-by-mjh-12
...
...
...
Partition 可能位于不同的 Broker 上,Partition 是分段的,每個段是一個 Segment 文件。
Partition 目錄下包括了數據文件和索引文件
[hadoop@kafka03-55-13 kafka-logs]$ cd topic-by-mjh-0
[hadoop@kafka03-55-13 topic-by-mjh-0]$ ll
total 4
-rw-rw-r-- 1 hadoop hadoop 10485760 Aug 24 20:13 00000000000000000334.index
-rw-rw-r-- 1 hadoop hadoop 0 Aug 13 17:42 00000000000000000334.log
-rw-rw-r-- 1 hadoop hadoop 10485756 Aug 24 20:13 00000000000000000334.timeindex
-rw-rw-r-- 1 hadoop hadoop 4 Aug 16 14:16 leader-epoch-checkpoint
Index 采用稀疏存儲的方式,它不會為每一條 Message 都建立索引,而是每隔一定的字節數建立一條索引,避免索引文件占用過多的空間。
缺點是沒有建立索引的 Offset 不能一次定位到 Message 的位置,需要做一次順序掃描,但是掃描的范圍很小。
索引包含兩個部分(均為 4 個字節的數字),分別為相對 Offset 和 Position。
相對 Offset 表示 Segment 文件中的 Offset,Position 表示 Message 在數據文件中的位置。
Segment下的log文件就是存儲消息的地方
每個消息都會包含消息體、offset、timestamp、key、size、壓縮編碼器、校驗和、消息版本號等。
在磁盤上的數據格式和producer發送到broker的數據格式一模一樣,也和consumer收到的數據格式一模一樣。由于磁盤格式與consumer以及producer的數據格式一模一樣,這樣就使得Kafka可以通過零拷貝(zero-copy)技術來提高傳輸效率。 // 關于零拷貝技術,后期會專門寫一遍博客來解釋
小結:
1、Partition 是一個順序的追加日志,屬于順序寫磁盤(順序寫磁盤效率比隨機寫內存要高,保障 Kafka 吞吐率)。
2、Kafka 的 Message 存儲采用了分區(Partition),磁盤順序讀寫,分段(LogSegment)和稀疏索引這幾個手段來達到高效性。
3、在 Kafka 的文件存儲中,同一個 Topic 下有多個不同的 Partition,每個 Partition 都為一個目錄,而每一個目錄又被平均分配成多個大小相等的 Segment File 中(Segment 大小我們在生產上設置成1G或者 500MB ),Segment File 又由 index file 和 data file 組成,他們總是成對出現,后綴 ".index" 和 ".log" 分表表示 Segment 索引文件和數據文件。
Partition and Replica
一個 Topic 物理上分為多個 Partition,位于不同的 Broker 上。如果沒有 Replica,一旦 Broker 宕機,其上所有的 Patition 將不可用。
每個 Partition 可以有多個Replica(對應server.properties/default.replication.factor),分配到不同的 Broker 上。本人默認習慣為 default.replication.factor=2 也就是默認2個副本,比較合理
其中有一個 Leader 負責讀寫,處理來自 Producer 和 Consumer 的請求;其他作為 Follower 從 Leader Pull 消息,保持與 Leader 的同步。
如何分配 Partition 和 Replica 到 Broker 上?步驟如下:
1、將所有 Broker(假設共 n 個 Broker)和待分配的 Partition 排序。
2、將第 i 個 Partition 分配到第(i mod n)個 Broker 上。
3、將第 i 個 Partition 的第 j 個 Replica 分配到第((i + j) mode n)個 Broker 上。
根據上面的分配規則,若 Replica 的數量大于 Broker 的數量,必定會有兩個相同的 Replica 分配到同一個 Broker 上,產生冗余。因此 Replica 的數量應該小于或等于 Broker 的數量。
//這里kafka硬性規定了創建的replica不能超過broker的數量,必須等于小于broker的數量
這里有2個算法函數解釋一下
1、mod:求余函數;
2、mode:返回在某數組或數據區域中出現頻率最多的數值,mode是一個位置測量函數。
我這里只有3個broker 創建4個replica就出現報錯 具體見下
[root@kafka02-55-12 ~]# kafka-topics.sh --zookeeper 10.211.55.11:2181,10.211.55.12:2181,10.211.55.13:2181/kafkagroup --replication-factor 4 --partitions 9 --create --topic topic-zhuhair
**Error while executing topic command : Replication factor: 4 larger than available brokers: 3.**
[2019-08-24 20:41:40,611] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 4 larger than available brokers: 3.
pratition的leader是如何選舉的---broker failover故障轉移
//通俗點講也就是當broker發生宕機了,如何保證高可用的
文字描述如下:
Kafka 在 Zookeeper 中(/brokers/topics/[topic]/partitions/[partition]/state)動態維護了一個 ISR(in-sync replicas)。
ISR 里面的所有 Replica 都"跟上"了 Leader,Controller 將會從 ISR 里選一個做 Leader。
具體流程文字描述如下:
1、Controller 在 Zookeeper 的 /brokers/ids/[brokerId] 節點注冊 Watcher,
2、當 Broker 宕機時 Zookeeper 會 Fire Watch。
3、Controller 從 /brokers/ids 節點讀取可用 Broker。
4、Controller 決定 set_p,該集合包含宕機 Broker 上的所有 Partition。
5、對 set_p 中的每一個 Partition,從/brokers/topics/[topic]/partitions/[partition]/state 節點讀取 ISR,決定新 Leader,將新 Leader、ISR、controller_epoch 和 leader_epoch 等信息寫入 State 節點。
6、zk通過 RPC 向相關 Broker 發送 leaderAndISRRequest 命令。
極端情況下需要考慮的是:
當 ISR 為空時,會選一個 Replica(不一定是 ISR 成員)作為 Leader;
當所有的 Replica 都歇菜了,會等任意一個 Replica 復活,將其作為 Leader。
//
這就需要在可用性和一致性當中作出一個簡單的折衷。如果一定要等待ISR中的Replica“活”過來,那不可用的時間就可能會相對較長。而且如果ISR中的所有Replica都無法“活”過來了,或者數據都丟失了,這個Partition將永遠不可用。選擇第一個“活”過來的Replica作為Leader,而這個Replica不是ISR中的Replica,那即使它并不保證已經包含了所有已commit的消息,它也會成為Leader而作為consumer的數據源(前文有說明,所有讀寫都由Leader完成)。Kafka0.8.*使用了第二種方式。根據Kafka的文檔,在以后的版本中,Kafka支持用戶通過配置選擇這兩種方式中的一種,從而根據不同的使用場景選擇高可用性還是強一致性。
ISR(同步列表)中的 Follower 都"跟上"了Leader,"跟上"并不表示完全一致,它由 server.properties/replica.lag.time.max.ms 配置。表示 Leader 等待 Follower 同步消息的最大時間,如果超時,Leader 將 Follower 移除 ISR。配置項 replica.lag.max.messages 已經移除。
Replica 副本如何同步 消息傳遞同步策略
1、Producer在發布消息到某個Partition時,先通過ZooKeeper找到該Partition的Leader,
2、無論該Topic的Replication Factor為多少,Producer只將該消息發送到該Partition的Leader。
3、Leader會將該消息寫入其本地Log。
4、每個Follower都從Leader pull數據。這種方式上,Follower存儲的數據順序與Leader保持一致。
5、Follower在收到該消息并寫入其Log后,向Leader發送ACK。
6、一旦Leader收到了ISR中的所有Replica的ACK,該消息就被認為已經commit了,Leader將增加HW并且向Producer發送ACK。
為了提高性能,每個Follower在接收到數據后就立馬向Leader發送ACK,而非等到數據寫入Log中。
因此,對于已經commit的消息,Kafka只能保證它被存于多個Replica的內存中,而不能保證它們被持久化到磁盤中,也就不能完全保證異常發生后該條消息一定能被Consumer消費。
Consumer讀消息也是從Leader讀取,只有被commit過的消息才會暴露給Consumer。
具體的可靠性,是由生產者(根據配置項 producer.properties/acks)來決定的。
有資料說 最新的文檔 2.2.x request.required.acks 已經不存在了,這一點有待我去確認
通俗一點講對ack的三個參數的含義為
Kafka?producer有三種ack機制 ?初始化producer時在config中進行配置
0?:意味著producer不等待broker同步完成的確認,繼續發送下一條(批)信息
提供了最低的延遲。但是最弱的持久性,當服務器發生故障時,就很可能發生數據丟失。例如leader已經死亡,producer不知情,還會繼續發送消息broker接收不到數據就會數據丟失
1:意味著producer要等待leader成功收到數據并得到確認,才發送下一條message。此選項提供了較好的持久性較低的延遲性。Partition的Leader死亡,follwer尚未復制,數據就會丟失
-1:意味著producer得到follwer確認,才發送下一條數據
持久性最好,延時性最差。
在這里強調的一點是,在kafak的partition中的fllower和leader中的復制不是完全的同步復制,也不是單純的異步復制
同步復制:所有的fllower復制完才提交 這樣的缺點是極大的影響了吞吐率
異步復制:Follower異步的從Leader復制數據,數據只要被Leader寫入log就被認為已經commit,這種情況下如果Follower都復制完都落后于Leader,而如果Leader突然宕機,則會丟失數據。
所有 kafak采用的是ISR 的方式則很好的均衡了確保數據不丟失以及吞吐率。Follower可以批量的從Leader復制數據,這樣極大的提高復制性能(批量寫磁盤),極大減少了Follower與Leader的差距。
producer如何發送消息
Producer 首先將消息封裝進一個 ProducerRecord 實例中。
寫消息的路由模式
1、 指定了 patition,則直接使用;
2、 未指定 patition 但指定 key,通過對 key 的 value 進行hash 選出一個 patition
這個 Hash(即分區機制)由 producer.properties/partitioner.class 指定的類實現,這個路由類需要實現 Partitioner 接口。
3、 patition 和 key 都未指定,使用輪詢選出一個 patition。
備注:消息并不會立即發送,而是先進行序列化后,發送給 Partitioner,
也就是上面提到的 Hash 函數,由 Partitioner 確定目標分區后,發送到一塊內存緩沖區中(發送隊列)。Producer 的另一個工作線程(即 Sender 線程),
則負責實時地從該緩沖區中提取出準備好的消息封裝到一個批次內,統一發送到對應的 Broker 中。
具體寫數據流程如下:
具體流程如下:
1、 producer 先從 zookeeper 的 "/brokers/.../state" 節點找到該 partition 的 leader
2、 producer 將消息發送給該 leader
3、 leader 將消息寫入本地 log
4、 followers 從 leader pull 消息,寫入本地 log 后 leader 發送 ACK
5、 leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 發送 ACK
參考網上的資料,有的文字版的解釋是這樣的 個人覺得下面的這樣的文字解釋的更加通俗易懂
流程如下:
1、首先,我們需要創建一個ProducerRecord,這個對象需要包含消息的主題(topic)和值(value),可以選擇性指定一個鍵值(key)或者分區(partition)。
2、發送消息時,生產者會對鍵值和值序列化成字節數組,然后發送到分配器(partitioner)。
3、如果我們指定了分區,那么分配器返回該分區即可;否則,分配器將會基于鍵值來選擇一個分區并返回。
4、選擇完分區后,生產者知道了消息所屬的主題和分區,它將這條記錄添加到相同主題和分區的批量消息中,另一個線程負責發送這些批量消息到對應的Kafka broker。
5、當broker接收到消息后,如果成功寫入則返回一個包含消息的主題、分區及位移的RecordMetadata對象,否則返回異常。
6、生產者接收到結果后,對于異常可能會進行重試。
參考鏈接:
架構成長之路:Kafka設計原理看了又忘,忘了又看?一文讓你掌握: https://www.toutiao.com/i6714606866355192328/
Kafka的ACK機制有三種,是哪三種 : https://blog.csdn.net/Sun1181342029/article/details/87806207
kafka原理系列ACK機制(數據可靠性和持久性保證) https://blog.csdn.net/bluehawksk/article/details/96120803
kafka入門介紹 https://www.orchome.com/5
Kafka學習之路 (三)Kafka的高可用 https://www.cnblogs.com/qingyunzong/p/9004703.html
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。