91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Springboot整合activeMQ之Topic,不懂也得懂了吧

發布時間:2020-08-05 19:01:28 來源:網絡 閱讀:1098 作者:wx5dca43872c5cc 欄目:編程語言

前言?

今天和大家分享springboot整合activeMq之topic(主題) - - 發布/訂閱模式,類似微信公眾號,我們關注公共就可以收到消息,topic需要消費者先訂閱才能收到消息,如果沒有消費者訂閱,生產者產生的消息就是廢消息(發布/訂閱模式,生產者生產了一個消息,可以由多個消費者進行消費)。本次實例支持websocket、消息重發、持久化…
版本信息:SpringBoot2.1.5 ActiveMQ 5.15.10?

消費者工程?

消費者工程目錄??

Springboot整合activeMQ之Topic,不懂也得懂了吧

pom文件?

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency> 

yml文件配置?

server
port: 8085
spring:
activemq:
broker-url: tcp://localhost:61616
user: admin
password: admin
jms:
pub-sub-domain: true
#自己的主題名字
myTopic: boot_actviemq_topic

配置類?

package com.example.topic_customer.config;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

import javax.jms.ConnectionFactory;
import javax.jms.Topic;

/**
 * @Date 2019/11/13  10:22
 * @Desc 消費者配置類
 */
@Configuration
public class BeanConfig {
    @Value("${myTopic}")
    private String myTopic;

    /**
     * websocket配置
     *
     * @return
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

    @Bean
    public Topic topic() {
        return new ActiveMQTopic(myTopic);
    }

    public RedeliveryPolicy redeliveryPolicy() {
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        //是否在每次嘗試重新發送失敗后,增長這個等待時間
        redeliveryPolicy.setUseExponentialBackOff(true);
        //重發次數,默認為6次,這里設置為10次,-1表示不限次數
        redeliveryPolicy.setMaximumRedeliveries(-1);
        //重發時間間隔,默認為1毫秒,設置為10000毫秒
        redeliveryPolicy.setInitialRedeliveryDelay(10000);
        //表示沒有拖延只有UseExponentialBackOff(true)為true時生效
        //第一次失敗后重新發送之前等待10000毫秒,第二次失敗再等待10000 * 2毫秒
        //第三次翻倍10000 * 2 * 2,以此類推
        redeliveryPolicy.setBackOffMultiplier(2);
        //是否避免消息碰撞
        redeliveryPolicy.setUseCollisionAvoidance(true);
        //設置重發最大拖延時間360000毫秒 表示沒有拖延只有UseExponentialBackOff(true)為true時生效
        redeliveryPolicy.setMaximumRedeliveryDelay(360000);
        return redeliveryPolicy;
    }

    public ConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        //設置重發屬性
        connectionFactory.setRedeliveryPolicy(redeliveryPolicy());
        return connectionFactory;
    }

    /**
     * JMS 隊列的監聽容器工廠
     */
    @Bean(name = "jmsTopicListener")
    public DefaultJmsListenerContainerFactory jmsTopicListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory =
                new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setPubSubDomain(true);
        factory.setSessionTransacted(true);
        factory.setAutoStartup(true);
        //開啟持久化訂閱
        factory.setSubscriptionDurable(true);
        //重連間隔時間
        factory.setRecoveryInterval(1000L);
        factory.setClientId("topic_provider:zb1");
        return factory;
    }
} 

設置消費者持久化主要有兩點:?

  1. //開啟持久化訂閱factory.setSubscriptionDurable(true);
    2.factory.setClientId(“topic_provider:zb1”); // 這個可以隨便設置?

.TopicCustomer類?

package com.example.topic_customer.customer;

import lombok.Data;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.TextMessage;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * @Date 2019/11/13  13:31
 * @Desc
 */
@Component
@ServerEndpoint("/websocket")
@Data
public class TopicCustomer {
    /**
     * 每個客戶端都會有相應的session,服務端可以發送相關消息
     */
    private javax.websocket.Session session;

    /**
     * J.U.C包下線程安全的類,主要用來存放每個客戶端對應的webSocket連接
     */
    private static CopyOnWriteArraySet<TopicCustomer> copyOnWriteArraySet = new CopyOnWriteArraySet<>();

    @OnOpen
    public void onOpen(javax.websocket.Session session) {
        this.session = session;
        copyOnWriteArraySet.add(this);
    }

    @OnClose
    public void onClose() {
        copyOnWriteArraySet.remove(this);
    }

    @OnMessage
    public void onMessage(String message) {
    }

    @OnError
    public void onError(javax.websocket.Session session, Throwable error) {
        error.printStackTrace();
    }

