The Spring Integration Java DSL has been merged into Spring Integration Core 5.0, which is a smart and obvious move.
Let's see how to use it with an ActiveMQ-based JMS example.
Maven dependencies (the sample uses the standalone spring-integration-java-dsl artifact, version 1.2.3.RELEASE):
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-core</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-jms</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-kahadb-store</artifactId>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-java-dsl -->
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-java-dsl</artifactId>
        <version>1.2.3.RELEASE</version>
    </dependency>
</dependencies>
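Sample 1: JMS inbound gateway
We have the following ServiceActivator:
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Service;

@Service
public class ActiveMQEndpoint {

    // Receives every message that arrives on inboundChannel.
    @ServiceActivator(inputChannel = "inboundChannel")
    public void processMessage(final String inboundPayload) {
        System.out.println("Inbound message: " + inboundPayload);
    }
}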
If you want to use the SI Java DSL to deliver inboundPayload from a JMS queue to this activator in gateway style, use the DSL's Jms factory:
@Bean
public DynamicDestinationResolver dynamicDestinationResolver() {
    return new DynamicDestinationResolver();
}

@Bean
public ActiveMQConnectionFactory connectionFactory() {
    return new ActiveMQConnectionFactory();
}

@Bean
public DefaultMessageListenerContainer listenerContainer() {
    final DefaultMessageListenerContainer defaultMessageListenerContainer =
            new DefaultMessageListenerContainer();
    defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
    defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
    defaultMessageListenerContainer.setDestinationName("jms.activeMQ.Test");
    return defaultMessageListenerContainer;
}

@Bean
public MessageChannel inboundChannel() {
    return MessageChannels.direct("inboundChannel").get();
}

@Bean
public JmsInboundGateway dataEndpoint() {
    return Jms.inboundGateway(listenerContainer())
            .requestChannel(inboundChannel())
            .get();
}
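Jms.inboundGateway(...) returns a JmsInboundGatewaySpec. Through it, before the dataEndpoint bean calls get(), you can also configure replies to be sent to an SI channel or to a JMS destination. See the documentation.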
Sample 2: JMS message-driven channel adapter
If you are looking for a replacement for the XML JMS configuration of a message-driven channel adapter, JmsMessageDrivenChannelAdapter is the way to go:
@Bean
public DynamicDestinationResolver dynamicDestinationResolver() {
    return new DynamicDestinationResolver();
}

@Bean
public ActiveMQConnectionFactory connectionFactory() {
    return new ActiveMQConnectionFactory();
}

@Bean
public DefaultMessageListenerContainer listenerContainer() {
    final DefaultMessageListenerContainer defaultMessageListenerContainer =
            new DefaultMessageListenerContainer();
    defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
    defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
    defaultMessageListenerContainer.setDestinationName("jms.activeMQ.Test");
    return defaultMessageListenerContainer;
}

@Bean
public MessageChannel inboundChannel() {
    return MessageChannels.direct("inboundChannel").get();
}

@Bean
public JmsMessageDrivenChannelAdapter dataEndpoint() {
    final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
            new ChannelPublishingJmsMessageListener();
    // One-way adapter: no reply is expected from the downstream flow.
    channelPublishingJmsMessageListener.setExpectReply(false);
    final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter =
            new JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener);
    messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
    return messageDrivenChannelAdapter;
}
As in the previous example, the inbound payload is delivered to the service activator from sample 1.
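Sample 3: JMS message-driven channel adapter with JAXB
In a typical scenario you want to accept XML as a text message over JMS, convert it into a JAXB stub, and process it in a service activator. I'll show you how to do this with the SI Java DSL, but first let's add two dependencies for XML processing:
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-xml</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-oxm</artifactId>
</dependency>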
We will accept shiporders over JMS, so first we need an XSD named shiporder.xsd:
<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="shiporder">
        <xs:complexType>
            <xs:sequence>
                <xs:element name="orderperson" type="xs:string"/>
                <xs:element name="shipto">
                    <xs:complexType>
                        <xs:sequence>
                            <xs:element name="name" type="xs:string"/>
                            <xs:element name="address" type="xs:string"/>
                            <xs:element name="city" type="xs:string"/>
                            <xs:element name="country" type="xs:string"/>
                        </xs:sequence>
                    </xs:complexType>
                </xs:element>
                <xs:element name="item" maxOccurs="unbounded">
                    <xs:complexType>
                        <xs:sequence>
                            <xs:element name="title" type="xs:string"/>
                            <xs:element name="note" type="xs:string" minOccurs="0"/>
                            <xs:element name="quantity" type="xs:positiveInteger"/>
                            <xs:element name="price" type="xs:decimal"/>
                        </xs:sequence>
                    </xs:complexType>
                </xs:element>
            </xs:sequence>
            <xs:attribute name="orderid" type="xs:string" use="required"/>
        </xs:complexType>
    </xs:element>
</xs:schema>
Add the JAXB Maven plugin to generate the JAXB stubs:
<plugin>
    <groupId>org.codehaus.mojo</groupId>
    <artifactId>jaxb2-maven-plugin</artifactId>
    <version>2.3.1</version>
    <executions>
        <execution>
            <id>xjc-schema1</id>
            <goals>
                <goal>xjc</goal>
            </goals>
            <configuration>
                <!-- XSD files to generate stubs from. -->
                <sources>
                    <source>src/main/resources/xsds/shiporder.xsd</source>
                </sources>
                <!-- Package name of the generated sources. -->
                <packageName>com.example.stubs</packageName>
                <outputDirectory>src/main/java</outputDirectory>
                <clearOutputDir>false</clearOutputDir>
            </configuration>
        </execution>
    </executions>
</plugin>
With the stub classes in place, here is the Java DSL JMS message-driven adapter with some JAXB magic:
/**
 * Sample 3: JMS message-driven adapter with JAXB
 */
@Bean
public JmsMessageDrivenChannelAdapter dataEndpoint() {
    final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
            new ChannelPublishingJmsMessageListener();
    channelPublishingJmsMessageListener.setExpectReply(false);
    // Convert the incoming XML text message into a JAXB stub.
    channelPublishingJmsMessageListener.setMessageConverter(
            new MarshallingMessageConverter(shipOrdersMarshaller()));
    final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter =
            new JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener);
    messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
    return messageDrivenChannelAdapter;
}

@Bean
public Jaxb2Marshaller shipOrdersMarshaller() {
    Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
    // Package that holds the generated JAXB stubs.
    marshaller.setContextPath("com.example.stubs");
    return marshaller;
}
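Having this configuration in Java rather than XML gives you a great deal of power and flexibility. To complete the example, the service activator for inboundChannel looks like this:
/**
 * Sample 3
 * @param shiporder the JAXB stub unmarshalled from the JMS text message
 */
@ServiceActivator(inputChannel = "inboundChannel")
public void processMessage(final Shiporder shiporder) {
    System.out.println(shiporder.getOrderid());
    System.out.println(shiporder.getOrderperson());
}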
To test the flow, you can send the following XML to the JMS queue through JConsole:
<?xml version="1.0" encoding="UTF-8"?>
<shiporder orderid="889923"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:noNamespaceSchemaLocation="shiporder.xsd">
    <orderperson>John Smith</orderperson>
    <shipto>
        <name>Ola Nordmann</name>
        <address>Langgt 23</address>
        <city>4000 Stavanger</city>
        <country>Norway</country>
    </shipto>
    <item>
        <title>Empire Burlesque</title>
        <note>Special Edition</note>
        <quantity>1</quantity>
        <price>10.90</price>
    </item>
    <item>
        <title>Hide your heart</title>
        <quantity>1</quantity>
        <price>9.90</price>
    </item>
</shiporder>
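Before sending the message, it can help to know which two values the service activator should print. The following is a minimal sketch that reads orderid and orderperson with the JDK's DOM parser instead of JAXB, so it runs without any Spring or JAXB dependency. The class name ShiporderXmlPeek and the shortened XML are illustrative assumptions, not part of the sample project.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.xml.sax.InputSource;

/** Hypothetical helper: peeks at a shiporder document with plain DOM parsing. */
final class ShiporderXmlPeek {

    /** Returns {orderid, orderperson} read from the given shiporder XML. */
    static String[] orderIdAndPerson(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));
            Element root = doc.getDocumentElement();
            String orderId = root.getAttribute("orderid");
            String orderPerson = root.getElementsByTagName("orderperson")
                    .item(0)
                    .getTextContent();
            return new String[] {orderId, orderPerson};
        } catch (Exception e) {
            // Malformed XML or a missing element ends up here.
            throw new IllegalArgumentException("Not a readable shiporder document", e);
        }
    }

    public static void main(String[] args) {
        // Shortened version of the test message, enough for the two printed fields.
        String xml = "<shiporder orderid=\"889923\">"
                + "<orderperson>John Smith</orderperson>"
                + "</shiporder>";
        String[] values = orderIdAndPerson(xml);
        System.out.println(values[0]); // 889923
        System.out.println(values[1]); // John Smith
    }
}
```

In the real flow the same two values come from shiporder.getOrderid() and shiporder.getOrderperson() on the generated stub.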
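Sample 4: JMS message-driven channel adapter with JAXB and payload root routing
Another typical case is accepting XML as a JMS text message, converting it into a JAXB stub, and routing the payload to a service activator based on the payload root type. The SI Java DSL supports all kinds of routing, of course; I'll show you how to route by payload type.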
First, add the following XSD to the folder that contains shiporder.xsd and name it purchaseorder.xsd:
<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema"
            xmlns:tns="http://tempuri.org/PurchaseOrderSchema.xsd"
            targetNamespace="http://tempuri.org/PurchaseOrderSchema.xsd"
            elementFormDefault="qualified">
    <xsd:element name="PurchaseOrder">
        <xsd:complexType>
            <xsd:sequence>
                <xsd:element name="ShipTo" type="tns:USAddress" maxOccurs="2"/>
                <xsd:element name="BillTo" type="tns:USAddress"/>
            </xsd:sequence>
            <xsd:attribute name="OrderDate" type="xsd:date"/>
        </xsd:complexType>
    </xsd:element>
    <xsd:complexType name="USAddress">
        <xsd:sequence>
            <xsd:element name="name" type="xsd:string"/>
            <xsd:element name="street" type="xsd:string"/>
            <xsd:element name="city" type="xsd:string"/>
            <xsd:element name="state" type="xsd:string"/>
            <xsd:element name="zip" type="xsd:integer"/>
        </xsd:sequence>
        <xsd:attribute name="country" type="xsd:NMTOKEN" fixed="US"/>
    </xsd:complexType>
</xsd:schema>
Then add it to the JAXB Maven plugin configuration:
<plugin>
    <groupId>org.codehaus.mojo</groupId>
    <artifactId>jaxb2-maven-plugin</artifactId>
    <version>2.3.1</version>
    <executions>
        <execution>
            <id>xjc-schema1</id>
            <goals>
                <goal>xjc</goal>
            </goals>
            <configuration>
                <!-- XSD files to generate stubs from. -->
                <sources>
                    <source>src/main/resources/xsds/shiporder.xsd</source>
                    <source>src/main/resources/xsds/purchaseorder.xsd</source>
                </sources>
                <!-- Package name of the generated sources. -->
                <packageName>com.example.stubs</packageName>
                <outputDirectory>src/main/java</outputDirectory>
                <clearOutputDir>false</clearOutputDir>
            </configuration>
        </execution>
    </executions>
</plugin>
Run mvn clean install to generate the JAXB stubs for the new XSD. Now for the promised payload root mapping:
@Bean
public Jaxb2Marshaller ordersMarshaller() {
    Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
    marshaller.setContextPath("com.example.stubs");
    return marshaller;
}

/**
 * Sample 4: JMS message-driven adapter with JAXB and payload routing.
 * @return the adapter that publishes unmarshalled payloads to inboundChannel
 */
@Bean
public JmsMessageDrivenChannelAdapter dataEndpoint() {
    final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
            new ChannelPublishingJmsMessageListener();
    channelPublishingJmsMessageListener.setMessageConverter(
            new MarshallingMessageConverter(ordersMarshaller()));
    final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter =
            new JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener);
    messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
    return messageDrivenChannelAdapter;
}

@Bean
public IntegrationFlow payloadRootMapping() {
    return IntegrationFlows.from(inboundChannel())
            // Route on the runtime class of the payload.
            .<Object, Class<?>>route(Object::getClass, m -> m
                    .subFlowMapping(Shiporder.class, sf -> sf.handle((MessageHandler) message -> {
                        final Shiporder shiporder = (Shiporder) message.getPayload();
                        System.out.println(shiporder.getOrderperson());
                        System.out.println(shiporder.getOrderid());
                    }))
                    .subFlowMapping(PurchaseOrder.class, sf -> sf.handle((MessageHandler) message -> {
                        final PurchaseOrder purchaseOrderType = (PurchaseOrder) message.getPayload();
                        System.out.println(purchaseOrderType.getBillTo().getName());
                    })))
            .get();
}
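Note the payloadRootMapping bean; let's go through the important parts. The call <Object, Class<?>>route(Object::getClass, ...) means that the input from inboundChannel is treated as an Object and that routing is based on its Class. subFlowMapping(Shiporder.class, ...) is the sub-flow that handles Shiporder payloads, and subFlowMapping(PurchaseOrder.class, ...) is the sub-flow that handles PurchaseOrder payloads.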
To test the Shiporder payload, use the XML from sample 3; to test the PurchaseOrder payload, use the following XML:
<?xml version="1.0" encoding="utf-8"?>
<PurchaseOrder OrderDate="1900-01-01" xmlns="http://tempuri.org/PurchaseOrderSchema.xsd">
    <ShipTo country="US">
        <name>name1</name>
        <street>street1</street>
        <city>city1</city>
        <state>state1</state>
        <zip>1</zip>
    </ShipTo>
    <ShipTo country="US">
        <name>name2</name>
        <street>street2</street>
        <city>city2</city>
        <state>state2</state>
        <zip>-79228162514264337593543950335</zip>
    </ShipTo>
    <BillTo country="US">
        <name>name1</name>
        <street>street1</street>
        <city>city1</city>
        <state>state1</state>
        <zip>1</zip>
    </BillTo>
</PurchaseOrder>
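The second ShipTo element carries a zip far outside the range of a Java long. The schema declares zip as xsd:integer, which has no size limit and which JAXB binds to java.math.BigInteger by default, so the value is still legal. A small sketch with plain JDK classes (the class name ZipRangeCheck is hypothetical) illustrates why a long-typed field could not hold it:

```java
import java.math.BigInteger;

/** Hypothetical helper: checks whether an xsd:integer value would fit into a long. */
final class ZipRangeCheck {

    /** Returns true when the value can be stored in a Java long without loss. */
    static boolean fitsInLong(BigInteger value) {
        return value.compareTo(BigInteger.valueOf(Long.MIN_VALUE)) >= 0
                && value.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) <= 0;
    }

    public static void main(String[] args) {
        BigInteger smallZip = new BigInteger("1");
        BigInteger hugeZip = new BigInteger("-79228162514264337593543950335");
        System.out.println(fitsInLong(smallZip)); // true
        System.out.println(fitsInLong(hugeZip));  // false
    }
}
```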
Both payloads should be routed according to the sub-flow mappings.
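The routing idea itself can be sketched without Spring: look up a handler by the payload's runtime class and pass the payload to it. The code below is only a plain-Java illustration of that idea, not Spring Integration code. PayloadTypeRouterSketch, its mapping and route methods, and the two nested payload classes are hypothetical stand-ins for the router and the generated JAXB stubs, and throwing IllegalArgumentException for an unmapped type is a choice made for this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Plain-Java sketch of routing by payload type. */
final class PayloadTypeRouterSketch {

    // Hypothetical stand-ins for the JAXB stubs generated from the two XSDs.
    static final class Shiporder {
        final String orderperson;
        Shiporder(String orderperson) { this.orderperson = orderperson; }
    }

    static final class PurchaseOrder {
        final String billToName;
        PurchaseOrder(String billToName) { this.billToName = billToName; }
    }

    // Payload class -> handler; plays the role of the sub-flow mappings.
    private final Map<Class<?>, Function<Object, String>> mappings = new LinkedHashMap<>();

    /** Registers a handler for one payload type and returns this router for chaining. */
    <T> PayloadTypeRouterSketch mapping(Class<T> type, Function<T, String> handler) {
        mappings.put(type, payload -> handler.apply(type.cast(payload)));
        return this;
    }

    /** Picks the handler by the payload's runtime class, like Object::getClass above. */
    String route(Object payload) {
        Function<Object, String> handler = mappings.get(payload.getClass());
        if (handler == null) {
            throw new IllegalArgumentException("No mapping for " + payload.getClass().getName());
        }
        return handler.apply(payload);
    }

    public static void main(String[] args) {
        PayloadTypeRouterSketch router = new PayloadTypeRouterSketch()
                .mapping(Shiporder.class, o -> "ship order from " + o.orderperson)
                .mapping(PurchaseOrder.class, o -> "purchase order billed to " + o.billToName);
        System.out.println(router.route(new Shiporder("John Smith")));  // ship order from John Smith
        System.out.println(router.route(new PurchaseOrder("name1")));   // purchase order billed to name1
    }
}
```

The lookup uses the exact runtime class, so a subclass of a mapped type would not match in this sketch.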
Sample 5: IntegrationFlowAdapter
Besides the other implementations of the Enterprise Integration Patterns (check them out), I need to mention IntegrationFlowAdapter. By extending this class and implementing the buildFlow method, for example:
@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {

    @Autowired
    private ConnectionFactory rabbitConnectionFactory;

    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {
        return from(Amqp.inboundAdapter(this.rabbitConnectionFactory, "myQueue"))
                .<String, String>transform(String::toLowerCase)
                .channel(c -> c.queue("myFlowAdapterOutput"));
    }
}
you can wrap repetitive bean declarations into one component and give them the flow you need. Such a component can then be configured and handed to the calling code as a single class instance!
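The underlying idea is the template-method pattern: a base class owns the wiring and a subclass only describes the flow. Here is a minimal plain-Java sketch of that pattern, assuming nothing from Spring; FlowAdapterSketch, TaggingLowerCaseFlow, and the send method are hypothetical names made up for illustration and do not mirror the real IntegrationFlowAdapter API.

```java
import java.util.function.Function;

/** Plain-Java sketch of the template-method idea; not the Spring class. */
abstract class FlowAdapterSketch {

    private Function<String, String> flow;

    /** Subclasses describe their flow here, in the spirit of buildFlow(). */
    protected abstract Function<String, String> buildFlow();

    /** Builds the flow on first use, then pushes the payload through it. */
    final String send(String payload) {
        if (flow == null) {
            flow = buildFlow();
        }
        return flow.apply(payload);
    }

    public static void main(String[] args) {
        FlowAdapterSketch adapter = new TaggingLowerCaseFlow("myFlowAdapterOutput");
        System.out.println(adapter.send("HELLO World")); // myFlowAdapterOutput <- hello world
    }
}

/** Hypothetical subclass: lower-cases the payload and tags it with a target name. */
final class TaggingLowerCaseFlow extends FlowAdapterSketch {

    private final String target;

    TaggingLowerCaseFlow(String target) {
        this.target = target;
    }

    @Override
    protected Function<String, String> buildFlow() {
        Function<String, String> lowerCase = String::toLowerCase;
        return lowerCase.andThen(text -> target + " <- " + text);
    }
}
```

Because the subclass takes its settings through the constructor, the same class can be instantiated several times with different targets.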
So let's make sample 3 from this repo a bit shorter by defining a base class for all JMS endpoints and declaring the repetitive beans in it:
public class JmsEndpoint extends IntegrationFlowAdapter {

    private final String queueName;
    private final String channelName;
    private final String contextPath;

    /**
     * @param queueName   JMS queue to listen on
     * @param channelName channel that receives the unmarshalled payloads
     * @param contextPath package that holds the JAXB stubs
     */
    public JmsEndpoint(String queueName, String channelName, String contextPath) {
        this.queueName = queueName;
        this.channelName = channelName;
        this.contextPath = contextPath;
    }

    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {
        return from(Jms.messageDrivenChannelAdapter(listenerContainer())
                        .jmsMessageConverter(new MarshallingMessageConverter(shipOrdersMarshaller())))
                .channel(channelName);
    }

    @Bean
    public Jaxb2Marshaller shipOrdersMarshaller() {
        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        marshaller.setContextPath(contextPath);
        return marshaller;
    }

    @Bean
    public DynamicDestinationResolver dynamicDestinationResolver() {
        return new DynamicDestinationResolver();
    }

    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory();
    }

    @Bean
    public DefaultMessageListenerContainer listenerContainer() {
        final DefaultMessageListenerContainer defaultMessageListenerContainer =
                new DefaultMessageListenerContainer();
        defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
        defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
        defaultMessageListenerContainer.setDestinationName(queueName);
        return defaultMessageListenerContainer;
    }

    @Bean
    public MessageChannel inboundChannel() {
        return MessageChannels.direct(channelName).get();
    }
}
Declaring a JMS endpoint for a specific queue is now easy:
@Bean
public JmsEndpoint jmsEndpoint() {
    return new JmsEndpoint("jms.activeMQ.Test", "inboundChannel", "com.example.stubs");
}
The service activator for inboundChannel:
/**
 * Sample 3, 5
 * @param shiporder the JAXB stub unmarshalled from the JMS text message
 */
@ServiceActivator(inputChannel = "inboundChannel")
public void processMessage(final Shiporder shiporder) {
    System.out.println(shiporder.getOrderid());
    System.out.println(shiporder.getOrderperson());
}
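Don't miss out on using IntegrationFlowAdapter in your projects. I like the concept.
I recently started using the Spring Integration Java DSL on a new Spring Boot based project at Embedit. Even with some configuration to write, I find it very useful.
Source code: https://bitbucket.org/tomask79/spring-integration-java-dsl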