您好,登錄后才能下訂單哦!
最近公司做了一個以信息安全為主的項目,其中有一個業務需求就是,項目定時監控操作用戶的行為,對于一些違規操作嚴重的行為,以發送郵件(FoxMail)的形式進行郵件告警,可能是多人,也可能是一個人,第一次是以單人的形式,,直接在業務層需要告警的地方發送郵件即可,可是后邊需求變更了,對于某些告警郵件可能會發送多人,這其中可能就會有阻塞發郵件的可能,直到把所有郵件發送完畢后再繼續做下邊的業務,領導說這樣會影響用戶體驗,發郵件的時候用戶一直處于等待狀態,不能干別的事情。最后研究說用消息隊列,當有需要發送郵件告警的時候,就向隊列中添加一個標識消息,ActiveMQ通過監聽器的形式,實時監聽隊列里邊的小時,收到消息后,判斷是不是需要發送告警的標識,是的話就自行就行發送郵件!這是就研究的消息隊列ActiveMQ,下邊就是具體內容:
?ActiveMQ是Apache所提供的一個開源的消息系統,完全采用Java來實現,因此,它能很好地支持J2EE提出的JMS(Java Message Service,即Java消息服務)規范。JMS是一組Java應用程序接口,它提供消息的創建、發送、讀取等一系列服務。JMS提供了一組公共應用程序接口和響應的語法,類似于Java數據庫的統一訪問接口JDBC,它是一種與廠商無關的API,使得Java程序能夠與不同廠商的消息組件很好地進行通信。
JMS支持兩種消息發送和接收模型。
類似送快遞,快遞員(producer)將快遞(Message)放到指定地點(destination)后,就可以離開了,拿快遞的人(customer)在接收到通知后,到指定地點(destination)去取快遞(Message)就可以了。當然,取快遞時可能要進行身份驗證,這就涉及到創建連接(connection)時,需要指定用戶名和密碼了。還有就是,實際生活中,當快遞員把快遞放好之后,照理說應該通知客戶去哪里取快遞,而ActiveMq幫我們做好了一切,通知的工作Activemq會幫我們實現,而無需我們親自編碼通知消費者,生產者只需要將Message放到Mq中即可,通知消費者的工作,mq會幫我們處理
用途就是用來處理消息,也就是處理JMS在大型電子商務類網站,如京東、淘寶、去哪兒等網站有著深入的應用,隊列的主要作用是消除高并發訪問高峰,加快網站的響應速度。
在不使用消息隊列的情況下,用戶的請求數據直接寫入數據庫,高發的情況下,會對數據庫造成巨大的壓力,同時也使得系統響應延遲加劇,但使用隊列后,用戶的請求發給隊列后立即返回。
/apache-activemq-5.15.3/bin/win64/目錄下雙擊activemq.bat文件,在瀏覽器中輸入http://localhost:8161/admin/, 用戶名和密碼輸入admin即可
1 <?xml version="1.0" encoding="UTF-8"?>
2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4 <modelVersion>4.0.0</modelVersion>
5
6 <groupId>www.cnblogs.com.hongmoshu</groupId>
7 <artifactId>test_actmq</artifactId>
8 <version>0.0.1-SNAPSHOT</version>
9 <packaging>war</packaging>
10 <name>test_actmq Maven Webapp</name>
11 <url>http://www.example.com</url>
12
13 <!-- 版本管理 -->
14 <properties>
15 <springframework>4.1.8.RELEASE</springframework>
16 </properties>
17
18
19 <dependencies>
20
21 <!-- junit單元測試 -->
22 <dependency>
23 <groupId>junit</groupId>
24 <artifactId>junit</artifactId>
25 <version>4.11</version>
26 <scope>test</scope>
27 </dependency>
28
29 <!-- JSP相關 -->
30 <dependency>
31 <groupId>jstl</groupId>
32 <artifactId>jstl</artifactId>
33 <version>1.2</version>
34 </dependency>
35 <dependency>
36 <groupId>javax.servlet</groupId>
37 <artifactId>servlet-api</artifactId>
38 <scope>provided</scope>
39 <version>2.5</version>
40 </dependency>
41
42 <!-- spring -->
43 <dependency>
44 <groupId>org.springframework</groupId>
45 <artifactId>spring-core</artifactId>
46 <version>${springframework}</version>
47 </dependency>
48 <dependency>
49 <groupId>org.springframework</groupId>
50 <artifactId>spring-context</artifactId>
51 <version>${springframework}</version>
52 </dependency>
53 <dependency>
54 <groupId>org.springframework</groupId>
55 <artifactId>spring-tx</artifactId>
56 <version>${springframework}</version>
57 </dependency>
58 <dependency>
59 <groupId>org.springframework</groupId>
60 <artifactId>spring-webmvc</artifactId>
61 <version>${springframework}</version>
62 </dependency>
63 <dependency>
64 <groupId>org.springframework</groupId>
65 <artifactId>spring-jms</artifactId>
66 <version>${springframework}</version>
67 </dependency>
68
69 <!-- xbean 如<amq:connectionFactory /> -->
70 <dependency>
71 <groupId>org.apache.xbean</groupId>
72 <artifactId>xbean-spring</artifactId>
73 <version>3.16</version>
74 </dependency>
75
76 <!-- activemq -->
77 <dependency>
78 <groupId>org.apache.activemq</groupId>
79 <artifactId>activemq-core</artifactId>
80 <version>5.7.0</version>
81 </dependency>
82 <dependency>
83 <groupId>org.apache.activemq</groupId>
84 <artifactId>activemq-pool</artifactId>
85 <version>5.12.1</version>
86 </dependency>
87
88 <!-- gson -->
89 <dependency>
90 <groupId>com.google.code.gson</groupId>
91 <artifactId>gson</artifactId>
92 <version>1.7.1</version>
93 </dependency>
94
95 <!-- JSON -->
96 <dependency>
97 <groupId>net.sf.json-lib</groupId>
98 <artifactId>json-lib</artifactId>
99 <version>2.4</version>
100 <classifier>jdk15</classifier>
101 </dependency>
102
103 </dependencies>
104
105 <build>
106 <finalName>test_actmq</finalName>
107
108 </build>
109 </project>
1 <?xml version="1.0" encoding="UTF-8"?>
2 <beans xmlns="http://www.springframework.org/schema/beans"
3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4 xmlns:amq="http://activemq.apache.org/schema/core"
5 xmlns:jms="http://www.springframework.org/schema/jms"
6 xmlns:context="http://www.springframework.org/schema/context"
7 xmlns:mvc="http://www.springframework.org/schema/mvc"
8 xsi:schemaLocation="
9 http://www.springframework.org/schema/beans
10 http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
11 http://www.springframework.org/schema/context
12 http://www.springframework.org/schema/context/spring-context-4.1.xsd
13 http://www.springframework.org/schema/mvc
14 http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd
15 http://www.springframework.org/schema/jms
16 http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
17 http://activemq.apache.org/schema/core
18 http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd"
19 >
20
21 <context:component-scan base-package="com.svse.service" />
22 <mvc:annotation-driven />
23
24 <!-- jms.useAsyncSend=true 允許異步接收消息 -->
25 <amq:connectionFactory id="amqConnectionFactory"
26 brokerURL="tcp://192.168.6.111:61616?jms.useAsyncSend=true"
27 userName="admin"
28 password="admin" />
29
30 <!-- 配置JMS連接工 廠 -->
31 <bean id="connectionFactory"
32 class="org.springframework.jms.connection.CachingConnectionFactory">
33 <constructor-arg ref="amqConnectionFactory" />
34 <property name="sessionCacheSize" value="100" />
35 </bean>
36
37 <!-- 定義消息隊列名稱(Queue) -->
38 <bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
39 <!-- 設置消息隊列的名字 -->
40 <constructor-arg>
41 <value>Jaycekon</value>
42 </constructor-arg>
43 </bean>
44
45 <!-- 配置JMS模板(Queue),Spring提供的JMS工具類,它發送、接收消息。 -->
46 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
47 <property name="connectionFactory" ref="connectionFactory" />
48 <property name="defaultDestination" ref="demoQueueDestination" />
49 <property name="receiveTimeout" value="10000" />
50 <!-- true是topic,false是queue,默認是false,此處顯示寫出false -->
51 <property name="pubSubDomain" value="false" />
52 <!-- 消息轉換器 -->
53 <property name="messageConverter" ref="userMessageConverter"/>
54 </bean>
55
56 <!-- 類型轉換器 -->
57 <bean id="userMessageConverter" class="com.svse.util.ObjectMessageConverter"/>
58
59
60 <!-- 配置消息隊列監聽者(Queue) -->
61 <bean id="queueMessageListener" class="com.svse.util.QueueMessageListener" />
62
63 <!-- 顯示注入消息監聽容器(Queue),配置連接工廠,監聽的目標是demoQueueDestination,監聽器是上面定義的監聽器 -->
64 <bean id="queueListenerContainer"
65 class="org.springframework.jms.listener.DefaultMessageListenerContainer">
66 <property name="connectionFactory" ref="connectionFactory" />
67 <property name="destination" ref="demoQueueDestination" />
68 <property name="messageListener" ref="queueMessageListener" />
69 </bean>
70
71 </beans>
1 <?xml version="1.0" encoding="UTF-8"?>
2 <beans xmlns="http://www.springframework.org/schema/beans"
3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4 xmlns:context="http://www.springframework.org/schema/context"
5 xmlns:mvc="http://www.springframework.org/schema/mvc"
6 xsi:schemaLocation="http://www.springframework.org/schema/beans
7 http://www.springframework.org/schema/beans/spring-beans.xsd
8 http://www.springframework.org/schema/context
9 http://www.springframework.org/schema/context/spring-context-4.1.xsd
10 http://www.springframework.org/schema/mvc
11 http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd">
12
13 <context:component-scan base-package="com.svse.controller" />
14 <mvc:annotation-driven />
15 <bean id="viewResolver" class="org.springframework.web.servlet.view.UrlBasedViewResolver">
16 <property name="viewClass"
17 value="org.springframework.web.servlet.view.JstlView" />
18 <property name="prefix" value="/WEB-INF/views/" />
19 <property name="suffix" value=".jsp" />
20 </bean>
21
22 </beans>
1 <?xml version="1.0" encoding="UTF-8"?>
2 <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3 xmlns="http://xmlns.jcp.org/xml/ns/javaee"
4 xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
5 id="WebApp_ID" version="3.1">
6 <display-name>mydemo</display-name>
7
8 <welcome-file-list>
9 <welcome-file>index.jsp</welcome-file>
10 </welcome-file-list>
11
12 <!-- 加載spring及active的配置文件,classpath為項目src下的路徑 -->
13 <context-param>
14 <param-name>contextConfigLocation</param-name>
15 <param-value>
16 classpath:spring-mvc.xml;
17 classpath:ActiveMQ.xml;
18 </param-value>
19 </context-param>
20
21
22 <listener>
23 <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
24 </listener>
25
26 <servlet>
27 <servlet-name>springMVC</servlet-name>
28 <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
29 <init-param>
30 <param-name>contextConfigLocation</param-name>
31 <param-value>classpath:spring-mvc.xml</param-value>
32 </init-param>
33 <load-on-startup>1</load-on-startup>
34 </servlet>
35 <servlet-mapping>
36 <servlet-name>springMVC</servlet-name>
37 <url-pattern>/</url-pattern>
38 </servlet-mapping>
39
40 <!-- 處理編碼格式 -->
41 <filter>
42 <filter-name>characterEncodingFilter</filter-name>
43 <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
44 <init-param>
45 <param-name>encoding</param-name>
46 <param-value>UTF-8</param-value>
47 </init-param>
48 <init-param>
49 <param-name>forceEncoding</param-name>
50 <param-value>true</param-value>
51 </init-param>
52 </filter>
53 <filter-mapping>
54 <filter-name>characterEncodingFilter</filter-name>
55 <url-pattern>/*</url-pattern>
56 </filter-mapping>
57
58 </web-app>
1 package com.svse.entity;
2 import java.io.Serializable;
3
4 public class Users implements Serializable{
5
6 private String userId;
7 private String userName;
8 private String sex;
9 private String age;
10 private String type;
11
12
13 public Users() {
14 super();
15 }
16 public Users(String userId, String userName, String sex, String age,
17 String type) {
18 super();
19 this.userId = userId;
20 this.userName = userName;
21 this.sex = sex;
22 this.age = age;
23 this.type = type;
24 }
25 public String getUserId() {
26 return userId;
27 }
28 public void setUserId(String userId) {
29 this.userId = userId;
30 }
31 public String getUserName() {
32 return userName;
33 }
34 public void setUserName(String userName) {
35 this.userName = userName;
36 }
37 public String getSex() {
38 return sex;
39 }
40 public void setSex(String sex) {
41 this.sex = sex;
42 }
43 public String getAge() {
44 return age;
45 }
46 public void setAge(String age) {
47 this.age = age;
48 }
49 public String getType() {
50 return type;
51 }
52 public void setType(String type) {
53 this.type = type;
54 }
55 @Override
56 public String toString() {
57 return "Users [userId=" + userId + ", userName=" + userName + ", sex="
58 + sex + ", age=" + age + ", type=" + type + "]";
59 }
60
61
62 }
1 package com.svse.service;
2
3 import javax.annotation.Resource;
4 import javax.jms.Destination;
5 import javax.jms.JMSException;
6 import javax.jms.Message;
7 import javax.jms.Session;
8
9 import org.springframework.jms.core.JmsTemplate;
10 import org.springframework.jms.core.MessageCreator;
11 import org.springframework.stereotype.Service;
12
13 import com.svse.entity.Users;
14
15 @Service
16 public class ProducerService {
17
18 @Resource(name="jmsTemplate")
19 private JmsTemplate jmsTemplate;
20
21
22 /**
23 * 向指定隊列發送消息 (發送文本消息)
24 */
25 public void sendMessage(Destination destination,final String msg){
26
27 jmsTemplate.setDeliveryPersistent(true);
28
29 System.out.println(Thread.currentThread().getName()+" 向隊列"+destination.toString()+"發送消息---------------------->"+msg);
30 jmsTemplate.send(destination, new MessageCreator() {
31 public Message createMessage(Session session) throws JMSException {
32 return session.createTextMessage(msg);
33 }
34 });
35 }
36
37 /**
38 * 向指定隊列發送消息以對象的方式 (發送對象消息)
39 */
40 public void sendMessageNew(Destination destination,Users user){
41 System.out.println(Thread.currentThread().getName()+" 向隊列"+destination.toString()+"發送消息---------------------->"+user);
42 jmsTemplate.convertAndSend(user);
43 }
44
45 /**
46 * 向默認隊列發送消息
47 */
48 public void sendMessage(final String msg){
49 String destination = jmsTemplate.getDefaultDestinationName();
50 System.out.println(Thread.currentThread().getName()+" 向隊列"+destination+"發送消息---------------------->"+msg);
51 jmsTemplate.send(new MessageCreator() {
52 public Message createMessage(Session session) throws JMSException {
53 return session.createTextMessage(msg);
54 }
55 });
56 }
57 }
1 package com.svse.service;
2
3 import javax.annotation.Resource;
4 import javax.jms.Destination;
5 import javax.jms.JMSException;
6 import javax.jms.ObjectMessage;
7 import javax.jms.TextMessage;
8
9 import net.sf.json.JSONObject;
10
11 import org.springframework.jms.core.JmsTemplate;
12 import org.springframework.stereotype.Service;
13
14 import com.svse.entity.Users;
15
16 @Service
17 public class ConsumerService {
18
19 @Resource(name="jmsTemplate")
20 private JmsTemplate jmsTemplate;
21 //接收文本消息
22 public TextMessage receive(Destination destination){
23 TextMessage textMessage = (TextMessage) jmsTemplate.receive(destination);
24 try{
25 JSONObject json=JSONObject.fromObject(textMessage.getText());
26 System.out.println("name:"+json.getString("userName"));
27 System.out.println("從隊列" + destination.toString() + "收到了消息:\t"
28 + textMessage.getText());
29 } catch (JMSException e) {
30 e.printStackTrace();
31 }
32 return textMessage;
33 }
34 //接收對象消息
35 public ObjectMessage receiveNew(Destination destination){
36 ObjectMessage objMsg=(ObjectMessage) jmsTemplate.receive(destination);
38 try{
39 Users users= (Users) objMsg.getObject();
44 System.out.println("name:"+users.getUserName());
47 System.out.println("從隊列" + destination.toString() + "收到了消息:\t"
48 + users);
49 } catch (JMSException e) {
50 e.printStackTrace();
51 }
52 return objMsg;
53 }
54 }
1 package com.svse.controller.mq;
2
3 import java.io.IOException;
4 import java.text.SimpleDateFormat;
5 import java.util.Date;
7 import javax.annotation.Resource;
8 import javax.jms.DeliveryMode;
9 import javax.jms.Destination;
10 import javax.jms.JMSException;
11 import javax.jms.ObjectMessage;
12 import javax.jms.TextMessage;
13 import javax.management.MBeanServerConnection;
14 import javax.management.remote.JMXConnector;
15 import javax.management.remote.JMXConnectorFactory;
16 import javax.management.remote.JMXServiceURL;
18 import org.springframework.stereotype.Controller;
19 import org.springframework.web.bind.annotation.RequestMapping;
20 import org.springframework.web.bind.annotation.RequestMethod;
21 import org.springframework.web.bind.annotation.RequestParam;
22 import org.springframework.web.servlet.ModelAndView;
24 import com.google.gson.Gson;
25 import com.svse.entity.Users;
26 import com.svse.service.ConsumerService;
27 import com.svse.service.ProducerService;
28
29 @Controller
30 public class DemoController {
35
36 //隊列名Jaycekon (ActiveMQ中設置的隊列的名稱)
37 @Resource(name="demoQueueDestination")
38 private Destination demoQueueDestination;
39
40 //隊列消息生產者
41 @Resource(name="producerService")
42 private ProducerService producer;
43
44 //隊列消息消費者
45 @Resource(name="consumerService")
46 private ConsumerService consumer;
47
48 /*
49 * 準備發消息
50 */
51 @RequestMapping(value="/producer",method=RequestMethod.GET)
52 public ModelAndView producer(){
53 System.out.println("------------go producer");
54
55 Date now = new Date();
56 SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
57 String time = dateFormat.format( now );
58 System.out.println(time);
59
60 ModelAndView mv = new ModelAndView();
61 mv.addObject("time", time);
62 mv.setViewName("producer");
63 return mv;
64 }
65
66 /*
67 * 發消息
68 */
69 @RequestMapping(value="/onsend",method=RequestMethod.POST)
70 public ModelAndView producer(@RequestParam("message") String message) {
71 System.out.println("------------send to jms");
72 ModelAndView mv = new ModelAndView();
73 for(int i=0;i<5;i++){
74 try {
75 Users users=new Users("10"+(i+1),"趙媛媛"+(i+1),"女","27","影視演員");
76 Gson gson=new Gson();
77 String sendMessage=gson.toJson(users);
78 System.out.println("發送的消息sendMessage:"+sendMessage.toString());
79 // producer.sendMessage(demoQueueDestination,sendMessage.toString());//以文本的形式
80 producer.sendMessageNew(demoQueueDestination, users);//以對象的方式
81
82 } catch (Exception e) {
83 e.printStackTrace();
84 }
85 }
86 mv.setViewName("index");
87 return mv;
88 }
89 /*
90 * 手動接收消息
91 */
92 @RequestMapping(value="/receive",method=RequestMethod.GET)
93 public ModelAndView queue_receive() throws JMSException {
94 System.out.println("------------receive message");
95 ModelAndView mv = new ModelAndView();
96
97 //TextMessage tm = consumer.receive(demoQueueDestination);//接收文本消息
98
99 ObjectMessage objMsg=consumer.receiveNew(demoQueueDestination);//接收對象消息
100 Users users= (Users) objMsg.getObject();
101 //mv.addObject("textMessage", tm.getText());
102 mv.addObject("textMessage", users.getUserId()+" || "+users.getUserName());
103 mv.setViewName("receive");
104 return mv;
105 }
106
107 /*
108 * ActiveMQ Manager Test
109 */
110 @RequestMapping(value="/jms",method=RequestMethod.GET)
111 public ModelAndView jmsManager() throws IOException {
112 System.out.println("------------jms manager");
113 ModelAndView mv = new ModelAndView();
114 mv.setViewName("index");
115
116 JMXServiceURL url = new JMXServiceURL("");
117 JMXConnector connector = JMXConnectorFactory.connect(url);
118 connector.connect();
119 MBeanServerConnection connection = connector.getMBeanServerConnection();
120
121 return mv;
122 }
123
124 }
在上邊的ProducerService和ConsumerService中,不論是發送消息還是接收消息,都可以以文本TextMessage的方式和ObjectMessage的方式.如果是簡單的文本消息可以以TextMessage,但是如果需要發送的內容比較多,結構比較復雜,這時候就建議用對象文本ObjectMessage的方式向隊列queue中發送消息了.但是這時候就需要用到對象消息轉換器MessageConverter.
MessageConverter的作用主要有兩方面,一方面它可以把我們的非標準化Message對象轉換成我們的目標Message對象,這主要是用在發送消息的時候;另一方面它又可以把我們的Message對象
轉換成對應的目標對象,這主要是用在接收消息的時候。
1 package com.svse.util;
2
3 import java.io.Serializable;
4
5 import javax.jms.JMSException;
6 import javax.jms.Message;
7 import javax.jms.ObjectMessage;
8 import javax.jms.Session;
9
10 import org.springframework.jms.support.converter.MessageConversionException;
11 import org.springframework.jms.support.converter.MessageConverter;
12
13 /**
14 *功能說明:通用的消息對象轉換類
15 *@author:zsq
16 *create date:2019年7月12日 上午9:28:31
17 *修改人 修改時間 修改描述
18 *Copyright (c)2019北京智華天成科技有限公司-版權所有
19 */
20 public class ObjectMessageConverter implements MessageConverter {
21
22
23 //把一個Java對象轉換成對應的JMS Message (生產消息的時候)
24 public Message toMessage(Object object, Session session)
25 throws JMSException, MessageConversionException {
26
27 return session.createObjectMessage((Serializable) object);
28 }
29
30 //把一個JMS Message轉換成對應的Java對象 (消費消息的時候)
31 public Object fromMessage(Message message) throws JMSException,
32 MessageConversionException {
33 ObjectMessage objMessage = (ObjectMessage) message;
34 return objMessage.getObject();
35 }
36
37 }
注意:寫了消息轉化器之后還需要的ActiveMQ.xml中進行配置
MessageageListe作用就是動態的自行監聽消息隊列的生產者發送的消息,不需要人工手動接收!
1 package com.svse.util;
2 import javax.jms.JMSException;
3 import javax.jms.Message;
4 import javax.jms.MessageListener;
5 import javax.jms.ObjectMessage;
6 import javax.jms.TextMessage;
7
8 import com.svse.entity.Users;
9
10
11 public class QueueMessageListener implements MessageListener {
12
13 //添加了監聽器,只要生產者發布了消息,監聽器監聽到有消息消費者就會自動消費(獲取消息)
14 public void onMessage(Message message) {
15 //(第1種方式)沒加轉換器之前接收到的是文本消息
16 //TextMessage tm = (TextMessage) message;
17
18 //(第2種方式)加了轉換器之后接收到的ObjectMessage對象消息
19 ObjectMessage objMsg=(ObjectMessage) message;
20 Users users;
21 try {
22 users = (Users) objMsg.getObject();
23 //System.out.println("QueueMessageListener監聽到了文本消息:\t" + tm.getText());
24 System.out.println("QueueMessageListener監聽到了文本消息:\t" + users);
25 //do something ...
26 } catch (JMSException e1) {
27 // TODO Auto-generated catch block
28 e1.printStackTrace();
29 }
30 }
31
32 }
同樣寫好監聽器后也是需在ActiveMQ.xml中進行配置注冊的
(1)注冊JmsTemplate時,pubSubDomain這個屬性的值要特別注意。默認值是false,也就是說默認只是支持queue模式,不支持topic模式。但是,如果將它改為true,則不支持queue模式。因此如果項目需要同時支持queue和topic模式,那么需要注冊2個JmsTemplate,同時監聽容器也需要注冊2個
(2)使用Queue時,生產者只要將Message發送到MQ服務器端,消費者就可以進行消費,而無需生產者程序一直運行;
(3)消息是按照先入先出的順序,一旦有消費者將Message消費,該Message就會從MQ服務器隊列中刪去;
(4)有文章說,“生產者”<-->"消費者"是一對一的關系,其實并不準確,從應用中可以看出,一個生產者產生的消息,可以被多個消費者進行消費,只不過多個消費者在消費消息時是競爭的關系,先得到的先消費,一旦消費完成,該消息就會出隊列,
就不能被其他消費者再消費了,即“一次性消費”。就是我們熟悉的“點對點”通信了;
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。