91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Springboot如何整合RocketMQ收發消息

發布時間:2021-12-29 12:45:16 來源:億速云 閱讀:226 作者:小新 欄目:開發技術

這篇文章將為大家詳細講解有關Springboot如何整合RocketMQ收發消息,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。

Springboot 整合 RocketMQ 收發消息

創建springboot項目

pom.xml添加rocketmq-spring-boot-starter依賴。

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.0</version>
</dependency>

yml 配置

application.yml

rocketmq:
  name-server: 192.168.64.141:9876

application-demo1.yml

使用 demo1 profile 指定生產者組組名

rocketmq:
  producer:
    group: producer-demo1

application-demo2.yml

使用 demo2 profile 指定生產者組組名

rocketmq:
  producer:
    group: producer-demo2

測試

demo 1

  • 發送普通消息

  • 發送 Spring 的通用 Message 對象

  • 發送異步消息

  • 發送順序消息

生產者

package cn.tedu.demo2.m1;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class Producer {
    @Autowired
    private RocketMQTemplate t ;
    public  void send(){
        //發送同步消息
        t.convertAndSend("Topic1:TagA", "Hello world! ");
        //發送spring的Message
        Message<String> message = MessageBuilder.withPayload("Hello Spring message! ").build();
        t.send("Topic1:TagA",message);
        //發送異步消息
        t.asyncSend("Topic1:TagA", "hello world asyn", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("發送成功");
            }
            @Override
            public void onException(Throwable throwable) {
                System.out.println("發送失敗");
            }
        });
        //發送順序消息
        t.syncSendOrderly("Topic1", "98456237,創建", "98456237");
        t.syncSendOrderly("Topic1", "98456237,支付", "98456237");
        t.syncSendOrderly("Topic1", "98456237,完成", "98456237");
    }
}

消費者

package cn.tedu.demo2.m1;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "Topic1",consumerGroup = "consumer-demo1")
public class Consumer  implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("收到"+s);
    }
}

主類

package cn.tedu.demo2.m1;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Main {
    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }
}

測試類

需要放在 test 文件夾

激活 demo1 profile  @ActiveProfiles("demo1")

package cn.tedu.demo2.m1;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
@SpringBootTest
@ActiveProfiles("demo1")
public class Test1 {
    @Autowired
    private  Producer producer;
    @Test
    public void test1(){
        producer.send();
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

demo 2

發送事務消息

生產者

package cn.tedu.demo2.m2;

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component

public class Producer {

    @Autowired
    private RocketMQTemplate t;

    public void send(){
        Message<String> message = MessageBuilder.withPayload("Hello world").build();
        //一旦發送消息,則執行監聽器
        t.sendMessageInTransaction("Topic2",message,null);
    }
    @RocketMQTransactionListener
    class Lis implements RocketMQLocalTransactionListener {
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
            System.out.println("執行本地事務");
            return RocketMQLocalTransactionState.UNKNOWN;
        }

        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
            System.out.println("執行事務回查");
            return RocketMQLocalTransactionState.COMMIT;
        }
    }

}

消費者

package cn.tedu.demo2.m2;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "Topic2",consumerGroup = "consumer-demo2")
public class Consumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("收到"+s);
    }
}

主類

package cn.tedu.demo2.m2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Main {
    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }
}

測試類

package cn.tedu.demo2.m2;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
@SpringBootTest
@ActiveProfiles("demo2")
public class Test2 {
    @Autowired
    private  Producer producer;
    @Test
    public void  test1(){
        producer.send();
        //為了能夠收到消費者消費的數據,這里通過休眠模擬等待時間
        try {
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

關于“Springboot如何整合RocketMQ收發消息”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

卢湾区| 呼图壁县| 上饶市| 利川市| 天祝| 汝南县| 得荣县| 阜康市| 阿克陶县| 西乌珠穆沁旗| 古交市| 静海县| 太白县| 贡觉县| 长岛县| 灵台县| 天门市| 黄陵县| 沁阳市| 武山县| 兴化市| 南开区| 文安县| 定西市| 天全县| 达孜县| 鄂州市| 英山县| 浠水县| 宝山区| 迭部县| 临清市| 滨海县| 合阳县| 福建省| 三都| 潜江市| 抚顺市| 东兰县| 上蔡县| 航空|