91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

activemq特性是什么

發布時間:2021-12-16 16:56:36 來源:億速云 閱讀:167 作者:iii 欄目:云計算

這篇文章主要介紹“activemq特性是什么”,在日常操作中,相信很多人在activemq特性是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”activemq特性是什么”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

activemq特點:用通配符訂閱多個destination,用組合發布多重destionation
activemq支持destination的層次結構【topic和queen】便于歸類和管理。
通配符有三個:
.  用來分隔路徑
* 用來匹配路徑中的一節
> 用來匹配任意節的路徑
opics: <sport><League>.<team>  
。例如: football.division.leeds。 如果leeds 參加兩種運動--Scccer 和 Rugby,為了方便,我們希望通過一個消息消費者而看到Leeds兩種運動的最新戰績,這個時候,通配符就有用武之地了
.  : used to separate elements in the destination name
*  : used to match one element    
>  : match one or all trailing elements
所以,對于上面的例子, 你可以訂閱這樣的主題: *.*.Leeds
如果你想知道division1 這個賽區的所有分數, 你可以訂閱這個: soccer.division1.*
如果你想知道Rugby的分數: 你可以訂閱這個: rugby.>.
然而, 通配符中是為消費者服務的,如果你發送了這樣的一個主題: rugby.>., 這個消息僅會發送到命名了rugby.>.的主題,并不是所有的主題都是以rugby開頭的。
這里有一種  方法,使消息生產者能將一條
消息發送到多個目的地。通過使用   composite destination。
將同一條消息發送到不同的目的地是很有用的。 比如一個用來存儲信息的應用,會發送一條消息給隊列
同時也要將這條消息廣播給監控的所有系統。通常,你會通過用兩個producer 發送兩次消息來達到這個目的。composite destination就是用來解決這種情況的
例如,如果你創建了名子為: store.order.backoffice,store.order.warehouse 的 Queue,這樣 就會發送同時兩個Queue。
訂閱信息     解釋
PRICE.>     Any price for any product on any exchange
PRICE.STOCK.>     Any price for a stock on any exchange
PRICE.STOCK.NASDAQ.*     Any stock price on NASDAQ
PRICE.STOCK.*.IBM     Any IBM stock price on any exchange
從5.5 版本以后,可以自定義路徑分隔符:

    <plugins>
       .....
       <destinationPathSeparatorPlugin/>
    </plugins>

