您好,登錄后才能下訂單哦!
如何將kafka中的數據快速導入Hadoop,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。
Kafka是一個分布式發布—訂閱系統,由于其強大的分布式和性能特性,迅速成為數據管道的關鍵部分。它可完成許多工作,例如消息傳遞、指標收集、流處理和日志聚合。Kafka的另一個有效用途是將數據導入Hadoop。使用Kafka的關鍵原因是它將數據生產者和消費者分離,允許擁有多個獨立的生產者(可能由不同的開發團隊編寫)。同樣,還有多個獨立的消費者(也可能由不同的團隊編寫)。此外,消費者可以是實時/同步或批量/離線/異步。當對比RabbitMQ等其他pub-sub工具時,后一個屬性有很大區別。
要使用Kafka,有一些需要理解的概念:
topic—topic是相關消息的訂閱源;
分區—每個topic由一個或多個分區組成,這些分區是由日志文件支持的有序消息隊列;
生產者和消費者—生產者和消費者將消息寫入分區并從分區讀取。
Brokers—Brokers是管理topic和分區并為生產者和消費者請求提供服務的Kafka流程。
Kafka不保證對topic的“完全”排序,只保證組成topic的各個分區是有序的。消費者應用程序可以根據需要強制執行對“全局”topic排序。
圖5.14 顯示了Kafka的概念模型
圖5.15 顯示了如何在Kafka部署分發分區的示例
為了支持容錯,可以復制topic,這意味著每個分區可以在不同主機上具有可配置數量的副本。這提供了更高的容錯能力,這意味著單個服務器死亡對數據或生產者和消費者的可用性來說不是災難性的。
此處采用Kafka版本0.8和Camus的0.8.X。
實踐:使用Camus將Avro數據從Kafka復制到HDFS
該技巧在已經將數據流入Kafka用于其他目的并且希望將數據置于HDFS中的情況下非常有用。
問題
希望使用Kafka作為數據傳遞機制來將數據導入HDFS。
解決方案
使用LinkedIn開發的解決方案Camus將Kafka中的數據復制到HDFS。
討論
Camus是LinkedIn開發的一個開源項目。Kafka在LinkedIn大量部署,而Camus則用作將數據從Kafka復制到HDFS。
開箱即用,Camus支持Kafka中的兩種數據格式:JSON和Avro。在這種技術中,我們將通過Camus使用Avro數據。Camus對Avro的內置支持要求Kafka發布者以專有方式編寫Avro數據,因此對于這種技術,我們假設希望在Kafka中使用vanilla序列化數據。
讓這項技術發揮作用需要完成三個部分的工作:首先要將一些Avro數據寫入Kafka,然后編寫一個簡單的類來幫助Camus反序列化Avro數據,最后運行一個Camus作業來執行數據導入。
為了把Avro記錄寫入Kafka,在以下代碼中,需要通過配置必需的Kafka屬性來設置Kafka生成器,從文件加載一些Avro記錄,并將它們寫出到Kafka:
可以使用以下命令將樣本數據加載到名為test的Kafka的topic中:
Kafka控制臺使用者可用于驗證數據是否已寫入Kafka,這會將二進制Avro數據轉儲到控制臺:
完成后,編寫一些Camus代碼,以便可以在Camus中閱讀這些Avro記錄。
實踐:編寫Camus和模式注冊表
首先,需要了解三種Camus概念:
解碼器—解碼器的工作是將從Kafka提取的原始數據轉換為Camus格式。
編碼器—編碼器將解碼數據序列化為將存儲在HDFS中的格式。
Schema注冊表—提供有關正在編碼的Avro數據的schema信息。
正如前面提到的,Camus支持Avro數據,但確實需要Kafka生產者使用Camus KafkaAvroMessageEncoder類來編寫數據,該類為Avro序列化二進制數據添加了部分專有數據,可能是因為Camus中的解碼器可以驗證它是由該類編寫的。
在此示例中,使用 Avro serialization進行序列化,因此需要編寫自己的解碼器。幸運的是,這很簡單:
你可能已經注意到我們在Kafka中寫了一個特定的Avro記錄,但在Camus中我們將該記錄讀作通用的Avro記錄,而不是特定的Avro記錄,這是因為CamusWrapper類僅支持通用Avro記錄。否則,特定的Avro記錄可以更簡單地使用,因為可以使用生成的代碼并具有隨之而來的所有安全特征。
CamusWrapper對象是從Kafka提取的數據。此類存在的原因是允許將元數據粘貼到envelope中,例如時間戳,服務器名稱和服務詳細信息。強烈建議使用的任何數據都有一些與每條記錄相關的有意義的時間戳(通常這將是創建或生成記錄的時間)。然后,可以使用接受時間戳作為參數的CamusWrapper構造函數:
public CamusWrapper(R record, long timestamp) { ... }
如果未設置時間戳,則Camus將在創建包裝器時創建新的時間戳。在確定輸出記錄的HDFS位置時,在Camus中使用此時間戳和其他元數據。
接下來,需要編寫一個schema注冊表,以便Camus Avro編碼器知道正在寫入HDFS的Avro記錄的schema詳細信息。注冊架構時,還要指定從中拉出Avro記錄的Kafka的topic名稱:
運行Camus
Camus在Hadoop集群上作為MapReduce作業運行,希望在該集群中導入Kafka數據。需要向Camus提供一堆屬性,可以使用命令行或者使用屬性文件來執行此操作,我們將使用此技術的屬性文件:
從屬性中可以看出,無需明確告訴Camus要導入哪些topic。Camus自動與Kafka通信以發現topic(和分區)以及當前的開始和結束偏移。
如果想要精確控制導入的topic,可以分別使用kafka.whitelist.topics和kafka.blacklist.topics列舉白名單(限制topic)和黑名單(排除topic),可以使用逗號作為分隔符指定多個topic,還支持正則表達式,如以下示例所示,其匹配topic的“topic1”或以“abc”開頭,后跟一個或多個數字的任何topic,可以使用與value完全相同的語法指定黑名單:
kafka.whitelist.topics=topic1,abc[0-9]+
一旦屬性全部設置完畢,就可以運行Camus作業了:
這將導致Avro數據在HDFS中著陸。我們來看看HDFS中的內容:
第一個文件包含已導入的數據,其他供Camus管理。
可以使用AvroDump實用程序查看HDFS中的數據文件:
那么,當Camus工作正在運行時究竟發生了什么? Camus導入過程作為MapReduce作業執行,如圖5.16所示。
隨著MapReduce中的Camus任務成功,Camus OutputCommitter(允許在任務完成時執行自定義工作的MapReduce構造)以原子方式將任務的數據文件移動到目標目錄。OutputCommitter還為任務正在處理的所有分區創建偏移文件,同一作業中的其他任務可能會失敗,但這不會影響成功任務的狀態——成功任務的數據和偏移輸出仍然存在,因此后續的Camus執行將從最后一個已知的成功狀態恢復處理。
接下來,讓我們看看Camus導入數據的位置以及如何控制行為。
數據分區
之前,我們看到了Camus導入位于Kafka的Avro數據,讓我們仔細看看HDFS路徑結構,如圖5.17所示,看看可以做些什么來確定位置。
圖5.17 在HDFS中解析導出數據的Camus輸出路徑
路徑的日期/時間由從CamusWrapper中提取的時間戳確定,可以從MessageDecoder中的Kafka記錄中提取時間戳,并將它們提供給CamusWrapper,這將允許按照有意義的日期對數據進行分區,而不是默認值,這只是在MapReduce中讀取Kafka記錄的時間。
Camus支持可插拔分區程序,允許控制圖5.18所示路徑的一部分。
圖5.18 Camus分區路徑
Camus Partitioner接口提供了兩種必須實現的方法:
例如,自定義分區程序可創建用于Hive分區的路徑。
Camus提供了一個完整的解決方案,可以在HDFS中從Kafka獲取數據,并在出現問題時負責維護狀態和進行錯誤處理。通過將其與Azkaban或Oozie集成,可以輕松實現自動化,并根據消息時間組織HDFS數據執行簡單的數據管理。值得一提的是,當涉及到ETL時,與Flume相比,它的功能是無懈可擊的。
Kafka捆綁了一種將數據導入HDFS的機制。它有一個KafkaETLInputFormat輸入格式類,可用于在MapReduce作業中從Kafka提取數據。要求編寫MapReduce作業以執行導入,但優點是可以直接在MapReduce流中使用數據,而不是將HDFS用作數據的中間存儲。
看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業資訊頻道,感謝您對億速云的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。