Results 1 to 3 of 3

Thread: TransactionAwareConnectionFactoryProxy under JTA (Atomikos) & ActiveMQ

  1. #1
    Join Date
    Jul 2012
    Posts
    2

    Default TransactionAwareConnectionFactoryProxy under JTA (Atomikos) & ActiveMQ

    Hi,
    I set up a junit testcase (standalone application) using:
    • Spring v. 3.1.0.RELEASE
    • Atomikos JTA implementation v. 3.8.0
    • ActiveMQ JMS provider v 5.6.0

    This testcase runs a bean method under container-managed transaction which just sends a JMS message to some destination.
    I am using org.springframework.jms.connection.TransactionAwar eConnectionFactoryProxy for accessing JMS connection.
    I am facing two problems:
    1. JMS connection obtained thru connectionFactory.createConnection is not closed (and returned into pool) on transaction completion
    2. Invoking close on JMS connection in bean method delegates method invocation to underlying connection even if the transaction is still active and subsequent JTA transaction commit fails (I was looking into TransactionAwareDataSourceProxy and this does not happen there, connection is released on transaction completion).


    Please anyone has an idea what i am doing wrong (in code, configuration) or generally where the problem could be. Below is my code and spring configuration file...

    Thank you

    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
      xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/tx
    http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
    http://activemq.apache.org/schema/core
    http://activemq.apache.org/schema/core/activemq-core.xsd">
    
      <context:component-scan base-package="cz.gpe.spring.preview.tx.jms.cfp">
        <context:include-filter type="annotation" expression="javax.ejb.Stateless" />
      </context:component-scan>
    
      <!-- enable the configuration of transactional behavior based on annotations -->
      <tx:annotation-driven transaction-manager="txManager" />
    
      <!-- ActiveMQ JMS Broker -->
      <amq:broker id="my-broker" useJmx="false" persistent="false">
        <amq:transportConnectors>
          <amq:transportConnector uri="tcp://localhost:61617" />
        </amq:transportConnectors>
      </amq:broker>
      <!-- ActiveMQ JMS XA CF -->
      <bean id="amq.connectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory" depends-on="my-broker">
        <property name="brokerURL" value="tcp://localhost:61617?jms.redeliveryPolicy.maximumRedeliveries=2" />
      </bean>
    
      <!-- ActiveMQ JMS CF -->
      <bean id="amq.test.connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" depends-on="my-broker">
        <property name="brokerURL" value="tcp://localhost:61617?jms.redeliveryPolicy.maximumRedeliveries=2" />
      </bean>
    
      <!-- The Atomikos JTA-enabled ConnectionFactory, configured with the vendor's XA factory. -->
      <bean id="atomikosConnectionFactoryBean" class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init" destroy-method="close" depends-on="my-broker">
        <!-- The unique resource name needed for recovery by the Atomikos core. -->
        <property name="uniqueResourceName">
          <value>JMS_BROKER_CFP</value>
        </property>
        <property name="xaConnectionFactory" ref="amq.connectionFactory" />
        <property name="localTransactionMode" value="false" />
      </bean>
    
      <!-- Spring transaction-aware CF proxy, this is what programmer physically uses in a code -->
      <bean name="jms/connectionFactory" class="org.springframework.jms.connection.TransactionAwareConnectionFactoryProxy">
        <property name="targetConnectionFactory" ref="atomikosConnectionFactoryBean" />
        <property name="synchedLocalTransactionAllowed" value="true" />
      </bean>
    
      <bean name="jms/queue" class="org.apache.activemq.command.ActiveMQQueue">
        <property name="physicalName">
          <value>SPRING_PREVIEW_QUEUE</value>
        </property>
      </bean>
    
      <bean id="transactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close" depends-on="userTransaction">
        <!-- IMPORTANT: disable startup because the userTransactionService above does this -->
        <property name="startupTransactionService" value="false" />
    
        <!-- when close is called, should we force transactions to terminate or not? -->
        <property name="forceShutdown" value="false" />
      </bean>
    
      <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory">
          <ref bean="atomikosConnectionFactoryBean" />
        </property>
        <property name="defaultDestination">
          <ref bean="jms/queue" />
        </property>
        <property name="sessionTransacted" value="true" />
      </bean>
    
      <bean id="userTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
        <property name="transactionTimeout" value="30000" />
      </bean>
    
      <bean id="txManager" class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="userTransaction" ref="userTransaction" />
        <property name="transactionManager" ref="transactionManager" />
      </bean>
    </beans>
    Code:
    @Stateless
    @Local
    @TransactionAttribute
    public class ConnectionFactoryProxyBean {
    
    	@Resource(name = "jms/queue")
    	private Destination destination;
    
    	@Resource(name = "jms/connectionFactory")
    	private ConnectionFactory cf;
    	
    	@Resource
    	private JmsTemplate jmsTemplate;
    
    	public void sendMessage(final String message) {
    		Connection connection = null;
    		try {
    			connection = cf.createConnection();
    			Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
    			MessageProducer producer = session.createProducer(destination);
    			producer.send(session.createTextMessage(message));
    		}
    		catch (JMSException e) {
    			throw new RuntimeException(e);
    		}
    		finally {
    			if (connection != null) {
    				try {
    				    //if i close the connection here i get "org.springframework.jms.connection.SynchedLocalTransactionFailedException: Local JMS transaction failed to commit; nested exception is javax.jms.IllegalStateException: Session was closed already - calling commit is no longer allowed." when the JTA transaction is commited. See the stacktrace below...
                                       connection.close();
    				}
    				catch (Exception e) {
    					e.printStackTrace();
    				}
    			}
    		}
    	}
    }
    Stacktrace for problem #2
    Code:
    org.springframework.jms.connection.SynchedLocalTransactionFailedException: Local JMS transaction failed to commit; nested exception is javax.jms.IllegalStateException: Session was closed already - calling commit is no longer allowed.
    	at org.springframework.jms.connection.ConnectionFactoryUtils$JmsResourceSynchronization.processResourceAfterCommit(ConnectionFactoryUtils.java:407)
    	at org.springframework.jms.connection.ConnectionFactoryUtils$JmsResourceSynchronization.processResourceAfterCommit(ConnectionFactoryUtils.java:1)
    	at org.springframework.transaction.support.ResourceHolderSynchronization.afterCommit(ResourceHolderSynchronization.java:79)
    	at org.springframework.transaction.support.TransactionSynchronizationUtils.invokeAfterCommit(TransactionSynchronizationUtils.java:133)
    	at org.springframework.transaction.support.TransactionSynchronizationUtils.triggerAfterCommit(TransactionSynchronizationUtils.java:121)
    	at org.springframework.transaction.support.AbstractPlatformTransactionManager.triggerAfterCommit(AbstractPlatformTransactionManager.java:950)
    	at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:796)
    	at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:723)
    	at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:393)
    	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:120)
    	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
    	at org.springframework.aop.framework.Cglib2AopProxy$DynamicAdvisedInterceptor.intercept(Cglib2AopProxy.java:622)
    	at cz.gpe.spring.preview.tx.jms.cfp.ConnectionFactoryProxyBean$$EnhancerByCGLIB$$50a8d85e.sendMessage(<generated>)
    ....
    Caused by: javax.jms.IllegalStateException: Session was closed already - calling commit is no longer allowed.
    	at com.atomikos.jms.AtomikosJmsXaSessionProxy.invoke(AtomikosJmsXaSessionProxy.java:117)
    	at $Proxy19.commit(Unknown Source)
    	at org.springframework.jms.connection.JmsResourceHolder.commitAll(JmsResourceHolder.java:181)
    	at org.springframework.jms.connection.ConnectionFactoryUtils$JmsResourceSynchronization.processResourceAfterCommit(ConnectionFactoryUtils.java:404)
    	... 43 more

  2. #2
    Join Date
    Oct 2011
    Posts
    3

    Default

    Same problem here, is there any solution to this?

  3. #3
    Join Date
    Jul 2012
    Posts
    2

    Default

    Quote Originally Posted by borkert View Post
    Same problem here, is there any solution to this?
    I dont have any...

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •