您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關ActiveMQ要入門什么,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
1. 發布消息
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class BookProducer implements Runnable{ public static final String BROKER_URL = "tcp://localhost:61616"; @Override public void run() { try { //1.創建連接工廠,指定ip和端口 ConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL); //2.使用連接工廠創建一個連接對象 Connection connection = factory.createConnection(); //3.開啟連接(JMS會話) connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //使用會話創建目的地 /** * ① 點對點(Point-to-Point)。在點對點的消息系統中,消息分發給一個單獨的使用者。點對點消息往往與隊列(javax.jms.Queue)相關聯。 * ② 發布/訂閱(Publish/Subscribe)。發布/訂閱消息系統支持一個事件驅動模型,消息生產者和消費者都參與消息的傳遞。生產者發布事件, 而使用者訂閱感興趣的事件,并使用事件。該類型消息一般與特定的主題(javax.jms.Topic)關聯。 */ Destination destination = session.createQueue("book-broker") //創建生產者/消費者 MessageProducer producer = session.createProducer(destination); // MessageConsumer consumer = session.createConsumer(destination); //consumer.receive(); /** * 創建消息,支持的消息類型: * TextMessage * MapMessage * ObjectMessage:對象需要實現序列化接口 * BytesMessage * StreamMessage */ Message message = session.createTextMessage("我是一個香蕉......."); //發送消息 producer.send(message); //釋放資源 producer.close(); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } }
2. 接收消息
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.JMSException; import javax.jms.Session; import javax.jms.TextMessage; public class BookConsumer implements Runnable { @Override public void run() { try { var connection = new ActiveMQConnectionFactory(BookProducer.BROKER_URL).createConnection(); connection.start(); /** * connection.createSession(boolean transacted, int acknowledgeMode); * transacted:是否使用事務 * acknowledgeMode:應答模式 * AUTO_ACKNOWLEDGE:自動應答 * 對于同步消費者,receive方法調用返回,且沒有異常發生時,將自動對收到的消息予以確認. * 對于異步消息,當onMessage方法返回,且沒有異常發生時,即對收到的消息自動確認. * CLIENT_ACKNOWLEDGE:客戶端手動應答 * 這種方式要求客戶端使用javax.jms.Message.acknowledge()方法完成確認. * DUPS_OK_ACKNOWLEDGE:延時//批量通知 * 這種確認方式允許JMS不必急于確認收到的消息,允許在收到多個消息之后一次完成確認, 與Auto_AcKnowledge相比,這種確認方式在某些情況下可能更有效, 因為沒有確認,當系統崩潰或者網絡出現故障的時候,消息可以被重新傳遞. * 使用事務消息確認模式: * SESSION_TRANSACTED */ var session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); var consumer = session.createConsumer(session.createQueue("tmall-queue")); var message = ((TextMessage)consumer.receive()).getText(); System.out.println(message); session.close(); connection.close(); } }
或者設置監聽器接收(消費者不用一直在線,監聽到消息自動接收)
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.JMSException; import javax.jms.Session; import javax.jms.TextMessage; public class BookConsumer implements Runnable { @Override public void run() { try { var connection = new ActiveMQConnectionFactory(BookProducer.BROKER_URL).createConnection(); connection.start(); var session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); var consumer = session.createConsumer(session.createQueue("tmall-queue")); consumer.setMessageListener(message -> { try { System.out.println(((TextMessage) message).getText()); }catch (JMSException e){ e.printStackTrace(); } }); } catch (JMSException e) { e.printStackTrace(); } } }
3. java內嵌ActiveMQ,自動啟動一個ActiveMQ,不需要Linux啟動
import org.apache.activemq.broker.BrokerService; public class Broker { //導入依賴compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.9.9' public void producter(){ BrokerService brokerService = new BrokerService(); brokerService.setUseJmx(true);//設置Broker的服務是否應該公開給JMX try { brokerService.addConnector("tcp://localhost:61616"); brokerService.start(); } catch (Exception e) { e.printStackTrace(); } } }
以上就是ActiveMQ要入門什么,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。