91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

springboot是如何集成kafka

發布時間:2022-02-25 15:03:18 來源:億速云 閱讀:170 作者:小新 欄目:開發技術

這篇文章主要介紹了springboot是如何集成kafka,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

為什么使用kafka?

  • 削峰填谷。緩沖上下游瞬時突發流量,保護 “脆弱” 的下游系統不被壓垮,避免引發全鏈路服務 “雪崩”。

  • 系統解耦。發送方和接收方的松耦合,一定程度簡化了開發成本,減少了系統間不必要的直接依賴。

  • 異步通信:消息隊列允許用戶把消息放入隊列但不立即處理它。

  • 可恢復性:即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復后被處理。

業務場景

  • 一些同步業務流程的非核心邏輯,對時間要求不是特別高,可以解耦異步來執行

  • 系統日志收集,采集并同步到kafka,一般采用ELK組合玩法

  • 一些大數據平臺,用于各個系統間數據傳遞

基本架構

Kafka 運行在一個由一臺或多臺服務器組成的集群上,并且分區可以跨集群節點分布

1、Producer 生產消息,發送到Broker中

2、Leader狀態的Broker接收消息,寫入到相應topic中。在一個分區內,這些消息被索引并連同時間戳存儲在一起

3、Leader狀態的Broker接收完畢以后,傳給Follow狀態的Broker作為副本備份

4、 Consumer 消費者的進程可以從分區訂閱,并消費消息

常用術語

  • Broker。負責接收和處理客戶端發送過來的請求,以及對消息進行持久化。雖然多個 Broker 進程能夠運行在同一臺機器上,但更常見的做法是將不同的 Broker 分散運行在不同的機器上

  • 主題:Topic。主題是承載消息的邏輯容器,在實際使用中多用來區分具體的業務。

  • 分區:Partition。一個有序不變的消息序列。每個主題下可以有多個分區。

  • 消息:這里的消息就是指 Kafka 處理的主要對象。

  • 消息位移:Offset。表示分區中每條消息的位置信息,是一個單調遞增且不變的值。

  • 副本:Replica。Kafka 中同一條消息能夠被拷貝到多個地方以提供數據冗余,這些地方就是所謂的副本。副本還分為領導者副本和追隨者副本,各自有不同的角色劃分。每個分區可配置多個副本實現高可用。一個分區的N個副本一定在N個不同的Broker上。

  • Leader:每個分區多個副本的“主”副本,生產者發送數據的對象,以及消費者消費數據的對象,都是 Leader。

  • Follower:每個分區多個副本的“從”副本,實時從 Leader 中同步數據,保持和 Leader 數據的同步。Leader 發生故障時,某個 Follower 還會成為新的 Leader。

  • 生產者:Producer。向主題發布新消息的應用程序。

  • 消費者:Consumer。從主題訂閱新消息的應用程序。

  • 消費者位移:Consumer Offset。表示消費者消費進度,每個消費者都有自己的消費者位移。offset保存在broker端的內部topic中,不是在clients中保存

  • 消費者組:Consumer Group。多個消費者實例共同組成的一個組,同時消費多個分區以實現高吞吐。

  • 重平衡:Rebalance。消費者組內某個消費者實例掛掉后,其他消費者實例自動重新分配訂閱主題分區的過程。Rebalance 是 Kafka 消費者端實現高可用的重要手段。

代碼演示

外部依賴:

在 pom.xml 中添加 Kafka 依賴:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

由于spring-boot-starter-parent 指定的版本號是2.1.5.RELEASE,spring boot 會對外部框架的版本號統一管理,spring-kafka 引入的版本是 2.2.6.RELEASE

配置文件:

在配置文件 application.yaml 中配置 Kafka 的相關參數,具體內容如下:

Spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      retries: 3  # 生產者發送失敗時,重試次數
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer # 生產者消息key和消息value的序列化處理類
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: tomge-consumer-group  # 默認消費者group id
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 100
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

對應的配置類 org.springframework.boot.autoconfigure.kafka.KafkaProperties,來初始化kafka相關的bean實例對象,并注冊到spring容器中。

發送消息:

Spring Boot 作為一款支持快速開發的集成性框架,同樣提供了一批以 -Template 命名的模板工具類用于實現消息通信。對于 Kafka 而言,這個工具類就是KafkaTemplate

KafkaTemplate 提供了一系列 send 方法用來發送消息,典型的 send 方法定義如下代碼所示:

public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
 。。。。 省略
}

生產端提供了一個restful接口,模擬發送一條創建新用戶消息。

@GetMapping("/add_user")
public Object add() {
    try {
        Long id = Long.valueOf(new Random().nextInt(1000));
        User user = User.builder().id(id).userName("TomGE").age(29).address("上海").build();
        ListenableFuture<SendResult> listenableFuture = kafkaTemplate.send(addUserTopic, JSON.toJSONString(user));
        
        // 提供回調方法,可以監控消息的成功或失敗的后續處理
        listenableFuture.addCallback(new ListenableFutureCallback<SendResult>() {
            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("發送消息失敗," + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult sendResult) {
                // 消息發送到的topic
                String topic = sendResult.getRecordMetadata().topic();
                // 消息發送到的分區
                int partition = sendResult.getRecordMetadata().partition();
                // 消息在分區內的offset
                long offset = sendResult.getRecordMetadata().offset();
                System.out.println(String.format("發送消息成功,topc:%s, partition: %s, offset:%s ", topic, partition, offset));
            }
        });
        return "消息發送成功";
    } catch (Exception e) {
        e.printStackTrace();
        return "消息發送失敗";
    }
}

實際上開發使用的Kafka默認允許自動創建Topic,創建Topic時默認的分區數量是1,可以通過server.properties文件中的num.partitions=1修改默認分區數量。在生產環境中通常會關閉自動創建功能,Topic需要由運維人員先創建好。

消費消息:

在 Kafka 中消息通過服務器推送給各個消費者,而 Kafka 的消費者在消費消息時,需要提供一個監聽器(Listener)對某個 Topic 實現監聽,從而獲取消息,這也是 Kafka 消費消息的唯一方式。

定義一個消費類,在處理具體消息業務邏輯的方法上添加 @KafkaListener 注解,并配置要消費的topic,代碼如下所示:

@Component
public class UserConsumer {

    @KafkaListener(topics = "add_user")
    public void receiveMesage(String content) {
        System.out.println("消費消息:" + content);
    }
}

是不是很簡單,添加kafka依賴、使用KafkaTemplate、@KafkaListener注解就完成消息的生產和消費,其實是SpringBoot在背后默默的做了很多工作

感謝你能夠認真閱讀完這篇文章,希望小編分享的“springboot是如何集成kafka”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,更多相關知識等著你來學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

志丹县| 壤塘县| 镶黄旗| 富顺县| 黄山市| 肥西县| 鱼台县| 卓尼县| 丰都县| 新蔡县| 克什克腾旗| 黎川县| 嘉善县| 琼中| 淮滨县| 兴安盟| 砚山县| 平泉县| 英德市| 手游| 商城县| 乐东| 星座| 湘潭县| 双流县| 曲松县| 仙游县| 汝州市| 枣庄市| 崇明县| 西峡县| 东乌珠穆沁旗| 德保县| 台山市| 溧水县| 武邑县| 焉耆| 建平县| 永平县| 株洲市| 乃东县|