91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何進行Pulsar Connector機制的剖析

發布時間:2021-12-09 11:13:59 來源:億速云 閱讀:144 作者:柒染 欄目:大數據

本篇文章給大家分享的是有關如何進行Pulsar Connector機制的剖析,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

Apache Pulsar 是 Yahoo 開源的下一代分布式消息系統,在2018年9月從 Apache  軟件基金會畢業成為頂級項目。Pulsar 特有的分層分片的架構,在保證大數據消息流系統的性能和吞吐量的同時,也提供了高可用性、高可擴展性和易維護性。

分片架構將消息流數據的存儲粒度從分區拉低到了分片,以及相應的層級化存儲,使 Pulsar 成為 unbounded streaming data storage 的不二之選。這使得 Pulsar 可以更完美地匹配和適配 Flink 的批流一體的計算模式。

1. Pulsar 簡介


1.1 特點
 
隨著開源后,各行業企業可以根據不同需求,為 Pulsar 賦予更豐富的功能,所以目前它也不再只是中間件的功能,而是慢慢發展成為一個 Event Streaming Platform(事件流處理平臺),具有 Connect(連接)、Store(存儲)和 Process(處理)功能。
 
■ Connect
 
在連接方面,Pulsar 具有自己單獨的 Pub/Sub 模型,可以同時滿足 Kafka 和 RocketMQ 的應用場景。同時 Pulsar IO 的功能,其實就是 Connector,可以非常方便地將數據源導入到 Pulsar 或從 Pulsar 導出等。

另外,在Pulsar 2.5.0 中,我們新增了一個重要機制:Protocol handler。這個機制支持在 broker 自定義添加額外的協議支持,可以保證在不更改原數據庫的基礎上,也能享用 Pulsar 的一些高級功能。所以 Pulsar 也延展出比如:KoP、ActiveMQ、Rest 等。
 
■   Store
 
Pulsar 提供了可以讓用戶導入的途徑后就必然需要考慮在 Pulsar 上進行存儲。Pulsar 采用的是分布式存儲,最開始是在 Apache BookKeeper 上進行。后來添加了更多的層級存儲,通過 JCloud 和 HDFS 等多種模式進行存儲的選擇。當然,層級存儲也受限于存儲容量。
 
■ Process
 
Pulsar 提供了一個無限存儲的抽象,方便第三方平臺進行更好的批流融合的計算。即 Pulsar 的數據處理能力。Pulsar 的數據處理能力實際上是按照你數據計算的難易程度、實效性等進行了切分。
 
目前 Pulsar 包含以下幾類集成融合處理方式:
 
  • Pulsar Function:Pulsar 自帶的函數處理,通過不同系統端的函數編寫,即可完成計算并運用到 Pulsar 中。

  • Pulsar-Flink connector 和 Pulsar-Spark connector:作為批流融合計算引擎,Flink 和 Spark 都提供流計算的機制。如果你已經在使用他們了,那恭喜你。因為 Pulsar 也全部支持這兩種計算,無需你再進行多余的操作了。

  • Presto (Pulsar SQL):有的朋友會在應用場景中更多的使用 SQL,進行交互式查詢等。Pulsar 與 Presto 有很好的集成處理,可以用 SQL 在 Pulsar 進行處理。

 

如何進行Pulsar Connector機制的剖析

 
1.2 訂閱模型
 
從使用來看,Pulsar 的用法與傳統的消息系統類似,是基于發布-訂閱模型的。使用者被分為生產者(Producer)和消費者(Consumer)兩個角色,對于更具體的需求,還可以以 Reader 的角色來消費數據。用戶可以以生產者的身份將數據發布在特定的主題之下,也可以以消費者的身份訂閱(Subscription)特定的主題,從而獲取數據。在這個過程中,Pulsar 實現了數據的持久化與數據分發,Pulsar 還提供了Schema 功能,能夠對數據進行驗證。

如下圖所示,Pulsar 里面有幾種訂閱模式:

  1. 獨占訂閱(Exclusive) 

  2. 故障轉移訂閱(Failover) 

  3. 共享訂閱(Shared) 

  4. Key保序共享訂閱(Key_shared)

   

