Results 1 to 8 of 8

Thread: JMS & JTA & Spring Integration

  1. #1
    Join Date
    Dec 2006
    Posts
    5

    Default JMS & JTA & Spring Integration

    I have been attempting to configure spring integration to use JMS as its "backing" messaging system and so far I have the sending working correctly. It sends the message in a JTA transaction, and all works well. On the receiving end is where things break down. I have not been able to receive a message within a JTA transaction, I can get it to work if I disable all of the JTA-ness (changing the ConnectionFactory and changing configuration).

    I guess my line of thinking is that the MessageListener would subscribe as transactional to the session, put that on the DirectChannel, still in the transaction, I could do my processing and then everything would work. But I can't figure out how to get the underlying message listener to subscribe in a transactional way.

    my configuration
    Code:
        <message-bus/>
        <annotation-driven/>
    
        <jee:jndi-lookup jndi-name="jms/XAConnectionFactory" id="connectionFactory" resource-ref="true"/>
        <jee:jndi-lookup jndi-name="jms/queues/queue id="queue" resource-ref="true"/>
    
        <direct-channel id="toQueue" />
        <direct-channel id="fromQueue" source="fromQueueSource"/>
    
        <jms-target id="toQueueTarget" jms-template="toQueueJMSTemplate"  />
        <jms-source id="fromQueueSource" jms-template="fromQueueJMSTemplate"/>
    
        <target-endpoint  input-channel="toQueue" target="toQueueTarget" />
    
        <source-endpoint source="fromQueueSource" channel="fromQueue">
            <schedule period="50"/>
        </source-endpoint>
    
    
        <beans:bean id="toQueueJMSTemplate" class="org.springframework.jms.core.JmsTemplate">
            <beans:property name="connectionFactory" ref="connectionFactory"/>
            <beans:property name="deliveryPersistent" value="true"/>
            <beans:property name="sessionTransacted" value="true"/>
            <beans:property name="defaultDestination" ref="queue"/>
            <beans:property name="messageConverter" ref="simpleMessageConverter"/>
        </beans:bean>
    
         <beans:bean id="fromQueueJMSTemplate" class="org.springframework.jms.core.JmsTemplate">
            <beans:property name="connectionFactory" ref="connectionFactory"/>
            <beans:property name="deliveryPersistent" value="true"/>
            <beans:property name="sessionTransacted" value="true"/>
            <beans:property name="defaultDestination" ref="queue"/>
            <beans:property name="messageConverter" ref="simpleMessageConverter"/>
        </beans:bean>
    
        <beans:bean id="simpleMessageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
    Thanks,

    Chris

  2. #2
    Join Date
    Apr 2008
    Posts
    151

    Default

    I'm not sure how to do JTA transactions with just JmsTemplate, but you can look at DefaultMessageListener using Spring's Message listener containers

  3. #3
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,854

    Default

    In Spring Integration, you can use the "jms-gateway" instead of "jms-source" in order to have a MessageListenerContainer-based approach (instead of the jmsTemplate.receive() calls). For example, you could try the following:
    Code:
    <jms-gateway id="jmsGateway"
                  destination="someQueue"
                  request-channel="someChannel"
                  expect-reply="false"/>

  4. #4
    Join Date
    Dec 2006
    Posts
    5

    Default Problems and some fixes

    On the suggestion of using the jmsGateway xml setup it does not appear to be calling start or initialize, (tried setting breakpoints) therefore its never subscribing to the JMS queue.

    So I attempted just to create it as a bean as follows.
    Code:
     <beans:bean id="jmsGateWay" class="org.springframework.integration.adapter.jms.JmsGateway" init-method="start" destroy-method="destroy">
            <beans:property name="connectionFactory" ref="connectionFactory"/>
            <beans:property name="requestChannel" ref="channel"/>
            <beans:property name="destination" ref="queue"/>
            <beans:property name="expectReply" value="false"/>
            <beans:property name="sessionTransacted" value="true"/>
        </beans:bean>
    This causes me to get an exception
    Code:
    DefaultMessageListenerContainer-2 DefaultMessageListenerContainer INFO  - Setup of JMS message listener invoker failed - trying to recover
    javax.jms.JMSException: JTA transaction required for JtaMessageConsumer
    	at com.atomikos.jms.DefaultJtaMessageConsumer.enlist(Unknown Source)
    	at com.atomikos.jms.DefaultJtaMessageConsumer.receive(Unknown Source)
    	at com.atomikos.jms.DefaultJtaMessageConsumer.receive(Unknown Source)
    	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:404)
    	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:307)
    	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:260)
    	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:944)
    	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:868)
    	at java.lang.Thread.run(Thread.java:613)
    This is caused by DefaultMessageListenerContainer / AbstractPollingMessageListenerContainer not having a reference to the transactionManager and there being no way to pass the transactionManager through the jmsGateway (since jmsGateway by default creates an instance of DefaultMessageListenerContainer) and jmsGateway does not have a transactionManager property.

    Therefore I finally change the code to this.
    Code:
    <beans:bean id="jmsGateWay" class="org.springframework.integration.adapter.jms.JmsGateway" init-method="start" destroy-method="destroy">
            <beans:property name="connectionFactory" ref="connectionFactory"/>
            <beans:property name="requestChannel" ref="channel"/>
            <beans:property name="destination" ref="queue"/>
            <beans:property name="expectReply" value="false"/>
            <beans:property name="sessionTransacted" value="true"/>
            <beans:property name="container" ref="messageListenerContainer"/>
        </beans:bean>
    
        <beans:bean id="messageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <beans:property name="connectionFactory" ref="connectionFactory"/>
            <beans:property name="destination" ref="queue"/>
            <beans:property name="transactionManager" ref="transactionManager"/>
            <beans:property name="sessionTransacted" value="true"/>
        </beans:bean>
    and my test finally passes.

    So two suggestions.

    1. Make sure jmsGateway calls setup and destroy methods (as needed), or prove me wrong - I could have easily screwed up my configuration.
    2. Add the transactionManager property to jmsGateway (xml schema and object) and pass it through to the DefaultMessageListenerContainer.


    This has been posted jira as INT-237

    Thanks,

    Chris

  5. #5
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,854

    Default

    Chris,

    (first, what version of Spring Integration are you using?)

    The JmsGateway should be calling start(). This is the expected behavior:
    1) A "jms-gateway" element is parsed
    2) the parser registers an instance of JmsGateway (which implements Lifecycle)
    3) when the ApplicationContext instantiation process completes it fires an event
    4) MessageBus receives the event, and if 'auto-startup' is true (the default), it will start()
    5) any endpoints that implement Lifecycle will be started from the MessageBus' start() method
    6) a JmsGateway will start() its MessageListener container

    I would be interested if you could double-check the breakpoint knowing that order of events.

    The configuration of a transaction-manager reference for the 'jms-gateway' element is definitely needed - as well as some other configuration parameters.

    Thanks,
    Mark

  6. #6
    Join Date
    Dec 2006
    Posts
    5

    Default

    My current version from the manifest is
    Integration - Bundle-Version: 1.0.0.M4
    Spring - Spring-Version: 2.5.4

    1) Happens
    2) Something funky appears to be happening here. LifeCycle start methods don't appear to be getting called. The MessageBus start is called from the onApplicationEvent method which is why it starts up successfully. But I still never see JmsGateway.start() get called and I only see AbstractApplicationContext.getLifeCycleBeans() get called during shutdown and I never see AbstractApplicationContext.start() get called but I am not fore sure if its really supposed to (is it only used for sub contexts?)
    3) Happens
    4) Happens though before I didn't have auto-startup set to true
    5) See that
    6) JmsGateway.start() never gets called

    So I don't know if this is a problem with spring or spring-integration

  7. #7
    Join Date
    Dec 2006
    Posts
    5

    Default

    One of the things I did was do a search for calls to start and I didn't find many and none in the test case framework.

    Could part of the problem the usage of context.refresh() vs context.start() and how spring integration is using context.start()?


    Chris

  8. #8
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,854

    Default

    Chris,

    Thanks for reporting this. The issues have now been resolved. See the brief description in the JIRA comments here: http://jira.springframework.org/browse/INT-237

    Since the source had been refactored to a "gateway" it was no longer being automatically recognized by the MessageBus. Now, the MessageBus also recognizes any gateway within the application context that also implements Lifecycle. So, when the MessageBus starts (after the application context refresh event), it will also start the JmsGateway (which in turn starts the MessageListener container).

    Also, the <jms-gateway/> element now supports a "transaction-manager" attribute to reference the bean name of your PlatformTransactionManager instance.

    Whenever you get a chance, please try out the SVN head version (r661 or later) and let us know how it goes.

    Thanks again,
    Mark

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •