您好,登錄后才能下訂單哦!
這篇文章給大家介紹如何在SpringMVC項目中使用rabbitmq,內容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。
1.添加maven依賴
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.5.1</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.4.5.RELEASE</version> </dependency>
2.spring主配置文件中加入rabbitMQ xml文件的配置
<!-- rabbitMQ 配置 --> <import resource="/application-mq.xml"/>
3.jdbc配置文件中加入 rabbitmq的鏈接配置
#rabbitMQ配置 mq.host=localhost mq.username=donghao mq.password=donghao mq.port=5672 mq.vhost=testMQ
4.新建application-mq.xml文件,添加配置信息
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" > <description>rabbitmq 連接服務配置</description> <!-- 連接配置 --> <rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" port="${mq.port}" virtual-host="${mq.vhost}"/> <rabbit:admin connection-factory="connectionFactory"/> <!-- spring template聲明--> <rabbit:template exchange="koms" id="amqpTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" /> <!-- 消息對象json轉換類 --> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /> <!-- durable:是否持久化 exclusive: 僅創建者可以使用的私有隊列,斷開后自動刪除 auto_delete: 當所有消費客戶端連接斷開后,是否自動刪除隊列 --> <!-- 申明一個消息隊列Queue --> <rabbit:queue id="order" name="order" durable="true" auto-delete="false" exclusive="false" /> <rabbit:queue id="activity" name="activity" durable="true" auto-delete="false" exclusive="false" /> <rabbit:queue id="mail" name="mail" durable="true" auto-delete="false" exclusive="false" /> <rabbit:queue id="stock" name="stock" durable="true" auto-delete="false" exclusive="false" /> <rabbit:queue id="autoPrint" name="autoPrint" durable="true" auto-delete="false" exclusive="false" /> <!-- rabbit:direct-exchange:定義exchange模式為direct,意思就是消息與一個特定的路由鍵完全匹配,才會轉發。 rabbit:binding:設置消息queue匹配的key --> <!-- 交換機定義 --> <rabbit:direct-exchange name="koms" durable="true" auto-delete="false" id="koms"> <rabbit:bindings> <rabbit:binding queue="order" key="order"/> <rabbit:binding queue="activity" key="activity"/> <rabbit:binding queue="mail" key="mail"/> <rabbit:binding queue="stock" key="stock"/> <rabbit:binding queue="autoPrint" key="autoPrint"/> </rabbit:bindings> </rabbit:direct-exchange> <!-- queues:監聽的隊列,多個的話用逗號(,)分隔 ref:監聽器 --> <!-- 配置監聽 acknowledeg = "manual" 設置手動應答 當消息處理失敗時:會一直重發 直到消息處理成功 --> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual"> <!-- 配置監聽器 --> <rabbit:listener queues="activity" ref="activityListener"/> <rabbit:listener queues="order" ref="orderListener"/> <rabbit:listener queues="mail" ref="mailListener"/> <rabbit:listener queues="stock" ref="stockListener"/> <rabbit:listener queues="autoPrint" ref="autoPrintListener"/> </rabbit:listener-container> </beans>
5.新增公共入隊類
@Service public class MQProducerImpl{ @Resource private AmqpTemplate amqpTemplate; private final static Logger logger = LoggerFactory.getLogger(MQProducerImpl.class); //公共入隊方法 public void sendDataToQueue(String queueKey, Object object) { try { amqpTemplate.convertAndSend(queueKey, object); } catch (Exception e) { logger.error(e.toString()); } } }
6.創建監聽類
import java.io.IOException; import java.util.List; import javax.annotation.Resource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.utils.SerializationUtils; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import com.cn.framework.domain.BaseDto; import com.cn.framework.util.ConstantUtils; import com.cn.framework.util.RabbitMq.producer.MQProducer; import com.kxs.service.activityService.IActivityService; import com.kxs.service.messageService.IMessageService; import com.rabbitmq.client.Channel; /** * 活動處理listener * @author * @date 2017年6月30日 **/ @Component public class ActivityListener implements ChannelAwareMessageListener { private static final Logger log = LoggerFactory.getLogger(ActivityListener.class); @Override @Transactional public void onMessage(Message message,Channel channel) { } }
項目啟動后 控制臺會打印出監聽的日志信息 這里寫圖片描述
結尾:僅供參考,自己用作學習記錄,不喜勿噴,共勉!
補充:RabbitMQ與SpringMVC集成并實現發送消息和接收消息(持久化)方案
RabbitMQ本篇不介紹了,直接描述RabbitMQ與SpringMVC集成并實現發送消息和接收消息(持久化)。
使用了Spring-rabbit 發送消息和接收消息,我們使用的Maven來管理Jar包,在Maven的pom.xml文件中引入jar包
<span > <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.3.6.RELEASE</version> </dependency></span>
1.實現生產者
第一步:是要設置調用安裝RabbitMQ的IP、端口等
配置一個global.properties文件
第二步:通過SpringMVC把global.properties文件讀進來
<span ><!-- 注入屬性文件 --> <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <list> <value>classpath:global.properties</value> </list> </property> </bean> </span>
第三步:配置 RabbitMQ服務器連接、創建rabbitTemplate 消息模板類等,在SpringMVC的配置文件加入下面這些
<bean id="rmqProducer2" class="cn.test.spring.rabbitmq.RmqProducer"></bean> <span > <!-- 創建連接類 --> <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"> <constructor-arg value="localhost" /> <property name="username" value="${rmq.manager.user}" /> <property name="password" value="${rmq.manager.password}" /> <property name="host" value="${rmq.ip}" /> <property name="port" value="${rmq.port}" /> </bean> <bean id="rabbitAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin"> <constructor-arg ref="connectionFactory" /> </bean> <!-- 創建rabbitTemplate 消息模板類 --> <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate"> <constructor-arg ref="connectionFactory"></constructor-arg> </bean> </span>
第四步:實現消息類實體和發送消息
類實體
<span >/** * 消息 * */ public class RabbitMessage implements Serializable { private static final long serialVersionUID = -6487839157908352120L; private Class<?>[] paramTypes;//參數類型 private String exchange;//交換器 private Object[] params; private String routeKey;//路由key public RabbitMessage(){} public RabbitMessage(String exchange,String routeKey,Object...params) { this.params=params; this.exchange=exchange; this.routeKey=routeKey; } @SuppressWarnings("rawtypes") public RabbitMessage(String exchange,String routeKey,String methodName,Object...params) { this.params=params; this.exchange=exchange; this.routeKey=routeKey; int len=params.length; Class[] clazzArray=new Class[len]; for(int i=0;i<len;i++) clazzArray[i]=params[i].getClass(); this.paramTypes=clazzArray; } public byte[] getSerialBytes() { byte[] res=new byte[0]; ByteArrayOutputStream baos=new ByteArrayOutputStream(); ObjectOutputStream oos; try { oos = new ObjectOutputStream(baos); oos.writeObject(this); oos.close(); res=baos.toByteArray(); } catch (IOException e) { e.printStackTrace(); } return res; } public String getRouteKey() { return routeKey; } public String getExchange() { return exchange; } public void setExchange(String exchange) { this.exchange = exchange; } public void setRouteKey(String routeKey) { this.routeKey = routeKey; } public Class<?>[] getParamTypes() { return paramTypes; } public Object[] getParams() { return params; } } </span>
發送消息
<span >/** * 生產著 * */ public class RmqProducer { @Resource private RabbitTemplate rabbitTemplate; /** * 發送信息 * @param msg */ public void sendMessage(RabbitMessage msg) { try { System.out.println(rabbitTemplate.getConnectionFactory().getHost()); System.out.println(rabbitTemplate.getConnectionFactory().getPort()); //發送信息 rabbitTemplate.convertAndSend(msg.getExchange(), msg.getRouteKey(), msg); } catch (Exception e) { } } }</span>
說明:
1. rabbitTemplate.convertAndSend(msg.getExchange(), msg.getRouteKey(), msg);
源代碼中的send調用的方法,一些發送消息幫我們實現好了。
2.上面的代碼實現沒申明交換器和隊列,RabbitMQ不知交換器和隊列他們的綁定關系,如果RabbitMQ管理器上沒有對應的交換器和隊列是不會新建的和關聯的,需要手動關聯。
我們也可以用代碼申明:
rabbitAdmin要申明:eclareExchange方法 參數是交換器
BindingBuilder.bind(queue).to(directExchange).with(queueName);//將queue綁定到exchange rabbitAdmin.declareBinding(binding);//聲明綁定關系
源代碼有這些方法:
這樣就可以實現交換器和隊列的綁定關系
交換器我們可以申明為持久化,還有使用完不會自動刪除
TopicExchange 參數的說明:name是交換器名稱,durable:true 是持久化 autoDelete:false使用完不刪除
源代碼:
隊列也可以申明為持久化
第五步:實現測試類
<span >@Resource private RmqProducer rmqProducer2; @Test public void test() throws IOException { String exchange="testExchange";交換器 String routeKey="testQueue";//隊列 String methodName="test";//調用的方法 //參數 Map<String,Object> param=new HashMap<String, Object>(); param.put("data","hello"); RabbitMessage msg=new RabbitMessage(exchange,routeKey, methodName, param); //發送消息 rmqProducer2.sendMessage(msg); }</span>
結果:RabbitMQ有一條消息
2.消費者
第一步:RabbitMQ服務器連接這些在生產者那邊已經介紹了,這邊就不介紹了,我們要配置 RabbitMQ服務器連接、創建rabbitTemplate 消息模板類、消息轉換器、消息轉換器監聽器等,在SpringMVC的配置文件加入下面這些
<span > <!-- 創建連接類 --> <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"> <constructor-arg value="localhost" /> <property name="username" value="${rmq.manager.user}" /> <property name="password" value="${rmq.manager.password}" /> <property name="host" value="${rmq.ip}" /> <property name="port" value="${rmq.port}" /> </bean> <bean id="rabbitAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin"> <constructor-arg ref="connectionFactory" /> </bean> <!-- 創建rabbitTemplate 消息模板類 --> <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate"> <constructor-arg ref="connectionFactory"></constructor-arg> </bean> <!-- 創建消息轉換器為SimpleMessageConverter --> <bean id="serializerMessageConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter"></bean> <!-- 設置持久化的隊列 --> <bean id="queue" class="org.springframework.amqp.core.Queue"> <constructor-arg index="0" value="testQueue"></constructor-arg> <constructor-arg index="1" value="true"></constructor-arg> <constructor-arg index="2" value="false"></constructor-arg> <constructor-arg index="3" value="false"></constructor-arg> </bean> <!--創建交換器的類型 并持久化--> <bean id="topicExchange" class="org.springframework.amqp.core.TopicExchange"> <constructor-arg index="0" value="testExchange"></constructor-arg> <constructor-arg index="1" value="true"></constructor-arg> <constructor-arg index="2" value="false"></constructor-arg> </bean> <util:map id="arguments"> </util:map> <!-- 綁定交換器、隊列 --> <bean id="binding" class="org.springframework.amqp.core.Binding"> <constructor-arg index="0" value="testQueue"></constructor-arg> <constructor-arg index="1" value="QUEUE"></constructor-arg> <constructor-arg index="2" value="testExchange"></constructor-arg> <constructor-arg index="3" value="testQueue"></constructor-arg> <constructor-arg index="4" value="#{arguments}"></constructor-arg> </bean> <!-- 用于接收消息的處理類 --> <bean id="rmqConsumer" class="cn.test.spring.rabbitmq.RmqConsumer"></bean> <bean id="messageListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter"> <constructor-arg ref="rmqConsumer" /> <property name="defaultListenerMethod" value="rmqProducerMessage"></property> <property name="messageConverter" ref="serializerMessageConverter"></property> </bean> <!-- 用于消息的監聽的容器類SimpleMessageListenerContainer,監聽隊列 queues可以傳多個--> <bean id="listenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer"> <property name="queues" ref="queue"></property> <property name="connectionFactory" ref="connectionFactory"></property> <property name="messageListener" ref="messageListenerAdapter"></property> </bean> </span>
說明:
1.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer中的queues可以傳入多個隊列
2.org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter
有哪個消費者適配器來處理 ,參數defaultListenerMethod是默認調用方法來處理消息。
3.交換器和隊列的持久化在生產者有介紹過了。
4.org.springframework.amqp.core.Binding這個類的綁定,在SpringMVC配置文件中配置時,
DestinationType這個參數要注意點
源代碼:
第二步:處理消息
<span >/** * 消費者 * */ public class RmqConsumer { public void rmqProducerMessage(Object object){ RabbitMessage rabbitMessage=(RabbitMessage) object; System.out.println(rabbitMessage.getExchange()); System.out.println(rabbitMessage.getRouteKey()); System.out.println(rabbitMessage.getParams().toString()); } }</span>
在啟動過程中會報這樣的錯誤,可能是你的交換器和隊列沒配置好
關于如何在SpringMVC項目中使用rabbitmq就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。