您好,登錄后才能下訂單哦!
小編今天帶大家了解如何解析Apache Pulsar的消息存儲模型,文中知識點介紹的非常詳細。覺得有幫助的朋友可以跟著小編一起瀏覽文章的內容,希望能夠幫助更多想解決這個問題的朋友找到問題的答案,下面跟著小編一起深入學習“如何解析Apache Pulsar的消息存儲模型”的知識吧。
導讀
Apache Pulsar 是 Apache 軟件基金會頂級項目,是下一代云原生分布式消息流平臺,集消息、存儲、輕量化函數式計算為一體,采用計算與存儲分離架構設計,支持多租戶、持久化存儲、多機房跨區域數據復制,具有強一致性、高吞吐、低延時及高可擴展性等流數據存儲特性。
背景
在社區中,我們經常可以看到用戶有關 Backlog,storage size 和 retention 等策略的困惑,比較常見的一些問題,諸如:
…
Pulsar 的消息模型
首先,我們先來看一下 Pulsar 的消息模型
如上圖所示,Pulsar 提供了最基本的 pub-sub 的處理模型。
Producer
首先 Producer 端生產消息,將消息以 append 的形式追加到 Topic 中,這里具體分發到哪一個 Topic 中,根據消息是否設置了 msg key 會有所不同。
未設置 msg key,消息會以 round robin 的形式,分發到不同的 partitions 中
在消息分發的模型中,Pulsar 與 Kafka 類似。
Consumer
在 Consumer 之外,Pulsar 抽象了一層訂閱層,用于訂閱 Topic。通過訂閱層的抽象,Pulsar 可以靈活的支持 Queue 和 Streaming 這兩種類型的消息隊列。每一個 sub 都可以拿到這個 Topic 中所有數據的完整 copy,有點類似 Kafka 中的 consumer group。根據訂閱類型的不同,每一個訂閱下面可以有一個或者多個 Consumer 來接收消息。
目前,Pulsar 支持如下四種消息訂閱模型:
Key_Shared
存儲模型
消息在每個 Partition Topic 的分布式日志中只存儲一次
這就意味著,當 Producer 成功發送消息到 Topic 之后,這個消息只會在存儲層存儲一次,無論你有多少個 Subscription 訂閱到這個 Topic 中,實際上操作的都是同一份數據。基于這個基礎,我們可以看到 Apache Pulsar 從上到下的層級抽象概念如下圖所示:
首先第一層抽象是 Topic(Partition),用來存儲 Producer 追加的 messages 信息,Topic 之下對應的是一個個的 ledger,ledger 里面又劃分為一個個的分片,在一個個的分片中存儲了更小粒度的 ertries,entries 中存儲的是 【一條】或者 【一個 batch】 的消息。
Node: 在 Bookkeeper 中,對數據操作的最小單元是按照 segment 這個粒度來進行操作的。
為什么需要做分層抽象呢?
在這里最直白的解釋其實就是,為了確保數據被在每一個 bk 節點中打的足夠散,分布的足夠均勻。這也是分層分片架構設計的好處之一。
Ack 機制
在 Pulsar 中支持了兩種 Ack 的機制,分別是單條 Ack 和批量 Ack。單條 Ack(AckIndividual)是指 Consumer 可以根據消息的 messageID 來針對某一個特定的消息進行 Ack 操作;批量 Ack(AckCumulative)是指一次 Ack 多條消息。
訂閱機制
為了更好的理解 Strorage Size 以及 Backlog, 我們首先需要去了解 Pulsar 中的訂閱機制,如下圖所示:
當有消息積壓時,你可以通過 clear-backlog 來清除積壓的消息。清除 backlog 中積壓的消息是相對危險的操作,所以系統會提示你,是否確認要刪除 backlog 中的消息, clear-backlog 提供了 -f(--force) 的參數來屏蔽該提示。
Producer 還是按照追加的形式不斷往 Topic 中發送消息,Consumer 端會創建一個 Subscription 去訂閱這個 Topic,當成功訂閱時,會初始化一個 Cursor 指向具體的消息的位置,默認情況下是 Latest。
Cursor 是用來存儲一個訂閱中消費的狀態信息
上圖中,我們可以看到該訂閱下面的 Topic 已經成功 Receive 并且 Ack 掉了 m4 這條消息。那么包含 m4 在內的所有的消息狀態都會被標記為可刪除的狀態。在 Pulsar 中,使用 MarkDeletePosition 來標記這個位置。之后的所有消息,代表這個訂閱還沒有消費的消息。
隨著時間的推移,假設在 AckCumulative 的場景下,上述訂閱中的 Consumer 又消費了一些消息,目前 Cursor 的位置移動到了 m8 的位置,意味著 m8 之前的消息都可以進入刪除狀態。
假設是在 AckIndividual 的場景下,上述訂閱中的 Consumer 只消費了 m7 這條消息并且發送了 Ack 請求,m5, m6 這兩條消息仍然沒有被成功消費,那么目前處于可刪除狀態的消息是 m4 之前的消息和 m7 這條消息。也就是說,在這種場景下,由于使用單條 Ack 導致 Topic 中間出現了 Ack 的空洞。
Cursor = Offset + IndevidualDeletes, Ack 會觸發 Cursor 的移動,但是不會刪除任何消息
隨著時間的推移,在單條 Ack 的場景下,Ack 的空洞可能會自己消失,如下圖所示:
上面我們描述了,單個訂閱在單條 Ack 和批量 Ack 混合的場景下,Topic 中 cursor 的移動情況。假設目前有多個 Subscription 訂閱了這個 Topic,那么每一個 Subscription 都可以拿到這個 Topic 中數據的完整 Copy,也就是一個 Subscription 會在這個 Topic 中初始化一個新的 Cursor, 每一個 Cursor 之間消費的進度是沒有交集、互不影響的,所以就可能出現下圖中的情況:
在上圖中,針對該 Topic,有兩個訂閱:Subscription-1 和 Subscription-2。Subscription-1中的 Consumer 消費掉了 m4 之前的消息,Subscription-2 中的 Consumer 消費掉了 m8 之前的消息。而 m4-m8 之間的這四條消息,雖然被 Subscription-2 消費完成,但是 Subscription-1 還沒有消費完成這部分數據,所以這部分消息還不可以被刪除。目前處于可刪除狀態的消息是 m4 之前的消息,即這個 Topic 中消費進度最慢的那個 Subscription 所消費完成的消息。那么這就會有一個問題,假設我目前 Subscription-1 掉線了,它的 Cursor 的位置一直沒有變化,這就會導致這個 Topic 中的數據一直處于不可刪除的狀態。
針對上述場景,Pulsar 引入了 TTL 的概念,即允許用戶設置 TTL 的時間,當消息到達 TTL 指定的閾值 Cursor 仍然沒有移動的話,那么會觸發 TTL 的機制,將 Cursor 自動向后移到指定的位置。在這里需要注意的一點是,我們一直強調的是 TTL 會移動 Cursor 的位置,到目前為止,我們還沒有提到消息刪除的概念,不要將二者混淆了。TTL 會做的只是去移動 Cursor 的位置,不會有任何跟消息刪除的邏輯。
Backlog
為了更好的表述 Topic 中沒有被消費的數據,Pulsar 引入了 Backlog 的概念來描述這一部分消息。Backlog 可以分為如下兩種形式:
Subscription Backlog: 指針對單個訂閱級別的沒有消費的數據的集合
如下圖所示:Backlog A 屬于 Topic Backlog;Backlog A 屬于 Subscription-1 Backlog;Backlog B 屬于 Subscription-2 的 Backlog。
隨著時間的推移,Backlog 的會不斷的變化,如下圖所示:
在這里需要說明的一點是,這里的 backlogSize 記錄的是帶 batch 的消息,也就是一個 batch 會被當作一條消息來進行處理。因為在 broker 端去解析整個 batch 會給 broker 帶來一定的負擔,同時浪費大量的 CPU 資源,所以,具體 batch 邏輯的解析放到了 Consumer 端來進行處理。所以 Backlog 本質上記錄的是上面我們提到的 entries 的數量。
在 Pulsar 中,針對 Backlog 有兩個指標,具體如下:
backlogSize:記錄的是所有沒有被 Ack 的消息的大小
Retention 機制
在 Apache Pulsar 中,使用了 BookKeeper 來作為存儲層,允許用戶將消息持久化,為了確保消息不會無限期的持久化下去,Pulsar 引入了 Retention 的機制,允許用戶來配置消息持久化的策略。默認情況下,持久化的機制是關閉的,即消息被 Ack 之后,就會進入刪除的邏輯。
配置 Retention 策略時,有如下兩個參數可以指定:
time:指持久化時間的閾值。0 代表不配置 Retention 時間策略,-1 代表時間無限大
在引入 Retention 策略之后,整個 Topic 表示的視圖如下所示,m0-m5 代表已經被所有訂閱確認的消息并且已經超過了 Retention 策略的閾值,即這些消息正在 準備刪除。注意,我這里描述的是 【準備刪除】具體是否可以被刪除,現在還不能確定。
在最開始,我們從最上層的 Topic 一步步抽象到了一條具體的 msg,(在這里為了方便描述,我們忽略掉 batch 的概念,即一條 msg 等價于一個 entry)現在我們再反過來把所有的概念都疊加回去。因為在 bk 中,允許操作的最小的單元是一個 segment,所以在具體的 msg(entry)級別,是沒辦法針對一條消息進行刪除的,刪除操作需要針對一個 segment 來進行操作。如下圖所示:
假設 m0-m3 屬于 segment3;m4-m7 屬于segment2;m8-m11 屬于 segment1。按照上圖的描述,m0-m5 的消息都可以進行刪除操作, 但是 segment 2 中包含了 m6, m7 并沒有達到 Retention 的閾值,所以 segment 目前還不可以被刪除。
Storage Size
為了更方便的表述當前消息占用的存儲空間的大小,Pulsar 引入了 storageSize 來描述整個概念。如下圖所示:當 backlog B 與 storage Size 標識的消息相同時,backlogSize 等價于 storageSize。
當由于引入單條 Ack,Retention 策略以及 Bookkeeper 基于 segment 刪除的設定,那么很有可能造成 Storage Size 大于 backlog Size 的場景,如下圖所示:
消息在每個 Partition Topic 的分布式日志中只會存儲一次
Cursor 是用來存儲一個訂閱下 Consumer 的消費狀態的
Cursor 等價于 offset(kafka)+ individualDeletes
Ack 會去更新 Topic 中 Cursor 的位置
當某條消息被所有訂閱者都 Ack 之后,這條消息進入【可以被刪除】的狀態
所有沒有被確認的消息會一直保存在 Subscription backlog 中
TTL 可以通過設定一個時間閾值來自動更新 Cursor 的位置
Retention 策略是用來操作那些被 Ack 之后的消息應該怎么處理
消息的刪除是以 segment 為單位的,而不是 entry。
感謝大家的閱讀,以上就是“如何解析Apache Pulsar的消息存儲模型”的全部內容了,學會的朋友趕緊操作起來吧。相信億速云小編一定會給大家帶來更優質的文章。謝謝大家對億速云網站的支持!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。