如何進行Pulsar Connector機制的剖析

如何進行Pulsar Connector機制的剖析

    
Pulsar 里的主題分成兩類,一類是分區主題(Partitioned Topic),一類是非分區主題(Not Partitioned Topic)。

分區主題實際上是由多個非分區主題組成的。主題和分區都是邏輯上的概念,我們可以把主題看作是一個大的無限的事件流,被分區切分成幾條小的無限事件流。

而對應的,在物理上,Pulsar 采用分層結構。每一條事件流存儲在一個 Segment 中,每個Segment 包括了許多個Entry,Entry 里面存放的才是用戶發送過來的一條或多條消息實體。
 
Message 是 Entry 中存放的數據,也是 Pulsar 中消費者消費一次獲得的數據。Message 中除了包括字節流數據,還有 Key 屬性,兩種時間屬性和 MessageId 以及其他信息。MessageId 是消息的唯一標識,包括了ledger-id、entry-id、 batch-index、 partition-index 的信息,如下圖,分別記錄了消息在Pulsar 中的Segment、Entry、Message、Partition 存儲位置, 因此也可以據此從物理上找到Message的信息內容。

如何進行Pulsar Connector機制的剖析

   

2. Pulsar 架構

   
 
一個 Pulsar 集群由 Brokers 集群和 Bookies 集群組成。Brokers 之間是相互獨立的,負責向生產者和消費者提供關于某個主題的服務。Bookies 之間也是相互獨立的,負責存儲 Segment 的數據,是消息持久化的地方。為了管理配置信息和代理信息,Pulsar 還借助了 Zookeeper 這個組件,Brokers 和 Bookies 都會在 zookeeper 上注冊,下面從消息的具體讀寫路徑(見下圖)來介紹 Pulsar 的結構。
 

如何進行Pulsar Connector機制的剖析

如何進行Pulsar Connector機制的剖析

 
在寫路徑中,生產者創建并發送一條消息到主題中,該消息可能會以某種算法(比如Round robin)被路由到一個具體的分區上,Pulsar 會選擇一個Broker 為這個分區服務,該分區的消息實際會被發送到這個 Broker上。當Broker 拿到一條消息,它會以 Write Quorum (Qw)的方式將消息寫入到 Bookies 中。當成功寫入到 Bookies 的數量達到設定時,Broker 會收到完成通知,并且 Broker 也會返回通知生產者寫入成功。

在讀路徑中,消費者首先要發起一次訂閱,之后才能與主題對應的 Broker 進行連接,Broker 從 Bookies 請求數據并發送給消費者。當數據接受成功,消費者可以選擇向 Broker 發送確認信息,使得 Broker 能夠更新消費者的訪問位置信息。前面也提到,對于剛寫入的數據,Pulsar 會存儲在緩存中,那么就可以直接從 Brokers 的緩存中讀取了,縮短了讀取路徑。
 
Pulsar 將存儲與服務相分離,實現了很好的可拓展性,在平臺層面,能夠通過調整Bookies 的數量來滿足不同的需求。在用戶層面,只需要跟 Brokers 通信,而Brokers 本身被設計成沒有狀態的,當某個 Broker 因故障無法使用時,可以動態的生成一個新的 Broker 來替換。
 

