Results 1 to 6 of 6

Thread: Transaction demarcation on Gateway with JMS

  1. #1
    Join Date
    Feb 2010
    Posts
    14

    Default Transaction demarcation on Gateway with JMS

    Hi

    I'm bringing Spring and Spring Integration into an existing Java program (runs standalone, not in a web environment) in order to replace a shared database integration with a messaging system.

    I'm looking for some guidance on publishing a JMS message to a queue within a transaction boundary.

    I have a service method that needs to carry out two functions:
    1. Using a Spring Integration Gateway get a message onto the bus; publishing to a JMS queue (direct channels, single thread)
    2. Assuming part 1 succeeds, send a TCP/IP packet over legacy system.
    If part 2 fails (IOException) then I want the JMS transaction to rollback.

    So I am thinking I need to annotate the service method something like
    Code:
    @Transactional(value="jmsTransactionManager",rollbackFor={IOException.class})
    .

    Am I on the right lines here? Is it possible to create this kind of transaction boundary with S-I ?

    Jon

  2. #2
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,844

    Default

    Assuming you add that annotation to the Gateway interface method that you are invoking, and that method declares IOException in its 'throws' clause it should work. Are you planning on invoking two separate gateway methods... one for the JMS sending and one for the TCP sending? Or were you planing on having all of that behind the same gateway interface invocation?

  3. #3
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,040

    Default

    Assuming you are doing all the work in a single SI message flow (behind a gateway), it will work, but you don't need to rollbackFor an IOException because the IOException will be wrapped in a MessagingException which is a RuntimeException and default rollback rules will rollback on a RuntimeException.

    I just ran a test (based on the tcp-client-server sample) and all worked as expected. I used a <recipient-list-router /> to send the message to jms and then tcp...

    Code:
    public interface SimpleGateway {
    
    	@Transactional(value="txManager")
    	public void send(String text);
    	
    }
    Code:
    @ContextConfiguration("/META-INF/spring/integration/jmsAndTcpClientTxDemo-context.xml")
    @RunWith(SpringJUnit4ClassRunner.class)
    public class JmsAndTcpClientTxDemoTest {
    
    	@Autowired 
    	SimpleGateway gw;
    	
    	@Autowired
    	JmsTemplate jmsTemplate;
    	
    	@Test
    	public void testHappyDay() {
    		String text = "Hello world!";
    		try {
    			gw.send(text);
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    		jmsTemplate.setReceiveTimeout(100);
    		Object jmsMessage = jmsTemplate.receiveAndConvert();
    		assertEquals(text, jmsMessage);
    	}
    
    }
    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans:beans xmlns:beans="http://www.springframework.org/schema/beans"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xmlns="http://www.springframework.org/schema/integration"
    	xmlns:ip="http://www.springframework.org/schema/integration/ip"
    	xmlns:jms="http://www.springframework.org/schema/integration/jms"
    	xmlns:tx="http://www.springframework.org/schema/tx"
    	xsi:schemaLocation="http://www.springframework.org/schema/integration/ip http://www.springframework.org/schema/integration/ip/spring-integration-ip.xsd
    		http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
    		http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
    		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    		http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">
    
    	<!-- Client side -->
    	<tx:annotation-driven/>
    	
    	<gateway id="gw" 
                 service-interface="org.springframework.integration.samples.tcpclientserver.SimpleGateway"
                 default-request-channel="toRouter"/>
            
            <recipient-list-router input-channel="toRouter">
            	<recipient channel="toJms"/>
         	        <recipient channel="toTcp"/>
            </recipient-list-router>
    
    	<ip:tcp-connection-factory id="client"
    		type="client"
    		host="localhost"
    		port="11111"
    		single-use="true"
    		so-timeout="10000"
            />
    	
    	<channel id="toJms" />
    	
    	<jms:outbound-channel-adapter
    		channel="toJms"
    		connection-factory="connectionFactory"
    		destination="queue"/>
    
            <!-- only for message retrieval in test case -->
    	<beans:bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    		<beans:property name="connectionFactory" ref="connectionFactory" />
    		<beans:property name="defaultDestination" ref="queue" />
    	</beans:bean>
    
    	<channel id="toTcp" />
    	
    	<ip:tcp-outbound-channel-adapter
    		channel="toTcp"
    		connection-factory="client"
    		/>
    
    	<beans:bean id="txManager" class="org.springframework.jms.connection.JmsTransactionManager">
    		<beans:property name="connectionFactory" ref="connectionFactory"/>
    	</beans:bean>
    
    	<beans:bean id="queue" class="org.apache.activemq.command.ActiveMQQueue">
    		<beans:constructor-arg value="test.queue"/>
    	</beans:bean>
    
    	<beans:bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    		<beans:property name="targetConnectionFactory">
    			<beans:bean class="org.apache.activemq.ActiveMQConnectionFactory">
    				<beans:property name="brokerURL" value="vm://localhost"/>		
    			</beans:bean>
    		</beans:property>
    		<beans:property name="sessionCacheSize" value="10"/>
    		<beans:property name="cacheProducers" value="false"/>
    	</beans:bean>
    	
    	<!-- Server side -->
    	
    	<ip:tcp-connection-factory id="crLfServer"
    		type="server"
    		port="11111"/>
    			
    	<ip:tcp-inbound-channel-adapter
    		connection-factory="crLfServer"
    		channel="serverBytes2StringChannel"/>
    		
    	<transformer id="serverBytes2String"
    		input-channel="serverBytes2StringChannel"
    		output-channel="toSA" 
    		expression="new String(payload)"/>
    
    	<channel id="toSA" />
    
    	<service-activator input-channel="toSA"
    					   ref="someService"
    					   method="test1"
    	/>
    
    	<beans:bean id="someService" 
    		  class="org.springframework.integration.samples.tcpclientserver.SomeService" />
    		  
    		  
    </beans:beans>
    If you change the port on the client connection factory to, say, 11112 and re-run the test, it fails to retrieve the message because it was rolled back after the tcp operation failed to create a connection.

    With debugging turned on, you can also see the commit or rollback in each case.

    Hope that helps.

  4. #4
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,040

    Default

    Of course, if you are doing the TCP I/O yourself (*after* you send the message via a gateway), then a method annotated as you suggest, where the method is in a spring bean, and encapsulates the gateway call *and* the socket I/O, will work too.

  5. #5
    Join Date
    Feb 2010
    Posts
    14

    Default

    Thanks for the replies.

    Gary, it's the scenario in your second post that I am dealing with. Here is an idea of the code:

    I've not yet got the transactions working so I guess its something wrong with my context configuration. I'll work through it again now and then post back later with my xml config.

    Code:
    @Service
    public class MyService {
    
        @Autowired
        private DefaultGateway gateway;
    
        @Transactional(value="jmsTransactionManager",rollbackFor={IOException.class})
        public void respondAndRelay(Session session, Packet response, byte[] smppCommand) throws IOException {
    
            // get the response onto the messaging bus (JMS Queue)
            this.gateway.relaySmppCommand(smppCommand);
    
            // legacy response over existing tcp/ip socket connection. Only sends if JMS publication succeeded 
            if (!session.sendPDU(response)) {
                // must rollback jms publication
                throw new IOException("Failed to send smpp response pdu");
            }
    
            
    
        }

  6. #6
    Join Date
    Feb 2010
    Posts
    14

    Default

    I think maybe I'm misunderstanding JMS transactions here (its been a while!). I've been trying to rollback just the publishing side of the JMS operation instead of the consumption. As the consumers are in a different process and connecting to the broker over tcp/ip I guess I'm going to have to introduce XA transactions here too.

Tags for this Thread

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •