91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何解析Kafka在大數據環境中的應用

發布時間:2021-12-15 11:27:52 來源:億速云 閱讀:167 作者:柒染 欄目:大數據

這篇文章將為大家詳細講解有關如何解析Kafka在大數據環境中的應用,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。

我們生活在一個數據爆炸的時代,數據的巨量增長給我們的業務處理帶來了壓力,同時巨量的數據也給我們帶來了十分可觀的財富。隨著大數據將各個行業用戶、運營商、服務商的數據整合進大數據環境,或用戶取用大數據環境中海量的數據,業務平臺間的消息處理將變得尤為復雜。如何高效地采集、使用數據,如何減輕各業務系統的壓力,也變得越來越突出。在早期的系統實現時,業務比較簡單。即便是數據量、業務量比較大,大數據環境也能做出處理。但是隨著接入的系統增多,數據量、業務量增大,大數據環境、業務系統都可出現一定的瓶頸。下面我們看幾個場景。

場景一:我們開發過一個設備信息挖掘平臺。這個平臺需要實時將采集互聯網關采集到的路由節點的狀態信息存入數據中心。通常一個網關一次需要上報幾十甚至幾百個變化的路由信息。全區有幾萬個這種互聯網關。當信息采集平臺將這些變化的數據信息寫入或更新到數據庫時候,會給數據庫代理非常大的壓力,甚至可以直接將數據庫搞掛掉。這就對我們的數據采集系統提出了很高的要求。如何穩定高效地把消息更新到數據庫這一要求擺了出來。

場景二:數據中心處理過的數據需要實時共享給幾個不同的機構。我們常采用的方法是將數據批量存放在數據采集機,分支機構定時來采集;或是分支機構通過JDBC、RPC、http或其他機制實時從數據中心獲取數據。這兩種方式都存在一定的問題,前者在于實時性不足,還牽涉到數據完整性問題;后者在于,當數據量很大的時候,多個分支機構同時讀取數據,會對數據中心的造成很大的壓力,也造成很大的資源浪費。

為了解決以上場景提出的問題,我們需要這樣一個消息系統:

緩沖能力,系統可以提供一個緩沖區,當有大量數據來臨時,系統可以將數據可靠的緩沖起來,供后續模塊處理;

訂閱、分發能力,系統可以接收消息可靠的緩存下來,也可以將可靠緩存的數據發布給使用者。

這就要我們找一個高吞吐的、能滿足訂閱發布需求的系統。

Kafka是一個分布式的、高吞吐的、基于發布/訂閱的消息系統。利用kafka技術可以在廉價PC  Server上搭建起大規模的消息系統。Kafka具有消息持久化、高吞吐、分布式、實時、低耦合、多客戶端支持、數據可靠等諸多特點,適合在線和離線的消息處理。

使用kafka解決我們上述提到的問題。

如何解析Kafka在大數據環境中的應用

互聯網關采集到變化的路由信息,通過kafka的producer將歸集后的信息批量傳入kafka。Kafka按照接收順序對歸集的信息進行緩存,并加入待消費隊列。Kafka的consumer讀取隊列信息,并一定的處理策略,將獲取的信息更新到數據庫。完成數據到數據中心的存儲。

數據中心的數據需要共享時,kafka的producer先從數據中心讀取數據,然后傳入kafka緩存并加入待消費隊列。各分支結構作為數據消費者,啟動消費動作,從kafka隊列讀取數據,并對獲取的數據進行處理。

Kafka生產的代碼如下:

public void produce(){                          //生產消息預處理          produceInfoProcess();                 pro.send(ProducerRecord,new Callback(){                              @Override                              onCompletion() {                                       if (metadata == null) {                                                 // 發送失敗                                                 failedSend();                                       } else {                                                 //發送成功!"                                                  successedSend();       }                              }                                      });         }

消息生產者根據需求,靈活定義produceInfoProcess()方法,對相關數據進行處理。并依據數據發布到kafka的情況,處理回調機制。在數據發送失敗時,定義failedSend()方法;當數據發送成功時,定義successedSend()方法。

Kafka消費的代碼如下:

public void consumer() {                         //配置文件              properties();              //獲取當前數據的迭代器              iterator = stream.iterator();              while (iterator.hasNext()) {                  //取出消息                  MessageAndMetadata<byte[], byte[]> next = iterator.next();                  messageProcess();                   }            }

Kafka消費者會和kafka集群建立一個連接。從kafka讀取數據,調用messageProcess()方法,對獲取的數據靈活處理。

Kafka的高吞吐能力、緩存機制能有效的解決高峰流量沖擊問題。實踐表明,在未將kafka引入系統前,當互聯網關發送的數據量較大時,往往會掛起關系數據庫,數據常常丟失。在引入kafka后,更新程序能夠結合能力自主處理消息,不會引起數據丟失,關系型數據庫的壓力波動不會發生過于顯著的變化,不會出現數據庫掛起鎖死現象。

依靠kafka的訂閱分發機制,實現了一次發布,各分支依據需求自主訂閱的功能。避免了各分支機構直接向數據中心請求數據,或者數據中心依次批量向分支機構傳輸數據以致實時性不足的情況。kafka提高了實時性,減輕了數據中心的壓力,提高了效率。

關于如何解析Kafka在大數據環境中的應用就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

建始县| 涞水县| 济宁市| 南雄市| 内丘县| 永德县| 苍山县| 额济纳旗| 水富县| 夏津县| 宁强县| 象山县| 年辖:市辖区| 合肥市| 冷水江市| 蒙自县| 台东市| 苏州市| 西峡县| 砀山县| 太原市| 哈密市| 博乐市| 西充县| 临武县| 南陵县| 东乌| 泸西县| 凤凰县| 镶黄旗| 大埔区| 绥阳县| 旌德县| 米脂县| 阜南县| 大兴区| 高要市| 常熟市| 扶沟县| 遵义县| 屏边|