91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何使用RabbitMQ消息中間件

發布時間:2021-04-15 15:49:01 來源:億速云 閱讀:183 作者:Leah 欄目:編程語言

如何使用RabbitMQ消息中間件?相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。

RabbitMQ 是使用 Erlang 語言開發的消息中間件, 其遵循了高級消息隊列協議(Advanced Message Queuing Protocol, AMQP)。

與 Kafka 等消息隊列相比,RabbitMQ 最大的優勢在于其較高的可靠性:

  • 提供確認(ACK)和重傳機制保證消息完成消費, 消費者異常不會導致消息丟失

  • 提供消息持久化機制, broker 崩潰不會導致消息丟失

  • 集群模式下工作, 保證高可用

因為具有較高可靠性和一致性, RabbitMQ 可以勝任訂單處理、秒殺等一致性要求較高的業務場景。

RabbitMQ 概念與機制

RabbitMQ 中的概念模型:

  • Broker: 消息中間件實例, 可能是單個節點也可能是運行在多節點集群上的邏輯實體

  • 消息(Message): 消息由消息頭和消息體兩部分組成。消息頭中包括routing-key、priority等標準消息頭以及其它自定義消息頭,用于定義RabbitMQ對消息行為。消息體是字節流,包含消息內容。

  • 連接(Connection): 客戶端與 Broker 之間的 TCP連接

  • 信道(Channel): Channel 是建立在 TCP 連接上的邏輯(虛擬)連接。多個 Channel 復用同一個 TCP 連接, 以避免建立 TCP 連接的巨大開銷。 RabbitMQ 官方要求每個線程使用獨立的 Channel, 禁止多個線程共用 Channel。

  • 生產者(Publisher): 發送消息的客戶端線程

  • 消費者(Consumer): 處理消息的客戶端線程

  • 交換機(Exchange): 交換機負責將消息投遞到相應的隊列

  • 隊列(Queue): 接收并保存交換機投遞的消息,直至被消費者成功消費。邏輯結構遵循先進先出FIFO。

  • 綁定(Binding): 將隊列(Queue)注冊到交換機(Exchange)的路由表

  • 虛擬主機(Vhost): 每個Broker下可建立多個vhost, 每個 vhost 可建立獨立的 Exchange、Queue、綁定及權限系統。同一個 Broker 下的 vhost 共享 Connection、Channel 和 用戶系統,就是說可以使用同一個用戶身份使用同一個 Channel 訪問不同 vhost。

交換機(Exchange)

生產者發送的消息會首先送到交換機(Exchange), 交換機根據自身類型和消息的 routing-key 等信息將消息投遞到綁定的消息隊列中。

RabbitMQ中的四種標準交換機:

direct: 如果消息的 routing-key 與隊列的 binding-key 完全相同,direct類型的交換機則會將消息投遞到該隊列中。

  • 多個隊列可以使用相同的 binding-key 綁定到同一個 direct 交換機,direct 交換機會把消息投遞到所有 binding-key 與消息 routing-key 相同的隊列

topic: 允許隊列的 binding-key 中包含通配符*和#, topic 交換機會將消息投遞到 binding-key 與 routing-key 匹配的隊列中。

  • 通配符按照關鍵字進行匹配,如news.cn.a中的關鍵字是news、cn和a,即關鍵字按照.分割

  • #通配符匹配0個或多個關鍵字, news.#.a可以匹配news.a, news.cn.a和news.asia.cn.a等

  • *通配符匹配一個關鍵字, news.*.a匹配news.cn.a不匹配news.a、news.asia.cn.a

fanout: fanout 交換機不進行任何匹配, 將消息投遞到所有綁定的隊列

header: header 交換機根據消息頭進行投遞,現在已較少使用

我們可以使用 RabbitMQ 的插件機制使用第三方交換機或自行開發交換機。如實現延時投遞的delayed-message-exchange。

消息頭中的delivery-mode可以設置為 persistent(持久化) 或者 transient(易失)。 Exchange 和 Queue 在處理持久化的消息時都會先將消息寫入磁盤中再進行下一步處理, 即使 RabbitMQ 崩潰也不會丟失。

