您好,登錄后才能下訂單哦!
這篇文章主要介紹“activemq特性是什么”,在日常操作中,相信很多人在activemq特性是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”activemq特性是什么”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
activemq特點:用通配符訂閱多個destination,用組合發布多重destionation
activemq支持destination的層次結構【topic和queen】便于歸類和管理。
通配符有三個:
. 用來分隔路徑
* 用來匹配路徑中的一節
> 用來匹配任意節的路徑
opics: <sport><League>.<team>
。例如: football.division.leeds。 如果leeds 參加兩種運動--Scccer 和 Rugby,為了方便,我們希望通過一個消息消費者而看到Leeds兩種運動的最新戰績,這個時候,通配符就有用武之地了
. : used to separate elements in the destination name
* : used to match one element
> : match one or all trailing elements
所以,對于上面的例子, 你可以訂閱這樣的主題: *.*.Leeds
如果你想知道division1 這個賽區的所有分數, 你可以訂閱這個: soccer.division1.*
如果你想知道Rugby的分數: 你可以訂閱這個: rugby.>.
然而, 通配符中是為消費者服務的,如果你發送了這樣的一個主題: rugby.>., 這個消息僅會發送到命名了rugby.>.的主題,并不是所有的主題都是以rugby開頭的。
這里有一種 方法,使消息生產者能將一條
消息發送到多個目的地。通過使用 composite destination。
將同一條消息發送到不同的目的地是很有用的。 比如一個用來存儲信息的應用,會發送一條消息給隊列
同時也要將這條消息廣播給監控的所有系統。通常,你會通過用兩個producer 發送兩次消息來達到這個目的。composite destination就是用來解決這種情況的
例如,如果你創建了名子為: store.order.backoffice,store.order.warehouse 的 Queue,這樣 就會發送同時兩個Queue。
訂閱信息 解釋
PRICE.> Any price for any product on any exchange
PRICE.STOCK.> Any price for a stock on any exchange
PRICE.STOCK.NASDAQ.* Any stock price on NASDAQ
PRICE.STOCK.*.IBM Any IBM stock price on any exchange
從5.5 版本以后,可以自定義路徑分隔符:
<plugins>
.....
<destinationPathSeparatorPlugin/>
</plugins>
此時FOO.BAR.* 可以表示為 FOO/BAR/*
也可以通過pathSeparator 屬性定義其他符號位路徑分隔符。
public void subscribeToLeeds() throws JMSException {
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic allLeeds = session.createTopic("*.*.Leeds");
MessageConsumer consumer = session.createConsumer(allLeeds);
Message result = consumer.receive();
}
11.1.2發送一個message到多重destinations
發送相同的message到不同的destination上:案列發送一個[queen,opic]組合模式,默認的組合destination用,分隔
列如store.order.backoffice,store.order.warehouse
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue ordersDestination = session.createQueue("store.orders, topic://store.orders");
MessageProducer producer = session.createProducer(ordersDestination);
Message order = session.createObjectMessage();
producer.send(order);
11.2通知消息
單的說就是實現了ActiveMQ的broker上各種操作的記錄跟蹤和通知。
使用這個功能,你可以實時的知道broker上
創建或銷毀了連接,
添加或刪除了生存者或消費者,
添加或刪除了主題或隊列,
有消息發送和接收,
什么時候有慢消費者,
什么時候有快生產者
什么時候什么消息被丟棄
什么時候broker被添加到集群(主從或是網絡連接)
這個機制是ActiveMQ對JMS協議的重要補充,也是基于JMS實現的ActiveMQ的可管理性的一部分。多個ActiveMQ的相互協調和互操作的基礎設置。
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic connectionAdvisory = org.apache.activemq.advisory.AdvisorySupport.CONNECTION_ADVISORY_TOPIC;
MessageConsumer consumer = session.createConsumer(connectionAdvisory);
ActiveMQMessage message = (ActiveMQMessage) consumer.receive();
DataStructure data = (DataStructure) message.getDataStructure();
if (data.getDataStructureType() == ConnectionInfo.DATA_STRUCTURE_TYPE) {
ConnectionInfo connectionInfo = (ConnectionInfo) data;
System.out.println("Connection started: " + connectionInfo);
} else if (data.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
RemoveInfo removeInfo = (RemoveInfo) data;
System.out.println("Connection stopped: " + removeInfo.getObjectId());
} else {
System.err.println("Unknown message " + data);
}
大多數advisor消息都是完整的對于destiation,但是呢advisorysupport類有一些方法來決定監聽哪個advisorytopic,你也能使用通配符-
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection connection = connectionFactory.createConnection();
connection.start();
// Lets first create a Consumer to listen too
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Lets first create a Consumer to listen too
Queue queue = session.createQueue("test.Queue");
MessageConsumer testConsumer = session.createConsumer(queue);
// so lets listen for the Consumer starting/stoping
Topic advisoryTopic = org.apache.activemq.advisory.AdvisorySupport.getConsumerAdvisoryTopic(queue);
MessageConsumer consumer = session.createConsumer(advisoryTopic);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message m) {
ActiveMQMessage message = (ActiveMQMessage) m;
try {
System.out.println("Consumer Count = " + m.getStringProperty("consumerCount"));
DataStructure data = (DataStructure) message.getDataStructure();
if (data.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE) {
ConsumerInfo consumerInfo = (ConsumerInfo) data;
System.out.println("Consumer started: " + consumerInfo);
} else if (data.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
RemoveInfo removeInfo = (RemoveInfo) data;
System.out.println("Consumer stopped: " + removeInfo.getObjectId());
} else {
System.err.println("Unknown message " + data);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
});
testConsumer.close();
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" advisoryForSlowConsumers="true">
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
ActiveMQ中,topic只有在持久訂閱(durablesubscription)下是持久化的。存在持久訂閱時,每個持久訂閱者,都相當于一個持久化的queue的客戶端,它會收取所有消息。這種情況下存在兩個問題:
1. 同一應用內consumer端負載均衡的問題:同一個應用上的一個持久訂閱不能使用多個consumer來共同承擔消息處理功能。因為每個都會獲取所有消息。queue模式可以解決這個問題,broker
端又不能將消息發送到多個應用端。所以,既要發布訂閱,又要讓消費者分組,這個功能jms規范本身是沒有的。
2. 同一應用內consumer端failover的問題:由于只能使用單個的持久訂閱者,如果這個訂閱者出錯,則應用就無法處理消息了,系統的健壯性不高,為了解決這兩個問題,ActiveMQ中實現了虛擬
Topic的功能。使用起來非常簡單。對于消息發布者來說,就是一個正常的Topic,名稱以VirtualTopic.開頭。例如VirtualTopic.TEST。對于消息接收端來說,是個隊列,不同應用里使用不同的前綴作為
隊列的名稱,即可表明自己的身份即可實現消費端應用分組。例如Consumer.A.VirtualTopic.TEST,說明它是名稱為A的消費端,同理Consumer.B.VirtualTopic.TEST說明是一個名稱為B的客戶端。
可以在同一個應用里使用多個consumer消費此queue,則可以實現上面兩個功能。又因為不同應用使用的queue名稱不同(前綴不同),所以不同的應用中都可以接收到全部的消息。每個客戶端相當于一個持久訂
閱者,而且這個客戶端可以使用多個消費者共同來承擔消費任務。
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection consumerConnection = connectionFactory.createConnection();
consumerConnection.start();
Session consumerSessionA = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Queue consumerAQueue = consumerSessionA.createQueue("Consumer.A.VirtualTopic.orders");
MessageConsumer consumerA = consumerSessionA.createConsumer(consumerAQueue);
Session consumerSessionB = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Queue consumerBQueue = consumerSessionB.createQueue("Consumer.B.VirtualTopic.orders");
MessageConsumer consumerB = consumerSessionB.createConsumer(consumerAQueue);
//setup the sender
Connection senderConnection = connectionFactory.createConnection();
senderConnection.start();
Session senerSession = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic ordersDestination = senerSession.createTopic("VirtualTopic.orders");
MessageProducer producer = senerSession.createProducer(ordersDestination);
同樣queue名稱的消費者會平分所有消息。
從queue接收到的消息,message.getJMSDestination().toString()為topic://VirtualTopic.TEST,即原始的destination。消息的persistent屬性為true,即每個相當于一個持久訂閱。
Virtual Topic這個功能特性在broker上有個總開關,useVirtualTopics屬性,默認為true,設置為false即可關閉此功能。
當此功能開啟,并且使用了持久化的存儲時,broker啟動的時候會從持久化存儲里拿到所有的destinations的名稱,如果名稱模式與Virtual Topics匹配,則把它們添加到系統的Virtual Topics列表中去。
當然,沒有顯式定義的Virtual Topics,也可以直接使用的,系統會自動創建對應的實際topic。
當有consumer訪問此VirtualTopics時,系統會自動創建持久化的queue,并在每次Topic收到消息時,分發到具體的queue。
可追溯”消費者,只對Topic有效,如果consumer是可追溯的,那么它可以獲取實例創建之前的消息。通常而言,訂閱者不可能獲取實例創建之前的消息,因為broker根本不知道它的存在。對于broker而言,如果
一個Topic通道創建,且有發布者發布消息(Publisher),那么broker將會在內存中(非持久化)或者磁盤中(持久化)保存已經發布的消息,直到所有的訂閱者都消費者,才會清除原始消息內容。那么retroactive
類型的訂閱者,就可以獲取這些原本不屬于自己但broker上還保存的舊消息,就像我們訂閱一種Feed,可以立即獲取舊的內容列表一樣。如果此訂閱者不是durable(耐久的),它可以獲取最近發布的一些消息;如果是durable,它可以獲取存儲器中尚未刪除的所有的舊消息。[下文會詳細介紹Topic的數據轉發模型]
//在destinationUrl中設置,默認為false
feedTopic?consumer.retroactive=true
在broker端,可以配置當前Topic默認為“可追溯的”,不過Topic并不會在此種情況下額外的保存消息,只不過表示訂閱者默認都是可追溯的而已。
<!-- 只對topic有效,默認為false -->
<policyEntry topic="feedTopic" alwaysRetroactive="true" />
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("soccer.division1.leeds?consumer.retroactive=true");
MessageConsumer consumer = session.createConsumer(topic);
Message result = consumer.receive();
redeliveryPolicy
consumer使用的重發策略,當消息在client端處理失敗(比如onMessage方法拋出異常,事務回滾等),將會觸發消息重發。對于Broker端,需要重發的消息將會被立即發送(如果broker端使用異步發送,
且發送隊列中還有其他消息,那么重發的消息可能不會被立即到達Consumer)。我們通過此Policy配置最大重發次數、重發頻率等,如果你的Consumer客戶端處于不良網絡環境中,可以適當調整相關參數。參數列表,
請參見(RedeliveryPolicy)
//在brokerUrl中設置
tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=6
. redeliveryPolicy
RedelieveryPolicy policy=connection.getRedelieveryPolicy();
policy.setInitialRedelieveryDelay(500);
policy.setBackOffMultiplier(2)
policy.setUseExponentialBackOff(true)
policy.setMaximumRedelieveries(2)
DLQ-死信隊列(Dead Letter Queue)用來保存處理失敗或者過期的消息。
出現以下情況時,消息會被redelivered
A transacted session is used and rollback() is called.
A transacted session is closed before commit is called.
A session is using CLIENT_ACKNOWLEDGE and Session.recover() is called.
當一個消息被redelivered超過maximumRedeliveries(缺省為6次,具體設置請參考后面的鏈接)次數時,會給broker發送一個"Poison ack",這個消息被認為是a poison pill,這時broker會將這
消息發送到DLQ,以便后續處理。缺省的死信隊列是ActiveMQ.DLQ,如果沒有特別指定,死信都會被發送到這個隊列。缺省持久消息過期,會被送到DLQ,非持久消息不會送到DLQ可以通過配置文件(activemq.xml)
來調整死信發送策略
<destinationPolicy>
<policyMap>
<policyEntries>
<!— 設置所有隊列,使用 '>' ,否則用隊列名稱 -->
<policyEntry queue=">">
<deadLetterStrategy>
<!--
queuePrefix:設置死信隊列前綴
useQueueForQueueMessages: 設置使用隊列保存死信,還可以設置useQueueForTopicMessages,使用Topic來保存死信
-->
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" processExpired="false" processNonPersistent="false"/>
</deadLetterStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
...
</broker>
在一個電子系統中可能接受來自不同供應商的各種訂單信息,不同類型的訂單走的流程不盡相同,為了快速處理各種不同的訂單完成不同的業務。特定義不同的路由 信息。根據路由信息的不同,將消息進行不同的處理。如果采用ActiveMQ那么最好采用apache-camel整合,使不同的消息根據不同的流程自動 處理到不同的隊列中去。
<beans>
<broker brokerName="testBroker">
<transportConnectors>
<transportConnector uri="tcp://localhos:61616">
</transportConnectors>
<import resource="camel.xml">
</beans>
到此,關于“activemq特性是什么”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。