Thread: JMS Message Driven Channel Adapter + Transactional

  1. #1

    JMS Message Driven Channel Adapter + Transactional

    The JMS listener container delivers the message in a transaction (sessionTransacted = true).

    Code:
      <jms:message-driven-channel-adapter id="Reader"
                                            container="QueueContainer"
                                            message-converter="MessageConverter"
                                            channel="Process"/>
    In between there is a router that sends the message to multiple JMS outbound channel adapters, each of which uses a different message converter.

    If one of the adapters fails, the message is processed from the beginning again. But what happens to the messages that were already sent through the other JMS outbound channel adapters? Are they rolled back? It looks like the messages are being sent again and again. Is this expected behaviour?

  2. #2
    Join Date
    Jan 2008
    Location
    San Diego
    Posts
    780

    This is the expected behavior...you eventually want the message to be delivered, right?

  3. #3

    Yes, but not duplicates.

    Yes, I eventually want the message to be delivered, but only once to each destination.

    A transacted message comes in:
    1) It goes through the router to the first destination (the message is sent to that destination).
    2) It goes through the router to the second destination (this fails before the message is sent).

    The transaction rolls back and the message is redelivered:
    1) It goes through the router to the first destination "again", so the first destination ends up receiving the message multiple times (a duplicate).
    I thought that when the transaction is rolled back, the message would be removed from the destination queue (since I thought a session.commit has to occur first).

    So, are you saying the first destination will have duplicates if the message is rolled back? We are in fact seeing duplicate messages in the destination queue.

    What if we want each message sent to the destination queue(s) exactly once and never more?

  4. #4
    Join Date
    Jan 2008
    Location
    San Diego
    Posts
    780

    I'd have to see exactly what you are doing but if you are sending all of the messages in the same transaction, if one fails, all previous sends should roll back.

    Based on what you are saying, this doesn't appear to be the case. This likely means you have something misconfigured.

    Note that if you are sending these messages to different providers you will need to use XA transactions for this to work correctly.
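
    To illustrate the single-provider case: as far as I know, a DefaultMessageListenerContainer with sessionTransacted=true exposes its Session to JmsTemplate calls made on the listener thread, but only when the template uses the very same ConnectionFactory bean as the container. A minimal sketch, assuming hypothetical beans named sharedConnectionFactory, inboundDestination and outboundQueue that are defined elsewhere:

    ```xml
    <!-- Hypothetical bean names throughout; sharedConnectionFactory, inboundDestination
         and outboundQueue are assumed to exist elsewhere in the context. -->
    <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="sharedConnectionFactory"/>
        <property name="destination" ref="inboundDestination"/>
        <!-- Local JMS transaction: the receive and any sends on this Session commit or roll back together -->
        <property name="sessionTransacted" value="true"/>
    </bean>

    <bean id="outboundTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- Same ConnectionFactory bean as the container, so the listener's Session can be reused -->
        <property name="connectionFactory" ref="sharedConnectionFactory"/>
        <property name="defaultDestination" ref="outboundQueue"/>
        <property name="sessionTransacted" value="true"/>
    </bean>
    ```

    This also assumes the whole flow stays on the listener thread (direct channels, no queue channels or executors in between). If the sends go through a different connection factory, they run in their own sessions and are not part of the listener's local transaction.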

  5. #5

    Our current configuration is as follows

    Code:
        
        <!-- Inbound message-driven-channel-adapter configuration -->

        <bean id="common.topicListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"
                abstract="true">
            <property name="connectionFactory" ref="common.topicConnectionFactory"/>
            <property name="subscriptionDurable" value="true"/>
            <property name="pubSubDomain" value="true"/>
            <property name="sessionTransacted" value="true"/>
        </bean>

        <bean id="lb.broadcastTopicContainer" parent="common.topicListenerContainer">
            <property name="destination" ref="common.broadcastTopic"/>
            <property name="durableSubscriptionName" value="${mt2.jms.lbdurablename}"/>
            <property name="clientId" value="${mt2.jms.lbdurableclientid}"/>
        </bean>

        <jms:message-driven-channel-adapter id="lb.receiver"
            container="lb.broadcastTopicContainer"
            channel="lb.fromBroadcast"
            message-converter="lb.mt2MessageConverter"/>

        <!-- End of inbound adapter configuration -->
        	
        <!-- Router configuration -->
        <si:router input-channel="lb.fromBroadcast" ref="lb.messageTypeRouter" method="route"/>

        <!-- The bean that will route messages based on type -->
        <bean id="lb.messageTypeRouter" class="com.xx.TypeListRouter">
            <property name="name" value="lb.messageTypeRouter"/>
            <property name="destinations">
                <map>
                    <entry key="NEW_ORDER">
                        <list>
                            <value>lb.eProcessing</value>
                            <value>lb.aProcessing</value>
                            <value>lb.bProcessing</value>
                        </list>
                    </entry>
                </map>
            </property>
        </bean>

        <!-- End of router configuration -->
            
            
    
        <!-- JMS outbound adapters for processing in between -->

        <!-- After some processing, the channel lb.eProcessing puts the message into lb.toEProcess -->
    
    
        <jms:outbound-channel-adapter channel="lb.toEProcess"
            jms-template="lb.toEJMSTemplate" />
            
        <bean id="lb.toEJMSTemplate" parent="lb.eQueueJmsTemplate">
            <property name="defaultDestination" ref="common.EProcessSendQueue" />
        </bean>
        
        <bean id="lb.eQueueJmsTemplate" abstract="true" parent="common.queueJmsTemplate">
                <property name="messageConverter" ref="lb.mtPreserveUndeliveredMessageConverter" />
                <property name="explicitQosEnabled" value="true" />
                <property name="timeToLive" value="1000" />
        </bean>
        
      
        
        <!-- After some processing, the channels lb.aProcessing and lb.bProcessing put the message into lb.toSB -->

        <jms:outbound-channel-adapter channel="lb.toSB"
            jms-template="lb.toSBJMSTemplate" />

        <bean id="lb.toSBJMSTemplate" parent="lb.mtQueueJmsTemplate">
                <property name="defaultDestination" ref="common.stm" />
        </bean>

        <bean id="lb.mtQueueJmsTemplate" abstract="true" parent="common.queueJmsTemplate">
                <property name="messageConverter" ref="lb.mtMessageConverter" />
        </bean>
    
    
    	
        <!-- All of them use the same queueJmsTemplate -->

        <bean id="common.queueJmsTemplate" abstract="true" parent="common.baseJmsTemplate">
            <property name="connectionFactory" ref="common.queueConnectionFactory" />
        </bean>

        <bean id="common.queueConnectionFactory" parent="common.userCredentialsConnectionFactoryAdapter" lazy-init="true">
            <property name="targetConnectionFactory">
                <bean class="com.tibco.tibjms.TibjmsQueueConnectionFactory">
                    <constructor-arg index="0"
                        value="${ems.queue.conn.factory.url}" />
                </bean>
            </property>
        </bean>

        <bean id="common.userCredentialsConnectionFactoryAdapter"
                class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter"
                abstract="true">
            <property name="username" value="${ems.username}" />
            <property name="password" value="${ems.password}" />
        </bean>
    Please let us know where and how we should apply XA transactional boundaries.

  6. #6
    Join Date
    Jan 2008
    Location
    San Diego
    Posts
    780

    You have no transaction manager attached to your message listener container. That's your primary problem.

    I'm not sure why you have a custom user credentials adapter for your tibco connection factory. You can set your credentials directly on the tibco connection factory bean as you do the server url.

    This is how we had ours wired up (without credentials):

    Code:
    <bean id="queueConnectionFactoryBean" class="org.springframework.jms.connection.SingleConnectionFactory" destroy-method="destroy">
          <constructor-arg>
            <bean class="com.atomikos.jms.QueueConnectionFactoryBean" init-method="init">
              <property name="xaQueueConnectionFactory" ref="queueConnectionFactoryXa"/>
            </bean>
          </constructor-arg>
          <property name="reconnectOnException"  value="true"></property>
      </bean>
      
      <bean id="queueConnectionFactoryXa"
            class="com.tibco.tibjms.TibjmsXAQueueConnectionFactory">
            <property name="serverUrl">
                <value>${tibco.naming.provider.url}</value>
            </property>
      </bean>
    
      <bean id="tibcoDestinationResolver" class="org.springframework.jms.support.destination.JndiDestinationResolver">
        <property name="jndiTemplate"><ref local="tibcoJndiTemplate" /></property>
      </bean>
      
      
      <bean id="tibcoJndiTemplate" class="org.springframework.jndi.JndiTemplate">
        <property name="environment">
          <props>
            <prop key="java.naming.factory.initial">com.tibco.tibjms.naming.TibjmsInitialContextFactory</prop>
            <prop key="java.naming.provider.url">${tibco.naming.provider.url}</prop>
            <prop key="java.naming.security.principal">admin</prop>
          </props>
        </property>
      </bean>
      
      <bean id="jmsTemplateXa" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory"><ref local="queueConnectionFactoryBean" /></property>
        <property name="destinationResolver"><ref local="tibcoDestinationResolver" /></property>
        <property name="receiveTimeout" value="-1"/>
        <!-- Atomikos needs this to be set to work correctly -->
        <property name="sessionTransacted" value="true"/>
      </bean>

  7. #7

    Please provide a link or sample code for a transaction manager for JMS.

    Can you please provide a link or sample code showing how to use a transaction manager for EMS queues?
    I thought using XA connection factories together with sessionTransacted=true would be sufficient, but it seems that it is not.
    Furthermore, a transaction manager requires a datasource, right? Since we are dealing only with JMS, what is the datasource in this case?
    Last edited by srikanthradix; Apr 15th, 2010 at 06:04 AM.

  8. #8
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,840

    The 'session-transacted' attribute will only enable "local" transactions (at the Session level). You need to add the 'transaction-manager' reference for the MessageListener container. You also need to provide a bean definition for the transactionManager using Spring's JtaTransactionManager. No data source is needed for the JTA tx manager.
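
    As a rough sketch of what that wiring looks like: the bean ids, xaConnectionFactory and inboundDestination below are placeholders, and this assumes a JTA provider that Spring can find through its default JNDI lookups (for example inside an application server). A standalone transaction manager needs its own bean wiring instead.

    ```xml
    <!-- With no properties set, JtaTransactionManager tries the standard JNDI
         locations for the JTA UserTransaction and TransactionManager. -->
    <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>

    <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!-- Assumed: an XA-capable ConnectionFactory that is enlisted with the JTA provider -->
        <property name="connectionFactory" ref="xaConnectionFactory"/>
        <!-- Placeholder destination bean -->
        <property name="destination" ref="inboundDestination"/>
        <!-- The reference that turns each receive into an externally managed (JTA) transaction -->
        <property name="transactionManager" ref="transactionManager"/>
    </bean>
    ```

    For the sends to join the same JTA transaction, the outbound JmsTemplate beans would also need to use an XA-capable connection factory known to the same transaction manager.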

  9. #9
    Join Date
    Jan 2008
    Location
    San Diego
    Posts
    780

    I'm using Atomikos as my TM so mine looks like this:

    Code:
    <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" 
              init-method="init" destroy-method="close">
             <property name="forceShutdown" value="false"/>
        </bean>
        
        <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.J2eeUserTransaction">
            <property name="transactionTimeout" value="300"/>
        </bean>
        
        <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"
            depends-on="atomikosTransactionManager,atomikosUserTransaction">
            <property name="transactionManager" ref="atomikosTransactionManager"/>
            <property name="userTransaction" ref="atomikosUserTransaction"/>
            <!-- This is required for Spring Batch to set a custom isolation level when it runs -->
            <property name="allowCustomIsolationLevels" value="true"/>
        </bean>
    
        <bean id="messageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"
            depends-on="transactionManager">
          <property name="connectionFactory" ref="queueConnectionFactoryBean"/>
          <property name="transactionManager" ref="transactionManager"/>
          <property name="destination" ref="messageDestination"/>
          <property name="sessionTransacted" value="true"/>
          <property name="maxConcurrentConsumers" value="5"/>
          <property name="concurrentConsumers" value="1"/>
          <property name="receiveTimeout" value="5000"/>
          <property name="recoveryInterval" value="60000"/>
          <property name="autoStartup" value="true"/>     
      </bean>

  10. #10

    Do we have to use any transaction boundaries?

    I used the same configuration that you gave me, but it seems that the previous sends are still not being rolled back.
    When the router puts the message into channel1, the message is written to outbound-queue1 and has already been acknowledged.

    When there is an error/exception in the second channel, the message from the first send is not rolled back. Any suggestions?

    Let me know if I am missing any other configuration.

    Queue and Topic Configurations:
    Code:
        <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" 
              init-method="init" destroy-method="close">
             <property name="forceShutdown" value="false"/>
        </bean>
        
        <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.J2eeUserTransaction"/>
        
        <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"
            depends-on="atomikosTransactionManager,atomikosUserTransaction">
            <property name="transactionManager" ref="atomikosTransactionManager"/>
            <property name="userTransaction" ref="atomikosUserTransaction"/>
            <!-- This is required for Spring Batch to set a custom isolation level when it runs -->
            <property name="allowCustomIsolationLevels" value="false"/>
        </bean>
        <bean id="topicMainContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="topicConnectionFactory"/>
            <property name="subscriptionDurable" value="true"/>
            <property name="pubSubDomain" value="true"/>
            <property name="sessionTransacted" value="true"/>
            <property name="transactionManager" ref="transactionManager"/>
            <property name="destination" ref="broadcastTopic"/>
            <property name="durableSubscriptionName" value="${jms.durablename}"/>
            <property name="clientId" value="${jms.durableclientid}"/>
        </bean>
    
    
    	<bean id="topicConnectionFactory" parent="userCredentialsConnectionFactoryAdapter" lazy-init="true">
    		<property name="targetConnectionFactory">
    			<bean class="com.tibco.tibjms.TibjmsTopicConnectionFactory">
    				<constructor-arg index="0"
    					value="${ems.topic.conn.factory.url}" />
    			</bean>
    		</property>
    	</bean>
    
    	<bean id="queueConnectionFactory" parent="userCredentialsConnectionFactoryAdapter" lazy-init="true">
    		<property name="targetConnectionFactory">
    			<bean class="com.tibco.tibjms.TibjmsQueueConnectionFactory">
    				<constructor-arg index="0"
    					value="${ems.queue.conn.factory.url}" />
    			</bean>
    		</property>
    	</bean>
    The message-driven-channel-adapter receives the message and sends it to the router, which then sends it to the destinations (there are two):
    Code:
             <jms:message-driven-channel-adapter id="receiver" 
             	container="topicMainContainer"
             	channel="fromBroadcast"
        		message-converter="MessageConverter"/>    
        	
        <bean id="QueueJmsTemplate" abstract="true" class="org.springframework.jms.core.JmsTemplate">
            <property name="messageConverter" ref="preserveUndeliveredMessageConverter" />
            <property name="connectionFactory" ref="queueConnectionFactory" />
            <property name="explicitQosEnabled" value="true" />
            <property name="timeToLive" value="1000" />
        </bean>
    
           <si:router input-channel="fromBroadcast" ref="testRouter" method="route" />
    
    	<bean id="testRouter" class="com.xx.TestRouter">
    		<property name="destinations">
    		    <map>
    			<entry key="1" value="channel1" />
    			<entry key="2" value="channel2" />
    		    </map>
    		</property>
    	</bean>
    1) The router sends it to the first destination channel.
    This channel feeds the message into the legacy outbound channel adapter.
    Code:
        <jms:outbound-channel-adapter channel="channel1"
            jms-template="toFirstDestinationQueueTemplate" />
        <bean id="toFirstDestinationQueueTemplate" parent="QueueJmsTemplate">
    	<property name="defaultDestination" ref="SendQueue1" />
        </bean>
    2) The router sends it to the second destination channel.
    This channel feeds the message into the legacy outbound channel adapter.
    Code:
        <jms:outbound-channel-adapter channel="channel2"
            			  jms-template="toSecondDestinationQueueTemplate" />
        <bean id="toSecondDestinationQueueTemplate" parent="QueueJmsTemplate">
    	  <property name="defaultDestination" ref="SendQueue2" />
        </bean>
    Last edited by srikanthradix; Apr 16th, 2010 at 02:09 PM. Reason: added code.