消費者客戶端通常使用的channel.basicConsume使用推(push)模式投遞消息, 即當有新消息時 Broker 通過 channel 主動向客戶端發送消息。客戶端也可以使用channel.basicGet從 Broker 拉取消息。

ACK機制

RabbitMQ 提供了確認送達(acknowledge)機制保證消息被正確處理不會丟失。

確認送達的回執有三種:

  • ACK: 消息已被成功處理

  • NACK: 消息處理異常, 需要重新投遞

  • REJECT: 消息非法, 丟棄消息

RabbitMQ 的 Queue 可以設置 no_ack=true, 則消息被投遞后即刪除不等待回執。

channel.basicConsume 可以指定auto_ack模式,若auto_ack=true當客戶端收到完整消息后即會自動發出ACK回執,否則必須顯式的發出回執。

Java 代碼示例

首先安裝并啟動RabbitMQ實例, Mac用戶可以使用 Homebrew 進行安裝:

brew install rabbitmq

啟動服務:

brew services start rabbitmq

或者使用官方docker鏡像:

docker run -d --hostname my-rabbit --name some-rabbit rabbitmq:3-management

RabbitMQ官網提供了Ubuntu、RPM以及Windows等多種平臺安裝方式。

RabbitMQ默認TCP端口為5672, Web控制臺默認端口15672。

在Maven中添加依賴:

<dependency>
 <groupId>com.rabbitmq</groupId>
 <artifactId>amqp-client</artifactId>
 <version>5.5.1</version>
</dependency>

編寫生產者:

package rabbit;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @author finley
 */
public class RabbitProducer {

 public static void main(String[] args) throws IOException, TimeoutException {
  ConnectionFactory factory = new ConnectionFactory();
  factory.setUsername("guest");
  factory.setPassword("guest");
  factory.setHost("localhost");
  try (Connection conn = factory.newConnection();
    Channel channel = conn.createChannel()) {
   String exchangeName = "test-exchange";
   channel.exchangeDeclare(exchangeName, "direct", true);

   String routingKey = "hello";

   byte[] msg = "hello world".getBytes();
   AMQP.BasicProperties.Builder propsBuilder = new AMQP.BasicProperties.Builder();
   propsBuilder.deliveryMode(2); // persistent
   propsBuilder.priority(0); // normal
   propsBuilder.contentType("text/plain");
   channel.basicPublish(exchangeName, routingKey, propsBuilder.build(), msg);
  }
 }
}

編寫消費者:

package rabbit;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.*;

/**
 * @author finley
 */
public class RabbitConsumer {

 public static void main(String[] args) throws IOException, TimeoutException {
  ConnectionFactory factory = new ConnectionFactory();
  factory.setUsername("guest");
  factory.setPassword("guest");
  factory.setHost("localhost");
  try (Connection conn = factory.newConnection();
    Channel channel = conn.createChannel()) {
   String exchangeName = "test-exchange";
   channel.exchangeDeclare(exchangeName, "direct", true);

   String queueName = channel.queueDeclare().getQueue();
   String bindingKey = "hello";
   channel.queueBind(queueName, exchangeName, bindingKey);

   while(true) {
    channel.basicConsume(queueName, false, "", new DefaultConsumer(channel) {
     @Override
     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
      String routingKey = envelope.getRoutingKey();
      String contentType = properties.getContentType();
      String bodyStr = new String(body, "UTF-8");
      System.out.println("routingKey: " + routingKey + ", contentType: " + contentType + ", body: " + bodyStr);
      long deliveryTag = envelope.getDeliveryTag();
      channel.basicAck(deliveryTag, false);
     }
    });
   }
  }
 }

}

看完上述內容,你們掌握如何使用RabbitMQ消息中間件的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

罗田县| 乌鲁木齐县| 凤凰县| 蒙自县| 福贡县| 泰安市| 吴忠市| 淅川县| 克山县| 汉寿县| 琼海市| 西和县| 涟水县| 中牟县| 九龙县| 开远市| 丘北县| 晴隆县| 大新县| 襄樊市| 东阿县| 建德市| 元氏县| 镶黄旗| 平安县| 鹿邑县| 平泉县| 中江县| 金门县| 富源县| 淮南市| 乐至县| 沧源| 开封市| 甘泉县| 德阳市| 页游| 逊克县| 贵港市| 黎平县| 思茅市|