您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關使用Python掌握Apache Kafka應當了解的3個庫分別是哪些,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
Apache Kafka是一個分布式流平臺,可以實時發布,訂閱,存儲和處理消息。 其基于拉式的體系結構減輕了重負載對服務的壓力,使其易于擴展。 它以低延遲將大量數據從源移動到目的地。
Kafka是基于JVM的平臺,因此客戶端的主流編程語言是Java。 但是,隨著社區的蓬勃發展,高質量的開源Python客戶端也已面世,并已用于生產中。
在本文中,我將介紹最著名的Python Kafka客戶端:kafka-python,pykafka和confluent-kafka并進行比較。 最后,我將對每個圖書館的利弊發表自己的看法。
首先是第一件事。 為什么選擇Kafka? Kafka旨在增強事件驅動的體系結構。 它通過提供高吞吐量,低延遲,高耐用性和高可用性解決方案來增強體系結構。 (這并不意味著您可以同時擁有所有它們,總會有一個權衡。閱讀此白皮書以了解更多信息。)
Apache Kafka®是一個功率流處理平臺,他的白皮書討論了如何針對以下情況優化Kafka部署:
除了其高性能外,另一個吸引人的功能是發布/訂閱模型,其中發件人沒有專門向收件人發送郵件。 而是根據主題將郵件傳遞到收件人可以訂閱的集中位置。
這樣,我們可以輕松地將應用程序解耦并擺脫整體設計。 讓我們看一個例子,了解為什么去耦效果更好。
您創建的網站需要將用戶活動發送到某個地方,因此您可以編寫從網站到實時監控儀表板的直接連接。 這是一個簡單的解決方案,效果很好。 有一天,您決定將用戶活動存儲在數據庫中以備將來分析。 因此,您將另一個直接數據庫連接寫入到您的網站。 同時,您的網站越來越多的流量,并且您想通過添加警報服務,實時分析服務等來增強它的功能。
您的架構最終將像這樣。 諸如大量代碼倉庫,安全性問題,可伸縮性問題和可維護性問題之類的問題將傷害您。
> Architecture without decoupling (Created by Xiaoxu Gao)
您需要一個中心來分隔具有不同角色的應用程序。 對于創建事件的應用程序,我們稱它們為生產者。 他們將事件發布到集中式中心。 每個事件(即消息)都屬于一個主題。 消費者位于樞紐的另一側。 他們從中心訂閱了他們需要的主題,而無需直接與制作人交談。
有了此模型,就可以輕松擴展和維護體系結構。 工程師可以將更多精力放在核心業務上。
> Architecture with decoupling (Created by Xiaoxu Gao)
您可以從官方網站下載Apache Kafka。 快速入門可幫助您在10秒鐘內啟動服務器。
您也可以從Confluent平臺下載Apache Kafka。 它是迄今為止最大的面向Kafka的流數據平臺。 它為個人和企業提供了一系列圍繞Kafka的基礎架構服務,以便將數據作為實時流提供。 創始人是最初創建Apache Kafka的團隊的成員。
每臺Kafka服務器都稱為代理,您可以以獨立模式運行它或形成集群。 除了Kafka,我們還需要Zookeeper存儲有關Kafka的元數據。 Zookeeper的行為就像協調器一樣,負責管理分布式系統中每個代理的狀態。
> Kafka setup (Created by Xiaoxu Gao)
假設我們已經與1位Zookeeper和1位Kafka經紀人建立了基礎架構。 現在該連接了! 原始的Java客戶端提供5個API:
生產者API:將消息發布到Kafka集群中的主題。
使用者API:使用來自Kafka集群中主題的消息。
Streams API:使用主題中的消息,并將其轉換為Kafka集群中的其他主題。 這些操作可以是過濾,聯接,映射,分組等。
連接API:無需編碼即可直接將Kafka群集連接到源系統或接收器系統。 該系統可以是文件,關系數據庫,Elasticsearch等。
管理員API:管理和檢查Kafka集群中的主題和代理。
在Python世界中,已經實現了5個API中的3個,分別是Producer API,Consumer API和Admin API。 Python中還沒有這樣的Kafka Stream API,但是很好的替代方法是Faust。
本節中的測試是基于本地安裝的1個Zookeeper和1個Kafka代理執行的。 這與性能調整無關,所以我主要使用該庫提供的默認配置。
Kafka-Python
kafka-python的設計功能非常類似于官方的Java客戶端,并帶有大量pythonic接口。 最好與Kafka 0.9+版本一起使用。 第一版發布于2014年3月。正在積極維護中。
安裝
pip install kafka-python
每個消息都是通過send()異步發送的。 調用時,它將記錄添加到緩沖區并立即返回。 這使生產者可以以批處理方式將記錄發送到Kafka經紀人以提高效率。 異步可以極大地提高速度,但是我們還應該了解以下幾點:
在異步模式下,不能保證排序。 您無法控制Kafka經紀人何時確認(確認)每封郵件。
為生產者提供成功回調和失敗回調是一個好習慣。 例如,您可以在成功回調中編寫信息日志消息,而在失敗回調中編寫異常日志消息。
由于無法保證順序,因此在回調中收到異常之前,可能會發送額外的消息。
如果要避免這些問題,可以選擇同步發送消息。 send()的返回是FutureRecordMetadata。 通過執行future.get(timeout = 60),生產者將被阻止最多60秒鐘,直到代理成功確認消息為止。 缺點是速度,與異步模式相比,它相對較慢。
消費者
使用者實例是一個Python迭代器。 消費者類的核心是poll()方法。 它允許使用者繼續從主題中提取消息。 它的輸入參數timeout_ms之一默認為0,這意味著該方法將立即返回所有在緩沖區中拉出并可用的記錄。 您可以增加timeout_ms以返回更大的批次。
默認情況下,每個使用者都是一個無限的偵聽器,因此它不會停止運行,直到程序中斷。 但另一方面,您可以根據收到的消息停止使用者。 例如,您可以退出循環并在達到某個偏移量時關閉使用者。
也可以將使用者分配給一個分區或來自多個主題的多個分區。
這是kafka-python庫的測試結果。 每個消息的大小為100字節。 生產者的平均吞吐量為1.4MB / s。 使用者的平均吞吐量為2.8MB / s。
Confluent-kafka
Confluent-kafka是Python的高性能Kafka客戶端,它利用高性能C客戶端librdkafka。 從1.0版開始,這些作為PyPi上的OS X和Linux的獨立二進制輪分發。 它支持Kafka 0.8+版本。 第一版發布于2016年5月。正在積極維護中。
安裝
對于OS X和Linux,軟件包中包括librdkafka,需要單獨安裝。
pip install confluent-kafka
對于Windows用戶,在我撰寫本文時,confluent-kafka尚未在Windows上支持Python3.8二進制輪子。 您將遇到librdkafka的問題。 請查看他們的發行說明,該說明正在積極開發中。 另一種解決方案是降級到Python3.7。
Confluent-kafka在速度方面具有令人難以置信的性能。 API的設計有點類似于kafka-python。 您可以通過將flush()放入循環中來使其同步。
消費者
confluent-kafka中的Consumer API需要更多代碼。 您無需自己處理高級循環方法(例如,消耗()),而需要自己處理while循環。 我建議您創建自己的consump(),它實際上是一個Python生成器。 只要有一條消息被拉出并且在緩沖區中可用,它就會產生該消息。
這樣,主要功能將變得干凈,您可以自由控制消費者的行為。 例如,您可以在consumpt()中定義一個"會話窗口"。 如果在X秒鐘內未提取任何消息,則使用者將停止。 或者,您可以添加標志infinite = True作為輸入參數,以控制使用者是否應為無限偵聽器。
這是confluent-kafka庫的測試結果。 每個消息的大小為100字節。 生產者的平均吞吐量為21.97MBps。 消費者的平均吞吐量為16.8?28.7MB / s。
PyKafka
PyKafka是Python的程序員友好的Kafka客戶端。 它包括Kafka生產者和使用者的Python實現,可以選擇由基于librdkafka的C擴展支持。 它支持Kafka 0.82+版本。 第一版發布于2012年8月,但自2018年11月以來未進行過更新。
安裝
pip install pykafka
該軟件包不附帶librdkafka,您需要在所有操作系統中分別安裝。
pykafka具有KafkaClient接口,該接口涵蓋了ProducerAPI和Consumer API。
消息可以異步和同步模式發送。 我發現pykafka會修改某些生產者配置(例如linger_ms和min_queued_messages)的默認值,這會對發送少量數據產生影響。
您可以將其與Apache Kafka網站上的默認配置進行比較。
如果要獲取每個消息的回調,請確保將min_queued_messages更改為1,否則如果數據集小于70000,則不會收到任何報告。
> pykafka-producer-config
消費者
您可以從KafkaClinet界面獲取SimpleConsumer。 這類似于kafka-python,其中民意調查被包裝在SimpleConsumer類中。
這是pykafka庫的測試結果。 每個消息的大小為100字節。 生產者的平均吞吐量為2.1MB / s。 使用者的平均吞吐量為1.57MB / s。
到目前為止,我已經解釋了每個庫的Producer API和Consumer API。 就Admin API而言,kafka-python和confluent-kafka確實提供了顯式的Admin API。 您可以在要創建主題的單元測試中使用它,然后在執行下一個測試之前將其刪除。 此外,如果您想使用Python構建Kafka監控儀表板,則Admin API可以幫助您檢索集群和主題的元數據。
Confluent-kafka:
毫無疑問,Confluent-kafka在這三個庫中表現最佳。 該API的設計經過精心設計,參數與原始Apache Kafka相同的名稱和默認值。 您可以輕松地將其鏈接到原始參數。 就個人而言,我喜歡自定義消費者行為的靈活性。 Confluent也正在積極開發和支持它。
缺點是Windows用戶可能需要花費一些時間才能使其工作。 并且由于C擴展,調試可能很棘手。
kafka-python:
kafka-python是沒有C擴展的純Python庫。 該API經過精心設計,對于初學者來說很容易使用。 這也是一個積極開發的項目。
python-kafka的缺點是它的速度。 如果您確實關心性能,建議您改用confluent-kafka。
pykafka:
與kafka-python和conflunet-kafka相比,pykafka的開發活動較少。 該版本的歷史記錄表明,自2018年11月以來尚未進行過更新。此外,pykafka具有不同的API設計,并使用了不同的默認參數,這可能不是第一次。
以上就是使用Python掌握Apache Kafka應當了解的3個庫分別是哪些,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。