您好,登錄后才能下訂單哦!
這篇文章主要介紹了Kafka集群優化的方法是什么的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇Kafka集群優化的方法是什么文章都會有所收獲,下面我們一起來看看吧。
背景
個推作為專業的數據智能服務商,已經成功服務了數十萬APP,每日的消息下發量達百億級別,由此產生了海量日志數據。為了應對業務上的各種需求,我們需要采集并集中化日志進行計算,為此個推選用了高可用的、高可靠的、分布式的Flume系統以對海量日志進行采集、聚合和傳輸。此外,個推也不斷對Flume進行迭代升級,以實現自己對日志的特定需求。
原有的異地機房日志匯聚方式,整個流程相對來說比較簡單,A機房業務產生的日志通過多種方式寫入該機房Kafka集群,然后B機房的Flume通過網絡專線實時消費A機房Kafka的日志數據后寫入本機房的Kafka集群,所有機房的數據就是通過相同方式在B機房Kakfa集群中集中化管理。如圖一所示:
圖一:原有異地日志傳輸模式
但是隨著業務量的不斷增加,日志數據在逐漸增多的過程中對帶寬要求變高,帶寬的瓶頸問題日益凸顯。按照1G的專線帶寬成本2~3w/月來計算,一個異地機房一年僅專線帶寬擴容成本就高達30w以上。對此,如何找到一種成本更加低廉且符合當前業務預期的傳輸方案呢?Avro有快速壓縮的二進制數據形式,并能有效節約數據存儲空間和網絡傳輸帶寬,從而成為優選方案。
優化思路
Avro簡介
Avro是一個數據序列化系統。它是Hadoop的一個子項目,也是Apache的一個獨立的項目,其主要特點如下:
● 豐富的數據結構;
● 可壓縮、快速的二進制數據類型;
● 可持久化存儲的文件類型;
● 遠程過程調用(RPC);
● 提供的機制使動態語言可以方便地處理數據。
具體可參考官方網站:http://avro.apache.org/
Flume Avro方案
Flume的RPC Source是Avro Source,它被設計為高擴展的RPC服務端,能從其他Flume Agent 的Avro Sink或者Flume SDK客戶端,接收數據到Flume Agent中,具體流程如圖二所示:
圖二:Avro Source流程
針對該模式,我們的日志傳輸方案計劃變更為A機房部署Avro Sink用以消費該機房Kafka集群的日志數據,壓縮后發送到B機房的Avro Source,然后解壓寫入B機房的Kafka集群,具體的傳輸模式如圖三所示:
圖三:Flume Avro傳輸模式
可能存在的問題
我們預估可能存在的問題主要有以下三點:
● 當專線故障的時候,數據是否能保證完整性;
● 該模式下CPU和內存等硬件的消耗評估;
● 傳輸性能問題。
驗證情況
針對以上的幾個問題,我們做了幾項對比實驗。
環境準備情況說明:
1. 兩臺服務器192.168.10.81和192.168.10.82,以及每臺服務器上對應一個Kakfa集群,模擬A機房和B機房;
2. 兩個Kafka集群中對應topicA(源端)和topicB(目標端)。在topicA中寫入合計大小11G的日志數據用來模擬原始端日志數據。
3. 192.168.10.82上部署一個Flume,模擬原有傳輸方式。
4. 192.168.10.81服務器部署Avro Sink,192.168.10.82部署Avro Source,模擬Flume Avro傳輸模式。
原有Flume模式驗證(非Avro)
監控Kafka消費情況:
81流量統計:
82流量統計:
消費全部消息耗時:20min
消費總日志條數統計:129,748,260
總流量:13.5G
Avro模式驗證
配置說明:
Avro Sink配置:
#kafkasink 是kafkatokafka的sinks的名字,可配多個,空格分開kafkatokafka.sources = kafka_dmc_bulletkafkatokafka.channels = channel_dmc_bulletkafkatokafka.sinks = kafkasink_dmc_bulletkafkatokafka.sources.kafka_dmc_bullet.type = org.apache.flume.source.kafka.KafkaSourcekafkatokafka.sources.kafka_dmc_bullet.channels = channel_dmc_bulletkafkatokafka.sources.kafka_dmc_bullet.zookeeperConnect =192.168.10.81:2181kafkatokafka.sources.kafka_dmc_bullet.topic = topicAkafkatokafka.sources.kafka_dmc_bullet.kafka.zookeeper.connection.timeout.ms =150000kafkatokafka.sources.kafka_dmc_bullet.kafka.consumer.timeout.ms =10000kafkatokafka.sources.kafka_dmc_bullet.kafka.group.id = flumeavrokafkatokafka.sources.kafka_dmc_bullet.batchSize =5000#source kafkasink_dmc_bullet的配置,可配置多個sink提高壓縮傳輸效率kafkatokafka.sinks.kafkasink_dmc_bullet.type = org.apache.flume.sink.AvroSinkkafkatokafka.sinks.kafkasink_dmc_bullet.hostname =192.168.10.82kafkatokafka.sinks.kafkasink_dmc_bullet.port =55555//與source的rpc端口一一對應kafkatokafka.sinks.kafkasink_dmc_bullet.compression-type = deflate//壓縮模式kafkatokafka.sinks.kafkasink_dmc_bullet.compression-level =6//壓縮率1~9kafkatokafka.sinks.kafkasink_dmc_bullet.channel = channel_dmc_bulletkafkatokafka.sinks.kafkasink_dmc_bullet.channel = channel_dmc_bulletkafkatokafka.sinks.kafkasink_dmc_bullet.requiredAcks =1kafkatokafka.sinks.kafkasink_dmc_bullet.batchSize =5000#source kafkasink_dmc_bullet配的channel,只配一個kafkatokafka.channels.channel_dmc_bullet.type = memorykafkatokafka.channels.channel_dmc_bullet.capacity =100000#kafkatokafka.channels.channel_dmc_bullet.byteCapacity = 10000#kafkatokafka.channels.channel_dmc_bullet.byteCapacityBufferPercentage = 10kafkatokafka.channels.channel_dmc_bullet.transactionCapacity =5000kafkatokafka.channels.channel_dmc_bullet.keep-alive =60
Avro Source配置:
#kafkasink 是kafkatokafka的sinks的名字,可配多個,空格分開kafkatokafka.sources = kafka_dmc_bulletkafkatokafka.channels = channel_dmc_bulletkafkatokafka.sinks = kafkasink_dmc_bulletkafkatokafka.sources.kafka_dmc_bullet.type= avrokafkatokafka.sources.kafka_dmc_bullet.channels = channel_dmc_bulletkafkatokafka.sources.kafka_dmc_bullet.bind =0.0.0.0kafkatokafka.sources.kafka_dmc_bullet.port =55555//rpc端口綁定kafkatokafka.sources.kafka_dmc_bullet.compression-type= deflate//壓縮模式kafkatokafka.sources.kafka_dmc_bullet.batchSize =100#source kafkasink_dmc_bullet的配置kafkatokafka.sinks.kafkasink_dmc_bullet.type= org.apache.flume.sink.kafka.KafkaSinkkafkatokafka.sinks.kafkasink_dmc_bullet.kafka.partitioner.class = com.gexin.rp.base.kafka.SimplePartitionerkafkatokafka.sinks.kafkasink_dmc_bullet.channel = channel_dmc_bulletkafkatokafka.sinks.kafkasink_dmc_bullet.topic = topicBkafkatokafka.sinks.kafkasink_dmc_bullet.brokerList =192.168.10.82:9091,192.168.10.82:9092,192.168.10.82:9093kafkatokafka.sinks.kafkasink_dmc_bullet.requiredAcks =1kafkatokafka.sinks.kafkasink_dmc_bullet.batchSize =500kafkatokafka.channels.channel_dmc_bullet.type= memorykafkatokafka.channels.channel_dmc_bullet.capacity =100000kafkatokafka.channels.channel_dmc_bullet.transactionCapacity =1000
監控Kafka消費情況
81流量統計:
82流量統計:
消費全部消息耗時:26min
消費總日志條數統計:129,748,260
總流量:1.69G
故障模擬
1. 模擬專線故障,在A、B兩機房不通的情況下,Avro Sink報錯如下:
2. 監控Kafka消費情況,發現消費者已停止消費:
3. 故障處理恢復后繼續消費剩余日志,經統計,總日志條數為:129,747,255。
結論
1. 當專線發生故障時,正在網絡傳輸中的通道外數據可能會有少部分丟失,其丟失原因為網絡原因,與Avro模式無關;故障后停止消費的數據不會有任何的丟失問題,由于網絡原因丟失的數據需要評估其重要性以及是否需要補傳。
2. 流量壓縮率達80%以上,同時我們也測試了等級為1~9的壓縮率,6跟9非常接近,CPU和內存的使用率與原有傳輸模式相差不大,帶寬的優化效果比較明顯。
3. 傳輸性能由于壓縮的原因適當變弱,單Sink由原先20分鐘延長至26分鐘,可適當增加Sink的個數來提高傳輸速率。
生產環境實施結果
實施結果如下:
1. 由于還有其它業務的帶寬占用,總帶寬使用率節省了50%以上,現階段高峰期帶寬速率不超過400Mbps;
2. 每個Sink傳輸速率的極限大概是3000條每秒,壓縮傳輸速率問題通過增加Sink的方式解決,但會適當增加CPU和內存的損耗。
關于“Kafka集群優化的方法是什么”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“Kafka集群優化的方法是什么”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。