您好,登錄后才能下訂單哦!
本文源碼:GitHub·點這里 || GitEE·點這里
-- 下載
wget http://mirror.bit.edu.cn/apache/kafka/2.2.0/kafka_2.11-2.2.0.tgz
-- 解壓
tar -zxvf kafka_2.11-2.2.0.tgz
-- 重命名
mv kafka_2.11-2.2.0 kafka2.11
kafka依賴ZooKeeper服務,需要本地安裝并啟動ZooKeeper。
參考文章:Linux系統搭建ZooKeeper3.4中間件,常用命令總結
-- 執行位置
-- /usr/local/mysoft/kafka2.11
bin/kafka-server-start.sh config/server.properties
ps -aux |grep kafka
-- 基礎路徑
-- /usr/local/mysoft/kafka2.11/config
vim server.properties
-- 添加下面注釋
advertised.listeners=PLAINTEXT://192.168.72.130:9092
Kafka是由Apache開源,具有分布式、分區的、多副本的、多訂閱者,基于Zookeeper協調的分布式處理平臺,由Scala和Java語言編寫。通常用來搜集用戶在應用服務中產生的動作日志數據,并高速的處理。日志類的數據需要高吞吐量的性能要求,對于像Hadoop一樣的日志數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的并行加載機制來統一線上和離線的消息處理,也是為了通過集群來提供實時的消息。
(1)、通過磁盤數據結構提供消息的持久化,消息存儲也能夠保持長時間穩定性;
(2)、高吞吐量,即使是非常普通的硬件Kafka也可以支持每秒超高的并發量;
(3)、支持通過Kafka服務器和消費機集群來分區消息;
(4)、支持Hadoop并行數據加載;
(5)、API包封裝的非常好,簡單易用,上手快 ;
(6)、分布式消息隊列。Kafka對消息保存時根據Topic進行歸類,發送消息者稱為Producer,消息接受者稱為Consumer;
點對點模型通常是一個基于拉取或者輪詢的消息傳遞模型,消費者主動拉取數據,消息收到后從隊列移除消息,這種模型不是將消息推送到客戶端,而是從隊列中請求消息。特點是發送到隊列的消息被一個且只有一個消費者接收處理,即使有多個消費者監聽隊列也是如此。
發布訂閱模型則是一個基于推送的消息傳送模型,消息產生后,推送給所有訂閱者。發布訂閱模型可以有多種不同的訂閱者,臨時訂閱者只在主動監聽主題時才接收消息,而持久訂閱者則監聽主題的所有消息,即使當前訂閱者不可用,處于離線狀態。
一臺kafka服務器就是一個broker。一個集群由多個broker組成。一個broker可以容納多個topic。
消息生產者,就是向kafka broker發消息的客戶端。
消息消費者,向kafka broker取消息的客戶端。
每條發布到Kafka集群的消息都有一個類別,這個類別被稱為Topic,可以理解為一個隊列。
每個Consumer屬于一個特定的Consumer Group,可為每個Consumer指定group name,若不指定group name則屬于默認的分組。
一個龐大大的topic可以分布到多個broker上,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id。kafka只保證按一個partition中的順序將消息發給consumer,不保證一個topic的整體的順序。Partition是物理上的概念,方便在集群中擴展,提高并發。
消息生產者 : kafka-producer-server
kafka-consumer-server
<!-- SpringBoot依賴 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- kafka 依賴 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.4.RELEASE</version>
</dependency>
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
@RestController
public class ProducerWeb {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping("/send")
public String sendMsg () {
MsgLog msgLog = new MsgLog(1,"消息生成",
1,"消息日志",new Date()) ;
String msg = JSON.toJSONString(msgLog) ;
// 這里Topic如果不存在,會自動創建
kafkaTemplate.send("cicada-topic", msg);
return msg ;
}
}
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
consumer:
group-id: test-consumer-group
@Component
public class ConsumerMsg {
private static Logger LOGGER = LoggerFactory.getLogger(ConsumerMsg.class);
@KafkaListener(topics = "cicada-topic")
public void listenMsg (ConsumerRecord<?,String> record) {
String value = record.value();
LOGGER.info("ConsumerMsg====>>"+value);
}
}
生產者基于推push推模式將消息發布到broker,每條消息都被追加到分區patition中,屬于磁盤順序寫,效率比隨機寫內存要高,保障kafka高吞吐量。
消息發送時都被發送到一個topic,而topic是由Partition Logs(分區日志)組成,其組織結構如下圖所示:
每個Partition中的消息都是有序的,生產的消息被不斷追加到Partitionlog上,其中的每一個消息都被賦予了一個唯一的offset值。每個Partition可以通過調整以適配它所在的機器,而一個topic又可以有多個Partition組成,因此整個集群就可以適應任意大小的數據。分區的原則:指定patition,則直接使用;未指定patition但指定key,通過對key的value進行hash出一個patition;patition和key都未指定,使用輪詢選出一個patition。
消費者是以consumer group消費者組的方式工作,由一個或者多個消費者組成一個組,共同消費一個topic。每個分區在同一時間只能由group中的一個消費者讀取,但是多個group可以同時消費一個partition。
消費者采用pull拉模式從broker中讀取數據。對于Kafka而言,pull模式更合適,它可簡化broker的設計,consumer可自主控制消費消息的速率,同時consumer可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現不同的數據傳輸場景。
GitHub·地址
https://github.com/cicadasmile/middle-ware-parent
GitEE·地址
https://gitee.com/cicadasmile/middle-ware-parent
推薦閱讀:SpringBoot2整合中間件
《整合 shard-jdbc 中間件,實現數據分庫分表》
《整合 JavaMail ,實現異步發送郵件功能》
《整合 RocketMQ ,實現請求異步處理》
《整合 Swagger2 ,構建接口管理界面》
《整合 QuartJob ,實現定時器實時管理》
《整合 Redis集群 ,實現消息隊列場景》
《整合 Dubbo框架 ,實現RPC服務遠程調用》
《整合 ElasticSearch框架,實現高性能搜索引擎》
《整合 JWT 框架,解決Token跨域驗證問題》
《整合 FastDFS 中間件,實現文件分布管理》
《整合 Shiro 框架,實現用戶權限管理》
《整合 Security 框架,實現用戶權限管理》
《整合 ClickHouse數據庫,實現數據高性能查詢分析》
《整合 Drools規則引擎,實現高效的業務規則》
《整合MybatisPlus增強插件,配置多數據源》
《整合 Zookeeper組件,管理架構中服務協調》
《整合Nacos組件,環境搭建和入門案例詳解》
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。