您好,登錄后才能下訂單哦!
[TOC]
? Kafka是一個分布式消息隊列,采用scala語言開發。Kafka對消息保存時根據Topic進行歸類,發送消息者稱為Producer,消息接受者稱為Consumer,此外kafka集群有多個kafka實例組成,每個實例(server)成為broker。無論是kafka集群,還是producer和consumer都依賴于zookeeper集群保存一些meta信息,來保證系統可用性。
(1)點對點模式(類似接受文件,一對一,消費者主動拉取數據,消息收到后消息清除)點對點模型通常是一個基于拉取或者輪詢的消息傳送模型,這種模型從隊列中請求信息,而不是將消息推送到客戶端。這個模型的特點是發送到隊列的消息被一個且只有一個接收者接收處理,即使有多個消息監聽者也是如此。
(2)發布/訂閱模式(類似公眾號,一對多,數據生產后,推送給所有訂閱者)
發布訂閱模型則是一個基于推送的消息傳送模型。發布訂閱模型可以有多種不同的訂閱者,臨時訂閱者只在主動監聽主題時才接收消息,而持久訂閱者則監聽主題的所有消息,即使當前訂閱者不可用,處于離線狀態。
1)解耦:
允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。
2)冗余:
消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。許多消息隊列所采用的"插入-獲取-刪除"范式中,在把一個消息從隊列中刪除之前,需要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。
3)擴展性:
因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。
4)靈活性 & 峰值處理能力:
在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量并不常見。如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。
5)可恢復性:
系統的一部分組件失效時,不會影響到整個系統。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復后被處理。
6)順序保證:
在大多使用場景下,數據處理的順序都很重要。大部分消息隊列本來就是排序的,并且能保證數據會按照特定的順序來處理。(Kafka保證一個Partition內的消息的有序性,無法保證整體有序,觸發一個topic只有一個partition)
7)緩沖:
有助于控制和優化數據流經過系統的速度,解決生產消息和消費消息的處理速度不一致的情況。
8)異步通信:
很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。
? 消息由生產者發布到Kafka集群后,會被消費者消費。消息的消費模型有兩種:推送模型(push)和拉取模型(pull)。
? 基于推送模型(push)的消息系統,由消息代理記錄消費者的消費狀態。消息代理在將消息推送到消費者后,標記這條消息為已消費,但這種方式無法很好地保證消息被處理。比如,消息代理把消息發送出去后,當消費進程掛掉或者由于網絡原因沒有收到這條消息時,就有可能造成消息丟失(因為消息代理已經把這條消息標記為已消費了,但實際上這條消息并沒有被實際處理)。如果要保證消息被處理,消息代理發送完消息后,要設置狀態為“已發送”,只有收到消費者的確認請求后才更新為“已消費”,這就需要消息代理中記錄所有的消費狀態,這種做法顯然是不可取的。
? Kafka采用拉取模型,由消費者自己記錄消費狀態,每個消費者互相獨立地順序讀取每個分區的消息。如下圖所示,有兩個消費者(不同消費者組)拉取同一個主題的消息,消費者A的消費進度是3,消費者B的消費進度是6。消費者拉取的最大上限通過最高水位(watermark)控制(也就是只能取到當前topic的最后一條消息),生產者最新寫入的消息如果還沒有達到備份數量(也就是要保證副本數寫入完成,從而保證消息不丟失,由此才讓該消息給消費者消費),對消費者是不可見的。這種由消費者控制偏移量的優點是:消費者可以按照任意的順序消費消息。比如,消費者可以重置到舊的偏移量,重新處理之前已經消費過的消息;或者直接跳到最近的位置,從當前的時刻開始消費。
? 圖1.1 kafka消費模型
? 在一些消息系統中,消息代理會在消息被消費之后立即刪除消息。如果有不同類型的消費者訂閱同一個主題,消息代理可能需要冗余地存儲同一消息;或者等所有消費者都消費完才刪除,這就需要消息代理跟蹤每個消費者的消費狀態,這種設計很大程度上限制了消息系統的整體吞吐量和處理延遲。Kafka的做法是生產者發布的所有消息會一致保存在Kafka集群中,不管消息有沒有被消費。用戶可以通過設置保留時間來清理過期的數據,比如,設置保留策略為兩天。那么,在消息發布之后,它可以被不同的消費者消費,在兩天之后,過期的消息就會自動清理掉。
1)Producer :消息生產者,就是向kafka broker發消息的客戶端。
2)Consumer :消息消費者,向kafka broker取 消息的客戶端
3)Topic :可以理解為一個隊列。是消息的一個分組
4) Consumer Group (CG):kafka提供的可擴展且具有容錯性的消費者機制。既然是一個組,那么組內必然可以有多個消費者或消費者實例(consumer instance),它們共享一個公共的ID,即group ID。組內的所有消費者協調在一起來消費訂閱主題(subscribed topics)的所有分區(partition)。當然,每個分區只能由同一個消費組內的一個consumer來消費。但是不同消費者組消費同一個topic是可以的,而且互不影響。消費者可以通過水平擴展的方式同時讀取大量的消息。另外,如果一個消費者失敗了,那么其他的group成員會自動負載均衡讀取之前失敗的消費者讀取的分區
5)Broker :一臺kafka服務器就是一個broker。一個集群由多個broker組成。一個broker可以容納多個topic。
6)Partition:為了實現擴展性,一個非常大的topic可以分布到多個broker(即服務器)上,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序將消息發給consumer,不保證一個topic的整體(多個partition間)的順序。
7)Offset:kafka的存儲文件都是按照offset.kafka來命名,用offset做名字的好處是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。當然the first offset就是00000000000.kafka
一般來說,假設 "test" 這個topic有兩個分區,那么該topic的存儲目錄有兩個,命名為:test-0,test-1 ,然后對應分區的目錄保存對應的數據
首先kafka依賴zookeeper存儲元信息、且需要jdk來運行程序。所以需要事先部署好這兩個。請看之前的文章。
準備好三臺虛擬機:
bigdata121 | bigdata122 | bigdata123 |
---|---|---|
zookeeper1 | zookeeper2 | zookeeper3 |
kafka1 | kafka | kafka3 |
軟件版本:
jdk | 1.8 |
---|---|
zookeeper | 3.4.10 |
kafka | 2.1.1 |
centos | 7.2.1511 |
bigdata121:
1、解壓:
tar zxf kafka_2.11-2.1.1.tgz -C /opt/modules/
2、創建日志目錄:
mkdir /opt/modules/kafka_2.11-2.1.1/logs
3、修改kafka server配置文件:
vim /opt/modules/kafka_2.11-2.1.1/config/server.properties
#### 修改一些關鍵性配置
#broker的全局唯一編號,不能重復
broker.id=0
#是否允許刪除topic,測試環境方便測試設置為true,生產環境建議設置為false
delete.topic.enable=true
#kafka運行日志存放的路徑
log.dirs=/opt/modules/kafka_2.11-2.1.1/logs
#配置連接Zookeeper集群地址,并且/path/to 是指定在zookeeper中存儲的根節點路徑,比如 /root
zookeeper.connect=bigdata121:2181,bigdata122:2181,bigdata123:2181/path/to
4、配置環境變量
vim /etc/profile.d/kafka.sh
#!/bin/bash
export KAFKA_HOME=/opt/modules/kafka_2.11-2.1.1
export PATH=$PATH:${KAFKA_HOME}/bin
5、啟用環境變量
source /etc/profile.d/kafka.sh
配置好后,將kafka的整個目錄rsync到其他兩臺主機的 /opt/modules 下,并修改
/opt/modules/kafka_2.11-2.1.1/config/server.properties 這個配置文件
broker.id=1、broker.id=2
反正就是每個broker的id必須唯一
分別在三臺機器上啟動kafka集群節點:
kafka-server-start.sh -daemon config/server.properties
-daemon 表示以后臺進程方式啟動kafka服務
config/server.properties server的配置文件路徑
停止當前節點:
kafka-server-stop.sh
1)查看當前服務器中的所有topic
[root@bigdata11 kafka]$ bin/kafka-topics.sh --zookeeper bigdata13:2181 --list
2)創建topic
[root@bigdata11 kafka]$ bin/kafka-topics.sh --zookeeper bigdata13:2181 --create --replication-factor 3 --partitions 1 --topic first
選項說明:
--topic 定義topic名
--replication-factor 定義副本數
--partitions 定義分區數
3)刪除topic
[root@bigdata11 kafka]$ bin/kafka-topics.sh --zookeeper bigdata11:2181 --delete --topic first
需要server.properties中設置delete.topic.enable=true否則只是標記刪除或者直接重啟。
4)發送消息
[root@bigdata11 kafka]$ bin/kafka-console-producer.sh --broker-list bigdata11:9092 --topic first
>hello world
5)消費消息
[root@bigdata12 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server node3:9092 --from-beginning --topic first
--from-beginning:會把first主題中以往所有的數據都讀取出來。根據業務場景選擇是否增加該配置。
6)查看某個Topic的詳情
[root@bigdata11 kafka]$ bin/kafka-topics.sh --zookeeper bigdata11:2181 --describe --topic first
? producer采用推(push)模式將消息發布到broker,每條消息都被追加(append)到分區(patition)中,屬于順序寫磁盤(順序寫磁盤效率比隨機寫內存要高,保障kafka吞吐率)
? Kafka集群有多個消息代理服務器(broker-server)組成,發布到Kafka集群的每條消息都有一個類別,用主題(topic)來表示。通常,不同應用產生不同類型的數據,可以設置不同的主題。一個主題一般會有多個消息的訂閱者,當生產者發布消息到某個主題時,訂閱了這個主題的消費者都可以接收到生成者寫入的新消息。
? Kafka集群為每個主題維護了分布式的分區(partition)日志文件,物理意義上可以把主題(topic)看作進行了分區的日志文件(partition log)。主題的每個分區都是一個有序的、不可變的記錄序列,新的消息會不斷追加到日志中。分區中的每條消息都會按照時間順序分配到一個單調遞增的順序編號,叫做偏移量(offset),這個偏移量能夠唯一地定位當前分區中的每一條消息。
? 消息發送時都被發送到一個topic,其本質就是一個目錄,而topic是由一些Partition Logs(分區日志)組成,其組織結構如下圖所示:下圖中的topic有3個分區,每個分區的偏移量都從0開始,不同分區之間的偏移量都是獨立的,不會相互影響。
? 圖3.1 kafka寫入方式
? 圖3.2 kafka分區讀取
? 我們可以看到,每個Partition中的消息都是有序的,生產的消息被不斷追加到Partition log上,其中的每一個消息都被賦予了一個唯一的offset值。
發布到Kafka主題的每條消息包括鍵值和時間戳。消息到達服務器端的指定分區后,都會分配到一個自增的偏移量。原始的消息內容和分配的偏移量以及其他一些元數據信息最后都會存儲到分區日志文件中。消息的鍵也可以不用設置,這種情況下消息會均衡地分布到不同的分區。
(1)方便在集群中擴展,每個Partition可以通過調整以適應它所在的機器,而一個topic又可以有多個Partition組成,因此整個集群就可以適應任意大小的數據了;
(2)可以提高并發,因為可以以Partition為單位讀寫了。
傳統消息系統在服務端保持消息的順序,如果有多個消費者消費同一個消息隊列,服務端會以消費存儲的順序依次發送給消費者。但由于消息是異步發送給消費者的,消息到達消費者的順序可能是無序的,這就意味著在并行消費時,傳統消息系統無法很好地保證消息被順序處理。雖然我們可以設置一個專用的消費者只消費一個隊列,以此來解決消息順序的問題,但是這就使得消費處理無法真正執行。
Kafka比傳統消息系統有更強的順序性保證,它使用主題的分區作為消息處理的并行單元。Kafka以分區作為最小的粒度,將每個分區分配給消費者組中不同的而且是唯一的消費者,并確保一個分區只屬于一個消費者,即這個消費者就是這個分區的唯一讀取線程。那么,只要分區的消息是有序的,消費者處理的消息順序就有保證。每個主題有多個分區,不同的消費者處理不同的分區,所以Kafka不僅保證了消息的有序性,也做到了消費者的負載均衡。
(1)指定了patition,則直接使用;
(2)未指定patition但指定key,通過對key進行hash出一個patition
(3)patition和key都未指定,使用輪詢選出一個patition。
下面看看這個默認的partition實現類的源碼:
DefaultPartitioner類
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
//未指定key,輪詢獲取分區號
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
//這里就是當指定了key時,對key進行hash來獲取分區號
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
? 同一個partition可能會有多個replication(對應 server.properties 配置中的 default.replication.factor=N)。沒有replication的情況下,一旦broker 宕機,其上所有 patition 的數據都不可被消費(數據直接丟失了),同時producer也不能再將數據存于其上的patition。引入replication之后,同一個partition可能會有多個replication,而這時需要在這些replication之間選出一個leader,producer和consumer只與這個leader交互(讀寫操作都只會和leader交互),其它replication作為follower從leader 中復制數據,不會執行其他操作。當leader掛了時,會在follower中選出新的leader。
1) producer先從zookeeper的 "/brokers/.../state"節點找到該partition的leader
比如完整的路徑如:/brokers/topics/TOPIC_NAME/partitions/NUM_OF_PARTITION/state
2)producer將消息發送給該leader
3)leader將消息寫入本地log
4)followers從leader pull消息,寫入本地log后向leader發送ACK
5)leader收到所有ISR中的replication的ACK后,增加HW(high watermark,最后commit 的offset)并向producer發送ACK
物理上把topic分成一個或多個patition(對應 server.properties 中的num.partitions=3配置,這是默認分區個數,創建topic時可以手動指定分區個數),每個patition物理上對應一個文件夾(該文件夾存儲該patition的所有消息和索引文件),如下:
分區目錄命名方式為 topicName-partiontionNum 的形式
首先,我們創建了first這個topic,有三個partition,0、1、2
[root@bigdata11 logs]$ ll
drwxrwxr-x. 2 root root 4096 8月 6 14:37 first-0
drwxrwxr-x. 2 root root 4096 8月 6 14:35 first-1
drwxrwxr-x. 2 root root 4096 8月 6 14:37 first-2
[root@bigdata11 logs]$ cd first-0
[root@bigdata11 first-0]$ ll
-rw-rw-r--. 1 root root 10485760 8月 6 14:33 00000000000000000000.index 這是索引
-rw-rw-r--. 1 root root 219 8月 6 15:07 00000000000000000000.log 這是分區日志,也就是存儲消息的地方
-rw-rw-r--. 1 root root 10485756 8月 6 14:33 00000000000000000000.timeindex
-rw-rw-r--. 1 root root 8 8月 6 14:37 leader-epoch-checkpoint
前面說到,無論消息是否被消費,kafka都會保留所有消息,消費者可以根據需要隨時從需要offset消費數據。有兩種策略可以刪除舊數據:
1)基于時間:log.retention.hours=168,也就是默認刪除7天前的數據
2)基于大小:log.retention.bytes=1073741824,超過1GB刪除
需要注意的是,因為Kafka讀取特定消息的時間復雜度為O(1)(因為是通過索引直接定位讀取,所以和大小無關),即與文件大小無關,所以這里刪除過期文件與提高 Kafka 性能無關。
3.2.3 zookeeper存儲結構
zookeeper存儲了整個kafka集群的一些元信息,比如有哪些broker,哪些topic等。下面看看結構:
? 圖3.3 zookeeper存儲結構
其中某些目錄的作用如下:
/brokers/topics/TOPIC_NAME/partitions/PARTITION_NUM/state:
指定topic的指定分區的元信息,里面存儲了該分區leader所在broker的id,以及所有副本存儲在哪些broker中。
/brokers/ids/xxxx:
有哪些broker,以及對應的id
/consumer:
注冊的consumer的信息,例如消費者組id、消費的topic、消費的offset、消費者組中的哪個消費者消費哪個partition等
要注意的是,只有consumer會在zookeeper注冊,producer不會在zookeeper注冊
kafka支持高級api和低級api進行操作。
1)高級API優點
高級API 寫起來簡單
不需要自行去管理offset,系統通過zookeeper自行管理。
不需要管理分區,副本等情況,系統自動管理。
消費者斷線會自動根據上一次記錄在zookeeper中的offset去接著獲取數據(默認設置1分鐘更新一下zookeeper中存的offset)
可以使用group來區分對同一個topic 的不同程序訪問分離開來(不同的group記錄不同的offset,這樣不同程序讀取同一個topic才不會因為offset互相影響)
2)高級API缺點
不能自行控制offset(對于某些特殊需求來說)
不能細化控制如分區、副本、zk等
1)低級 API 優點
能夠讓開發者自己控制offset,想從哪里讀取就從哪里讀取。
自行控制連接分區,對分區自定義進行負載均衡
對zookeeper的依賴性降低(如:offset不一定非要靠zk存儲,自行存儲offset即可,比如存在文件或者內存中)
2)低級API缺點
太過復雜,需要自行控制offset,連接哪個分區,找到分區leader 等。
? 消費者是以consumer group消費者組的方式工作,由一個或者多個消費者組成一個組,共同消費一個topic。每個分區在同一時間只能由group中的一個消費者讀取,但是多個group可以同時消費這個partition。某個消費者讀取某個分區,也可以叫做某個消費者是某個分區的擁有者。
? 在這種情況下,消費者可以通過水平擴展的方式同時讀取大量的消息。另外,如果一個消費者失敗了,那么其他的group成員會自動負載均衡讀取之前失敗的消費者讀取的分區。
? consumer采用pull(拉)模式從broker中讀取數據。
? push(推)模式很難適應消費速率不同的消費者,因為消息發送速率是由broker決定的。它的目標是盡可能以最快速度傳遞消息,但是這樣很容易造成consumer來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而pull模式則可以根據consumer的消費能力以適當的速率消費消息。
? 對于Kafka而言,pull模式更合適,它可簡化broker的設計,consumer可自主控制消費消息的速率,同時consumer可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現不同的傳輸語義。
? pull模式不足之處是,如果kafka沒有數據,消費者可能會陷入循環中,一直等待數據到達。為了避免這種情況,我們在我們的拉請求中有參數,允許消費者請求在等待數據到達的“長輪詢”中進行阻塞(并且可選地等待到給定的字節數,以確保大的傳輸大小)。
idea創建maven工程,添加kafka依賴:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>2.1.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.1.1</version>
</dependency>
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class OldProducer {
@SuppressWarnings("deprecation")
public static void main(String[] args) {
Properties properties = new Properties();
//指定broker地址列表
properties.put("metadata.broker.list", "bigdata11:9092");
//指定producer需要broker發送ack確認收到消息
properties.put("request.required.acks", "1");
//指定序列化類
properties.put("serializer.class", "kafka.serializer.StringEncoder");
//使用上面的配置項創建kafka producer
Producer<Integer, String> producer = new Producer<Integer,String>(new ProducerConfig(properties));
//發送消息
KeyedMessage<Integer, String> message = new KeyedMessage<Integer, String>("first", "hello world");
producer.send(message );
}
}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class NewProducer {
public static void main(String[] args) {
Properties props = new Properties();
// Kafka服務端的主機名和端口號
props.put("bootstrap.servers", "bigdata12:9092");
// 等待所有副本節點的應答
props.put("acks", "all");
// 消息發送最大嘗試次數
props.put("retries", 0);
// 一批消息處理大小
props.put("batch.size", 16384);
// 請求延時
props.put("linger.ms", 1);
// 發送緩存區內存大小
props.put("buffer.memory", 33554432);
// key序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value序列化
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 50; i++) {
producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), "hello world-" + i));
}
producer.close();
}
}
package com.king.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class CallBackProducer {
public static void main(String[] args) {
Properties props = new Properties();
// Kafka服務端的主機名和端口號
props.put("bootstrap.servers", "bigdata12:9092");
// 等待所有副本節點的應答
props.put("acks", "all");
// 消息發送最大嘗試次數
props.put("retries", 0);
// 一批消息處理大小
props.put("batch.size", 16384);
// 增加服務端請求延時
props.put("linger.ms", 1);
// 發送緩存區內存大小
props.put("buffer.memory", 33554432);
// key序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value序列化
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
for (int i = 0; i < 50; i++) {
kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i), new Callback() {
//重寫里面的回到方法
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (metadata != null) {
System.out.println(metadata.partition() + "---" + metadata.offset());
}
}
});
}
kafkaProducer.close();
}
}
舊api:
import java.util.Map;
import kafka.producer.Partitioner;
public class CustomPartitioner implements Partitioner {
public CustomPartitioner() {
super();
}
@Override
public int partition(Object key, int numPartitions) {
// 控制分區
return 0;
}
}
新api:
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
public class CustomPartitioner implements Partitioner {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 控制分區
return 0;
}
@Override
public void close() {
}
}
實現好自定義的分區類之后,需要在創建producer的配置項添加指定自定義分區類的配置:
properties.put("partitioner.class", "自定義的分區類名,需要全類名");
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class CustomConsumer {
@SuppressWarnings("deprecation")
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("zookeeper.connect", "bigdata11:2181");
properties.put("group.id", "g1");
properties.put("zookeeper.session.timeout.ms", "500");
properties.put("zookeeper.sync.time.ms", "250");
properties.put("auto.commit.interval.ms", "1000");
// 創建消費者連接器
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
//需要自己維護offset
HashMap<String, Integer> topicCount = new HashMap<>();
topicCount.put("first", 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCount);
KafkaStream<byte[], byte[]> stream = consumerMap.get("first").get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
System.out.println(new String(it.next().message()));
}
}
}
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class CustomNewConsumer {
public static void main(String[] args) {
Properties props = new Properties();
// 定義kakfa 服務的地址,不需要將所有broker指定上
props.put("bootstrap.servers", "bigdata11:9092");
// 制定consumer group
props.put("group.id", "test");
// 是否自動確認offset
props.put("enable.auto.commit", "true");
// 自動確認offset的時間間隔
props.put("auto.commit.interval.ms", "1000");
// key的序列化類
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// value的序列化類
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 定義consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 消費者訂閱的topic, 可同時訂閱多個
consumer.subscribe(Arrays.asList("first", "second","third"));
while (true) {
// 讀取數據,讀取超時時間為100ms
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
? Producer攔截器(interceptor)是在Kafka 0.10版本被引入的,主要用于實現clients端的定制化控制邏輯。對于producer而言,interceptor使得用戶在消息發送前以及producer回調邏輯前有機會對消息做一些定制化需求,比如修改消息等。同時,producer允許用戶指定多個interceptor按序作用于同一條消息從而形成一個攔截鏈(interceptor chain)。Intercetpor的實現接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定義的方法包括:
(1)configure(configs)
獲取配置信息和初始化數據時調用。
(2)onSend(ProducerRecord):
該方法封裝進KafkaProducer.send方法中,即它運行在用戶主線程中。Producer確保在消息被序列化以及計算分區前調用該方法。用戶可以在該方法中對消息做任何操作,但最好保證不要修改消息所屬的topic和分區,否則會影響目標分區的計算
(3)onAcknowledgement(RecordMetadata, Exception):
該方法會在消息被應答或消息發送失敗時調用,并且通常都是在producer回調邏輯觸發之前。onAcknowledgement運行在producer的IO線程中,因此不要在該方法中放入很重的邏輯,否則會拖慢producer的消息發送效率
(4)close:
關閉interceptor,主要用于執行一些資源清理工作
如前所述,interceptor可能被運行在多個線程中,因此在具體實現時用戶需要自行確保線程安全。另外倘若指定了多個interceptor,則producer將按照指定順序調用它們,并僅僅是捕獲每個interceptor可能拋出的異常記錄到錯誤日志中而非在向上傳遞。這在使用過程中要特別留意。
需求:
實現一個簡單的雙interceptor組成的攔截鏈。第一個interceptor會在消息發送前將時間戳信息加到消息value的最前部;第二個interceptor會在消息發送后更新成功發送消息數或失敗發送消息數。
程序:
(1)實現時間攔截器:
package com.king.kafka.interceptor;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class TimeInterceptor implements ProducerInterceptor<String, String> {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
// 創建一個新的record,把時間戳寫入消息體的最前部
return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
System.currentTimeMillis() + "," + record.value().toString());
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
}
(2)統計發送消息成功和發送失敗消息數,并在producer關閉時打印這兩個計數器
package com.king.kafka.interceptor;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class CounterInterceptor implements ProducerInterceptor<String, String>{
private int errorCounter = 0;
private int successCounter = 0;
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
// 統計成功和失敗的次數
if (exception == null) {
successCounter++;
} else {
errorCounter++;
}
}
@Override
public void close() {
// 保存結果
System.out.println("Successful sent: " + successCounter);
System.out.println("Failed sent: " + errorCounter);
}
}
(3)producer主程序
package com.king.kafka.interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
public class InterceptorProducer {
public static void main(String[] args) throws Exception {
// 1 設置配置信息
Properties props = new Properties();
props.put("bootstrap.servers", "bigdata11:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 2 構建攔截鏈
List<String> interceptors = new ArrayList<>();
interceptors.add("com.king.kafka.interceptor.TimeInterceptor"); interceptors.add("com.king.kafka.interceptor.CounterInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
String topic = "first";
Producer<String, String> producer = new KafkaProducer<>(props);
// 3 發送消息
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message" + i);
producer.send(record);
}
// 4 一定要關閉producer,這樣才會調用interceptor的close方法
producer.close();
}
}
(4)測試
(1)在kafka上啟動消費者,然后運行客戶端java程序。
[root@bigdata11 kafka]$ bin/kafka-console-consumer.sh --zookeeper bigdata11:2181 --from-beginning --topic first
1501904047034,message0
1501904047225,message1
1501904047230,message2
1501904047234,message3
1501904047236,message4
1501904047240,message5
1501904047243,message6
1501904047246,message7
1501904047249,message8
1501904047252,message9
(2)觀察java平臺控制臺輸出數據如下:
Successful sent: 10
Failed sent: 0
? Kafka Streams。Apache Kafka開源項目的一個組成部分。是一個功能強大,易于使用的庫。用于在Kafka上構建高可分布式、拓展性,容錯的應用程序。有如下特點:
1)功能強大
? 高擴展性,彈性,容錯
2)輕量級
? 無需專門的集群
? 一個庫,而不是框架
3)完全集成
? 100%的Kafka 0.10.0版本兼容
? 易于集成到現有的應用程序
4)實時性
? 毫秒級延遲
? 并非微批處理
? 窗口允許亂序數據
? 允許遲到數據
? 當前已經有非常多的流式處理系統,最知名且應用最多的開源流式處理系統有Spark Streaming和Apache Storm。Apache Storm發展多年,應用廣泛,提供記錄級別的處理能力,當前也支持SQL on Stream。而Spark Streaming基于Apache Spark,可以非常方便與圖計算,SQL處理等集成,功能強大,對于熟悉其它Spark應用開發的用戶而言使用門檻低。另外,目前主流的Hadoop發行版,如Cloudera和Hortonworks,都集成了Apache Storm和Apache Spark,使得部署更容易。
? 既然Apache Spark與Apache Storm擁用如此多的優勢,那為何還需要Kafka Stream呢?主要有如下原因。
? 第一,Spark和Storm都是流式處理框架,而Kafka Stream提供的是一個基于Kafka的流式處理類庫。框架要求開發者按照特定的方式去開發邏輯部分,供框架調用。開發者很難了解框架的具體運行方式,從而使得調試成本高,并且使用受限。而Kafka Stream作為流式處理類庫,直接提供具體的類給開發者調用,整個應用的運行方式主要由開發者控制,方便使用和調試。
? 第二,雖然Cloudera與Hortonworks方便了Storm和Spark的部署,但是這些框架的部署仍然相對復雜。而Kafka Stream作為類庫,可以非常方便的嵌入應用程序中,它對應用的打包和部署基本沒有任何要求。
? 第三,就流式處理系統而言,基本都支持Kafka作為數據源。例如Storm具有專門的kafka-spout,而Spark也提供專門的spark-streaming-kafka模塊。事實上,Kafka基本上是主流的流式處理系統的標準數據源。換言之,大部分流式系統中都已部署了Kafka,此時使用Kafka Stream的成本非常低。
? 第四,使用Storm或Spark Streaming時,需要為框架本身的進程預留資源,如Storm的supervisor和Spark on YARN的node manager。即使對于應用實例而言,框架本身也會占用部分資源,如Spark Streaming需要為shuffle和storage預留內存。但是Kafka作為類庫不占用系統資源。
? 第五,由于Kafka本身提供數據持久化,因此Kafka Stream提供滾動部署和滾動升級以及重新計算的能力。
? 第六,由于Kafka Consumer Rebalance機制,Kafka Stream可以在線動態調整并行度。
(1)需求
實時處理單詞帶有”>>>”前綴的內容。例如輸入”test>>>ximenqing”,最終處理成“ximenqing”
(2)代碼程序:
業務處理類:
package com.king.kafka.stream;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
//實現 Processor 接口,用于實現具體業務邏輯
public class LogProcessor implements Processor<byte[], byte[]> {
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
//這里是具體業務邏輯
@Override
public void process(byte[] key, byte[] value) {
String input = new String(value);
// 如果包含“>>>”則只保留該標記后面的內容
if (input.contains(">>>")) {
input = input.split(">>>")[1].trim();
// 輸出到下一個topic
context.forward("logProcessor".getBytes(), input.getBytes());
}else{
context.forward("logProcessor".getBytes(), input.getBytes());
}
}
@Override
public void punctuate(long timestamp) {
}
@Override
public void close() {
}
}
主類入口:
package com.king.kafka.stream;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
public class Application {
public static void main(String[] args) {
// 定義輸入的topic
String from = "first";
// 定義輸出的topic
String to = "second";
// 設置參數
Properties settings = new Properties();
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata11:9092");
StreamsConfig config = new StreamsConfig(settings);
// 構建拓撲
TopologyBuilder builder = new TopologyBuilder();
//創建一個builder,指定source ,processor ,sink。并給它們起別名。
//這里的parentName實際上是指定上一層是什么的名字
builder.addSource("SOURCE", from)
.addProcessor("PROCESS", new ProcessorSupplier<byte[], byte[]>() {
@Override
public Processor<byte[], byte[]> get() {
// 具體分析處理
return new LogProcessor();
}
}, "SOURCE")
.addSink("SINK", to, "PROCESS");
//構建處理任務,包括配置以及任務詳情
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
}
}
(3)測試
運行程序,然后在命令行下分別啟動producer和consumer,看情況:
在bigdata13上啟動生產者
[root@bigdata13 kafka]$ bin/kafka-console-producer.sh --broker-list bigdata11:9092 --topic first
>hello>>>world
>h>>>itstar
>hahaha
(6)在bigdata12上啟動消費者
[root@bigdata12 kafka]$ bin/kafka-console-consumer.sh --zookeeper bigdata11:2181 --from-beginning --topic second
world
itstar
hahaha
可以看到消費處理的數據是符合預期的。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。