    @JmsListener(destination = "${myTopic}", containerFactory = "jmsTopicListener")
    public void receive(TextMessage textMessage, javax.jms.Session session) throws JMSException {
        //遍歷客戶端
        for (TopicCustomer webSocket : copyOnWriteArraySet) {
            try {
                //服務器主動推送
                webSocket.session.getBasicRemote().sendText(textMessage.getText());
                System.out.println("-- 接收到topic持久化消息 -- " + textMessage.getText());
            } catch (Exception e) {
                System.out.println("-----測試重發-----");
                session.rollback();// 此不可省略 重發信息使用
            }
        }
    }
}

啟動類

package com.example.topic_customer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class TopicCustomerApplication {

    public static void main(String[] args) {
        SpringApplication.run(TopicCustomerApplication.class, args);
    }
} 

消費者啟動成功后mq的截圖:?
Springboot整合activeMQ之Topic,不懂也得懂了吧

生產者工程?

生產者工程目錄?

Springboot整合activeMQ之Topic,不懂也得懂了吧
yml配置文件?

server:
  port: 8084
spring:
  activemq:
    broker-url: tcp://localhost:61616
    user: admin
    password: admin
  jms:
    pub-sub-domain: true

myTopic: boot_actviemq_topic

配置類?

package com.example.topicprovider.config;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.stereotype.Component;

import javax.jms.ConnectionFactory;
import javax.jms.Topic;

/**
 * @Date 2019/11/13  10:22
 * @Desc 生產者配置文件
 */
@Component
public class BeanConfig {
    @Value("${myTopic}")
    private String myTopic;

    public RedeliveryPolicy redeliveryPolicy(){
        RedeliveryPolicy redeliveryPolicy=   new RedeliveryPolicy();
        //是否在每次嘗試重新發送失敗后,增長這個等待時間
        redeliveryPolicy.setUseExponentialBackOff(true);
        //重發次數,默認為6次,這里設置為10次,-1表示不限次數
        redeliveryPolicy.setMaximumRedeliveries(-1);
        //重發時間間隔,默認為1毫秒,設置為10000毫秒
        redeliveryPolicy.setInitialRedeliveryDelay(10000);
        //表示沒有拖延只有UseExponentialBackOff(true)為true時生效
        //第一次失敗后重新發送之前等待10000毫秒,第二次失敗再等待10000 * 2毫秒
        //第三次翻倍10000 * 2 * 2,以此類推
        redeliveryPolicy.setBackOffMultiplier(2);
        //是否避免消息碰撞
        redeliveryPolicy.setUseCollisionAvoidance(true);
        //設置重發最大拖延時間360000毫秒 表示沒有拖延只有UseExponentialBackOff(true)為true時生效
        redeliveryPolicy.setMaximumRedeliveryDelay(360000);
        return redeliveryPolicy;
    }

    @Bean
    public Topic topic() {
        return new ActiveMQTopic(myTopic);
    }

    public ConnectionFactory connectionFactory(){
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        //設置重發屬性
        connectionFactory.setRedeliveryPolicy(redeliveryPolicy());
        return connectionFactory;
    }

    /**
     * JMS 隊列的監聽容器工廠
     */
    @Bean(name = "jmsTopicListener")
    public DefaultJmsListenerContainerFactory jmsTopicListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory =
                new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setPubSubDomain(true);
        factory.setSessionTransacted(true);
        factory.setAutoStartup(true);
        //開啟持久化訂閱
        factory.setSubscriptionDurable(true);
        //重連間隔時間
        factory.setRecoveryInterval(1000L);
        return factory;
    }
}

TopicProvider類?

package com.example.topicprovider.topic_provider;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.jms.Topic;
import java.util.UUID;

/**
 * @Date 2019/11/13  10:25
 * @Desc
 */
@Component
public class TopicProvider {
    @Autowired
    private Topic topic;
    @Autowired
    private JmsTemplate jmsTemplate;

    @Scheduled(fixedDelay = 10000)
    private void produceMsg() {
        jmsTemplate.convertAndSend(topic, "主題生產者" + UUID.randomUUID().toString().substring(1, 7));
        System.out.println( jmsTemplate.getDeliveryMode());
        System.out.println("主題生產者1");
    }
}

啟動類?

package com.example.topicprovider;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class TopicProviderApplication {

    public static void main(String[] args) {
        SpringApplication.run(TopicProviderApplication.class, args);
    }
}

啟動成功后結果圖:

最后

喜歡的可以關注我的公眾號:java小瓜哥的分享平臺。謝謝支持!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

宿松县| 余干县| 临海市| 蒙阴县| 安阳县| 仪征市| 彩票| 缙云县| 大姚县| 阳高县| 武汉市| 永平县| 府谷县| 佛学| 武强县| 郸城县| 海阳市| 邵阳县| 西林县| 九龙坡区| 额尔古纳市| 讷河市| 侯马市| 沧州市| 定兴县| 墨竹工卡县| 江口县| 夏邑县| 新竹县| 沈丘县| 井陉县| 南宁市| 抚顺县| 开远市| 南京市| 宣威市| 黑山县| 湘乡市| 景洪市| 应用必备| 青田县|