3. Pulsar Connector 內部機制


 
首先,Pulsar Connector 在使用上是比較簡單的,由一個 Source 和一個 Sink 組成,source 的功能就是將一個或多個主題下的消息傳入到 Flink 的Source中,Sink的功能就是從 Flink 的 Sink 中獲取數據并放入到某些主題下,在使用方式上,如下所示,與 Kafa Connector 很相似,使用時需要設置一些參數。
 

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); Properties props = new Properties(); props.setProperty("topic", "test-source-topic") FlinkPulsarSource<String> source = new FlinkPulsarSource<>(              serviceUrl,              adminUrl,              new SimpleStringSchema(),              props); DataStream<String> stream = see.addSource(source);
FlinkPulsarSink<Person> sink =      new FlinkPulsarSink(              serviceUrl,              adminUrl,              Optional.of(topic), // mandatory target topic              props,              TopicKeyExtractor.NULL, // replace this to extract key or topic for each record              Person.class); stream.addSink(sink);

 
現在介紹 Kulsar Connector 一些特性的實現機制。
 
3.1 精確一次
 
因為 Pulsar 中的 MessageId 是全局唯一且有序的,與消息在 Pulsar 中的物理存儲也對應,因此為了實現 Exactly Once,Pulsar Connector 借助 Flink 的 Checkpoint 機制,將 MessageId 存儲到 Checkpoint。

對于連接器的 Source 任務,在每次觸發 Checkpoint 的時候,會將各個分區當前處理的 MessageId 保存到狀態存儲里面,這樣在任務重啟的時候,每個分區都可以通過 Pulsar 提供的 Reader seek 接口找到 MessageId 對應的消息位置,然后從這個位置之后讀取消息數據。

通過 Checkpoint 機制,還能夠向存儲數據的節點發送數據使用完畢的通知,從而能準確刪除過期的數據,做到存儲的合理利用。
 
3.2 動態發現
 
考慮到Flink中的任務都是長時間運行的,在運行任務的過程中,用戶也許會需要動態的增加部分主題或者分區,Pulsar Connector 提供了自動發現的解決方案。

Pulsar 的策略是另外啟動一個線程,定期的去查詢設定的主題是否改變,分區有沒有增刪,如果發生了新增分區的情況,那么就額外創建新的Reader 任務去完成主題下的數據的反序列化,當然如果是刪除分區,也會相應的減少讀取任務。
 
3.3 結構化數據
 
在讀取主題下的數據的過程中,我們可以將數據轉化成一條條結構化的記錄來處理。Pulsar 支持 Avro schema and avro/json/protobuf Message 格式類型的數據轉化成 Flink 中的 Row格式數據。對于用戶關心的元數據,Pulsar 也在 Row 中提供了對應的元數據域。

另外,Pulsar 基于 Flink 1.9 版本進行了新的開發,支持 Table API 和 Catalog,Pulsar 做了一個簡單的映射,如下圖所示,將 Pulsar 的租戶/命名空間對應到 Catalog 的數據庫,將主題對應為庫中的具體表。
 

如何進行Pulsar Connector機制的剖析

 


 之前提到 Pulsar 將數據存儲在 Bookeeper 中,還可以導入到 Hdfs 或者 S3 這樣的文件系統中,但對于分析型應用來說,我們往往只關心所有數據中每條數據的部分屬性,因此采用列存儲的方式對 IO 和網絡都會有性能提升,Pulsar 也在嘗試在Segment 中以列的方式存儲。
在原來的讀路徑中,不管是 Reader 還是Comsumer,都需要通過 Brokers 來傳遞數據。如果采用新的 Bypass Broker方式,通過查詢元數據,就能直接找到每條 Message 存儲的 Bookie 位置,這樣可以直接從 Bookie 讀取數據,縮短讀取路徑,從而提升效率。
Pulsar 相對 Kafka 來說,由于數據在物理上是存放在一個個 Segment 中的,那么在讀取的過程中,通過提高并行化的方式,建立多線程同時讀取多個 Segment,就能夠提升整個作業的完成效率,不過這也需要你的任務自身對每個Topic 分區的訪問順序沒有嚴格要求,并且對于新產生的數據,是不保存在 Segement 的,還是需要做緩存的訪問來獲取數據,因此,并行讀取將成為一個可選項,為用戶提供更多的選擇方案。

以上就是如何進行Pulsar Connector機制的剖析,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

江门市| 赤峰市| 洮南市| 裕民县| 家居| 吕梁市| 北辰区| 成安县| 钟山县| 湘潭市| 新乐市| 林州市| 乌鲁木齐市| 阿拉尔市| 富蕴县| 长葛市| 陇西县| 太湖县| 丹凤县| 昔阳县| 南安市| 孟津县| 靖西县| 会同县| 九龙县| 博白县| 安顺市| 乌兰县| 呈贡县| 班戈县| 英德市| 荥经县| 保靖县| 沙洋县| 乐清市| 象山县| 达尔| 炎陵县| 宿松县| 乃东县| 阜宁县|