此時FOO.BAR.* 可以表示為 FOO/BAR/*
也可以通過pathSeparator 屬性定義其他符號位路徑分隔符。
   public void subscribeToLeeds() throws JMSException {
        String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic allLeeds = session.createTopic("*.*.Leeds");
        MessageConsumer consumer = session.createConsumer(allLeeds);
        Message result = consumer.receive();
    }
    11.1.2發送一個message到多重destinations
    發送相同的message到不同的destination上:案列發送一個[queen,opic]組合模式,默認的組合destination用,分隔
    列如store.order.backoffice,store.order.warehouse
     String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue ordersDestination = session.createQueue("store.orders, topic://store.orders");
        MessageProducer producer = session.createProducer(ordersDestination);
        Message order = session.createObjectMessage();
        producer.send(order);
11.2通知消息
單的說就是實現了ActiveMQ的broker上各種操作的記錄跟蹤和通知。

使用這個功能,你可以實時的知道broker上

    創建或銷毀了連接,
    添加或刪除了生存者或消費者,
    添加或刪除了主題或隊列,
    有消息發送和接收,
    什么時候有慢消費者,
    什么時候有快生產者
    什么時候什么消息被丟棄
    什么時候broker被添加到集群(主從或是網絡連接)

這個機制是ActiveMQ對JMS協議的重要補充,也是基于JMS實現的ActiveMQ的可管理性的一部分。多個ActiveMQ的相互協調和互操作的基礎設置。
 String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic connectionAdvisory = org.apache.activemq.advisory.AdvisorySupport.CONNECTION_ADVISORY_TOPIC;
        MessageConsumer consumer = session.createConsumer(connectionAdvisory);
        ActiveMQMessage message = (ActiveMQMessage) consumer.receive();
        DataStructure data = (DataStructure) message.getDataStructure();
        if (data.getDataStructureType() == ConnectionInfo.DATA_STRUCTURE_TYPE) {
            ConnectionInfo connectionInfo = (ConnectionInfo) data;
            System.out.println("Connection started: " + connectionInfo);
        } else if (data.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
            RemoveInfo removeInfo = (RemoveInfo) data;
            System.out.println("Connection stopped: " + removeInfo.getObjectId());
        } else {
            System.err.println("Unknown message " + data);
        }
        大多數advisor消息都是完整的對于destiation,但是呢advisorysupport類有一些方法來決定監聽哪個advisorytopic,你也能使用通配符-
             String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        // Lets first create a Consumer to listen too
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Lets first create a Consumer to listen too
        Queue queue = session.createQueue("test.Queue");
        MessageConsumer testConsumer = session.createConsumer(queue);
        // so lets listen for the Consumer starting/stoping
        Topic advisoryTopic = org.apache.activemq.advisory.AdvisorySupport.getConsumerAdvisoryTopic(queue);
        MessageConsumer consumer = session.createConsumer(advisoryTopic);
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message m) {
                ActiveMQMessage message = (ActiveMQMessage) m;
                try {
                    System.out.println("Consumer Count = " + m.getStringProperty("consumerCount"));
                    DataStructure data = (DataStructure) message.getDataStructure();
                    if (data.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE) {
                        ConsumerInfo consumerInfo = (ConsumerInfo) data;
                        System.out.println("Consumer started: " + consumerInfo);
                    } else if (data.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
                        RemoveInfo removeInfo = (RemoveInfo) data;
                        System.out.println("Consumer stopped: " + removeInfo.getObjectId());
                    } else {
                        System.err.println("Unknown message " + data);
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        testConsumer.close();
         <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" advisoryForSlowConsumers="true">
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>

ActiveMQ中,topic只有在持久訂閱(durablesubscription)下是持久化的。存在持久訂閱時,每個持久訂閱者,都相當于一個持久化的queue的客戶端,它會收取所有消息。這種情況下存在兩個問題:

1.        同一應用內consumer端負載均衡的問題:同一個應用上的一個持久訂閱不能使用多個consumer來共同承擔消息處理功能。因為每個都會獲取所有消息。queue模式可以解決這個問題,broker
端又不能將消息發送到多個應用端。所以,既要發布訂閱,又要讓消費者分組,這個功能jms規范本身是沒有的。
2.        同一應用內consumer端failover的問題:由于只能使用單個的持久訂閱者,如果這個訂閱者出錯,則應用就無法處理消息了,系統的健壯性不高,為了解決這兩個問題,ActiveMQ中實現了虛擬
Topic的功能。使用起來非常簡單。對于消息發布者來說,就是一個正常的Topic,名稱以VirtualTopic.開頭。例如VirtualTopic.TEST。對于消息接收端來說,是個隊列,不同應用里使用不同的前綴作為
隊列的名稱,即可表明自己的身份即可實現消費端應用分組。例如Consumer.A.VirtualTopic.TEST,說明它是名稱為A的消費端,同理Consumer.B.VirtualTopic.TEST說明是一個名稱為B的客戶端。
可以在同一個應用里使用多個consumer消費此queue,則可以實現上面兩個功能。又因為不同應用使用的queue名稱不同(前綴不同),所以不同的應用中都可以接收到全部的消息。每個客戶端相當于一個持久訂
閱者,而且這個客戶端可以使用多個消費者共同來承擔消費任務。
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
        
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
        Connection consumerConnection = connectionFactory.createConnection();
        consumerConnection.start();
        
        Session consumerSessionA = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        Queue consumerAQueue = consumerSessionA.createQueue("Consumer.A.VirtualTopic.orders");
        MessageConsumer consumerA = consumerSessionA.createConsumer(consumerAQueue);
        
        Session consumerSessionB = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        Queue consumerBQueue = consumerSessionB.createQueue("Consumer.B.VirtualTopic.orders");
        MessageConsumer consumerB = consumerSessionB.createConsumer(consumerAQueue);
        
        //setup the sender
        Connection senderConnection = connectionFactory.createConnection();
        senderConnection.start();
        Session senerSession = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic ordersDestination = senerSession.createTopic("VirtualTopic.orders");
        MessageProducer producer = senerSession.createProducer(ordersDestination);
同樣queue名稱的消費者會平分所有消息。
從queue接收到的消息,message.getJMSDestination().toString()為topic://VirtualTopic.TEST,即原始的destination。消息的persistent屬性為true,即每個相當于一個持久訂閱。
Virtual Topic這個功能特性在broker上有個總開關,useVirtualTopics屬性,默認為true,設置為false即可關閉此功能。
當此功能開啟,并且使用了持久化的存儲時,broker啟動的時候會從持久化存儲里拿到所有的destinations的名稱,如果名稱模式與Virtual Topics匹配,則把它們添加到系統的Virtual Topics列表中去。
當然,沒有顯式定義的Virtual Topics,也可以直接使用的,系統會自動創建對應的實際topic。
當有consumer訪問此VirtualTopics時,系統會自動創建持久化的queue,并在每次Topic收到消息時,分發到具體的queue。



可追溯”消費者,只對Topic有效,如果consumer是可追溯的,那么它可以獲取實例創建之前的消息。通常而言,訂閱者不可能獲取實例創建之前的消息,因為broker根本不知道它的存在。對于broker而言,如果
一個Topic通道創建,且有發布者發布消息(Publisher),那么broker將會在內存中(非持久化)或者磁盤中(持久化)保存已經發布的消息,直到所有的訂閱者都消費者,才會清除原始消息內容。那么retroactive
類型的訂閱者,就可以獲取這些原本不屬于自己但broker上還保存的舊消息,就像我們訂閱一種Feed,可以立即獲取舊的內容列表一樣。如果此訂閱者不是durable(耐久的),它可以獲取最近發布的一些消息;如果是durable,它可以獲取存儲器中尚未刪除的所有的舊消息。[下文會詳細介紹Topic的數據轉發模型]
//在destinationUrl中設置,默認為false
feedTopic?consumer.retroactive=true
在broker端,可以配置當前Topic默認為“可追溯的”,不過Topic并不會在此種情況下額外的保存消息,只不過表示訂閱者默認都是可追溯的而已。
<!-- 只對topic有效,默認為false -->
<policyEntry topic="feedTopic" alwaysRetroactive="true" />
    String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("soccer.division1.leeds?consumer.retroactive=true");
        MessageConsumer consumer = session.createConsumer(topic);
        Message result = consumer.receive();

 redeliveryPolicy
    consumer使用的重發策略,當消息在client端處理失敗(比如onMessage方法拋出異常,事務回滾等),將會觸發消息重發。對于Broker端,需要重發的消息將會被立即發送(如果broker端使用異步發送,
且發送隊列中還有其他消息,那么重發的消息可能不會被立即到達Consumer)。我們通過此Policy配置最大重發次數、重發頻率等,如果你的Consumer客戶端處于不良網絡環境中,可以適當調整相關參數。參數列表,
請參見(RedeliveryPolicy)
//在brokerUrl中設置
tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=6
 . redeliveryPolicy
RedelieveryPolicy policy=connection.getRedelieveryPolicy();
policy.setInitialRedelieveryDelay(500);
policy.setBackOffMultiplier(2)
policy.setUseExponentialBackOff(true)
policy.setMaximumRedelieveries(2)

DLQ-死信隊列(Dead Letter Queue)用來保存處理失敗或者過期的消息。
出現以下情況時,消息會被redelivered
A transacted session is used and rollback() is called.
A transacted session is closed before commit is called.
A session is using CLIENT_ACKNOWLEDGE and Session.recover() is called.

當一個消息被redelivered超過maximumRedeliveries(缺省為6次,具體設置請參考后面的鏈接)次數時,會給broker發送一個"Poison ack",這個消息被認為是a poison pill,這時broker會將這
消息發送到DLQ,以便后續處理。缺省的死信隊列是ActiveMQ.DLQ,如果沒有特別指定,死信都會被發送到這個隊列。缺省持久消息過期,會被送到DLQ,非持久消息不會送到DLQ可以通過配置文件(activemq.xml)
來調整死信發送策略
<destinationPolicy>

    <policyMap>

      <policyEntries>

        <!— 設置所有隊列,使用 '>' ,否則用隊列名稱 -->

        <policyEntry queue=">">

          <deadLetterStrategy>

            <!--

                    queuePrefix:設置死信隊列前綴

                    useQueueForQueueMessages: 設置使用隊列保存死信,還可以設置useQueueForTopicMessages,使用Topic來保存死信

            -->

            <individualDeadLetterStrategy   queuePrefix="DLQ." useQueueForQueueMessages="true"  processExpired="false" processNonPersistent="false"/>

          </deadLetterStrategy>

        </policyEntry>

      </policyEntries>

    </policyMap>

  </destinationPolicy>

  ...

</broker>

        在一個電子系統中可能接受來自不同供應商的各種訂單信息,不同類型的訂單走的流程不盡相同,為了快速處理各種不同的訂單完成不同的業務。特定義不同的路由 信息。根據路由信息的不同,將消息進行不同的處理。如果采用ActiveMQ那么最好采用apache-camel整合,使不同的消息根據不同的流程自動 處理到不同的隊列中去。

<beans>

<broker brokerName="testBroker">

<transportConnectors>

<transportConnector uri="tcp://localhos:61616">

</transportConnectors>

<import resource="camel.xml">

</beans>

到此,關于“activemq特性是什么”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

向AI問一下細節
推薦閱讀:
  1. ActiveMQ Tips
  2. ActiveMQ安裝

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

怀安县| 台北县| 曲松县| 登封市| 黄陵县| 泌阳县| 曲周县| 灵丘县| 民和| 香河县| 洱源县| 原平市| 大竹县| 禹城市| 綦江县| 吉木萨尔县| 三明市| 黔江区| 和政县| 绥芬河市| 鄢陵县| 潼南县| 龙门县| 延川县| 共和县| 津市市| 诸城市| 黎平县| 中西区| 本溪市| 阿克| 西昌市| 三门县| 葵青区| 丰顺县| 大城县| 巴青县| 贵阳市| 伊春市| 类乌齐县| 苏州市|