Hi,
I set up a JUnit test case (standalone application) using:
- Spring v. 3.1.0.RELEASE
- Atomikos JTA implementation v. 3.8.0
- ActiveMQ JMS provider v. 5.6.0
This test case runs a bean method under a container-managed transaction; the method just sends a JMS message to a destination.
I am using org.springframework.jms.connection.TransactionAwareConnectionFactoryProxy to access the JMS connection.
I am facing two problems:
- The JMS connection obtained through connectionFactory.createConnection() is not closed (and returned to the pool) on transaction completion.
- Calling close() on the JMS connection in the bean method is delegated to the underlying connection even though the transaction is still active, so the subsequent JTA transaction commit fails. (I looked at TransactionAwareDataSourceProxy and this does not happen there; the connection is released on transaction completion.)
Does anyone have an idea what I am doing wrong (in the code or the configuration), or more generally where the problem could be? My Spring configuration file and code are below.
Thank you
Code:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.0.xsd
           http://www.springframework.org/schema/tx
           http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
           http://activemq.apache.org/schema/core
           http://activemq.apache.org/schema/core/activemq-core.xsd">

    <context:component-scan base-package="cz.gpe.spring.preview.tx.jms.cfp">
        <context:include-filter type="annotation" expression="javax.ejb.Stateless" />
    </context:component-scan>

    <!-- enable the configuration of transactional behavior based on annotations -->
    <tx:annotation-driven transaction-manager="txManager" />

    <!-- ActiveMQ JMS Broker -->
    <amq:broker id="my-broker" useJmx="false" persistent="false">
        <amq:transportConnectors>
            <amq:transportConnector uri="tcp://localhost:61617" />
        </amq:transportConnectors>
    </amq:broker>

    <!-- ActiveMQ JMS XA CF -->
    <bean id="amq.connectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory" depends-on="my-broker">
        <property name="brokerURL" value="tcp://localhost:61617?jms.redeliveryPolicy.maximumRedeliveries=2" />
    </bean>

    <!-- ActiveMQ JMS CF -->
    <bean id="amq.test.connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" depends-on="my-broker">
        <property name="brokerURL" value="tcp://localhost:61617?jms.redeliveryPolicy.maximumRedeliveries=2" />
    </bean>

    <!-- The Atomikos JTA-enabled ConnectionFactory, configured with the vendor's XA factory. -->
    <bean id="atomikosConnectionFactoryBean" class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init" destroy-method="close" depends-on="my-broker">
        <!-- The unique resource name needed for recovery by the Atomikos core. -->
        <property name="uniqueResourceName">
            <value>JMS_BROKER_CFP</value>
        </property>
        <property name="xaConnectionFactory" ref="amq.connectionFactory" />
        <property name="localTransactionMode" value="false" />
    </bean>

    <!-- Spring transaction-aware CF proxy, this is what programmer physically uses in a code -->
    <bean name="jms/connectionFactory" class="org.springframework.jms.connection.TransactionAwareConnectionFactoryProxy">
        <property name="targetConnectionFactory" ref="atomikosConnectionFactoryBean" />
        <property name="synchedLocalTransactionAllowed" value="true" />
    </bean>

    <bean name="jms/queue" class="org.apache.activemq.command.ActiveMQQueue">
        <property name="physicalName">
            <value>SPRING_PREVIEW_QUEUE</value>
        </property>
    </bean>

    <bean id="transactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close" depends-on="userTransaction">
        <!-- IMPORTANT: disable startup because the userTransactionService above does this -->
        <property name="startupTransactionService" value="false" />
        <!-- when close is called, should we force transactions to terminate or not? -->
        <property name="forceShutdown" value="false" />
    </bean>

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory">
            <ref bean="atomikosConnectionFactoryBean" />
        </property>
        <property name="defaultDestination">
            <ref bean="jms/queue" />
        </property>
        <property name="sessionTransacted" value="true" />
    </bean>

    <bean id="userTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
        <property name="transactionTimeout" value="30000" />
    </bean>

    <bean id="txManager" class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="userTransaction" ref="userTransaction" />
        <property name="transactionManager" ref="transactionManager" />
    </bean>
</beans>
Code:
import javax.annotation.Resource;
import javax.ejb.Local;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.springframework.jms.core.JmsTemplate;

@Stateless
@Local
@TransactionAttribute
public class ConnectionFactoryProxyBean {

    @Resource(name = "jms/queue")
    private Destination destination;

    // The Spring TransactionAwareConnectionFactoryProxy from the configuration
    @Resource(name = "jms/connectionFactory")
    private ConnectionFactory cf;

    @Resource
    private JmsTemplate jmsTemplate;

    public void sendMessage(final String message) {
        Connection connection = null;
        try {
            connection = cf.createConnection();
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(destination);
            producer.send(session.createTextMessage(message));
        }
        catch (JMSException e) {
            throw new RuntimeException(e);
        }
        finally {
            if (connection != null) {
                try {
                    // If I close the connection here, I get
                    // "org.springframework.jms.connection.SynchedLocalTransactionFailedException: Local JMS transaction failed to commit; nested exception is javax.jms.IllegalStateException: Session was closed already - calling commit is no longer allowed."
                    // when the JTA transaction is committed. See the stack trace below.
                    connection.close();
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
Stack trace for problem #2:
Code:
org.springframework.jms.connection.SynchedLocalTransactionFailedException: Local JMS transaction failed to commit; nested exception is javax.jms.IllegalStateException: Session was closed already - calling commit is no longer allowed.
at org.springframework.jms.connection.ConnectionFactoryUtils$JmsResourceSynchronization.processResourceAfterCommit(ConnectionFactoryUtils.java:407)
at org.springframework.jms.connection.ConnectionFactoryUtils$JmsResourceSynchronization.processResourceAfterCommit(ConnectionFactoryUtils.java:1)
at org.springframework.transaction.support.ResourceHolderSynchronization.afterCommit(ResourceHolderSynchronization.java:79)
at org.springframework.transaction.support.TransactionSynchronizationUtils.invokeAfterCommit(TransactionSynchronizationUtils.java:133)
at org.springframework.transaction.support.TransactionSynchronizationUtils.triggerAfterCommit(TransactionSynchronizationUtils.java:121)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.triggerAfterCommit(AbstractPlatformTransactionManager.java:950)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:796)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:723)
at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:393)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:120)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.aop.framework.Cglib2AopProxy$DynamicAdvisedInterceptor.intercept(Cglib2AopProxy.java:622)
at cz.gpe.spring.preview.tx.jms.cfp.ConnectionFactoryProxyBean$$EnhancerByCGLIB$$50a8d85e.sendMessage(<generated>)
....
Caused by: javax.jms.IllegalStateException: Session was closed already - calling commit is no longer allowed.
at com.atomikos.jms.AtomikosJmsXaSessionProxy.invoke(AtomikosJmsXaSessionProxy.java:117)
at $Proxy19.commit(Unknown Source)
at org.springframework.jms.connection.JmsResourceHolder.commitAll(JmsResourceHolder.java:181)
at org.springframework.jms.connection.ConnectionFactoryUtils$JmsResourceSynchronization.processResourceAfterCommit(ConnectionFactoryUtils.java:404)
... 43 more
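To make the behaviour I expected in problem #2 concrete, here is a minimal plain-Java sketch of a proxy that swallows close() while a transaction is active and only releases the underlying connection on transaction completion. It does not use Spring, Atomikos or JMS at all: SimpleConnection, PhysicalConnection, TxScope and transactionAware are hypothetical names made up for this illustration, and I am not claiming this is how TransactionAwareConnectionFactoryProxy or TransactionAwareDataSourceProxy is actually implemented.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

public class DeferredCloseSketch {

    /** Hypothetical stand-in for a JMS connection; not a real JMS type. */
    public interface SimpleConnection {
        void send(String text);
        void close();
    }

    /** Hypothetical stand-in for the pooled, physical connection. */
    public static class PhysicalConnection implements SimpleConnection {
        public final List<String> sent = new ArrayList<>();
        public boolean closed = false;

        @Override
        public void send(String text) {
            if (closed) {
                throw new IllegalStateException("connection was closed already");
            }
            sent.add(text);
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Hypothetical transaction scope that runs callbacks on completion. */
    public static class TxScope {
        private final List<Runnable> afterCompletion = new ArrayList<>();
        private boolean active = true;

        public boolean isActive() {
            return active;
        }

        public void registerAfterCompletion(Runnable callback) {
            afterCompletion.add(callback);
        }

        public void complete() {
            active = false;
            for (Runnable callback : afterCompletion) {
                callback.run();
            }
            afterCompletion.clear();
        }
    }

    /** Wraps target so that close() is deferred while tx is still active. */
    public static SimpleConnection transactionAware(SimpleConnection target, TxScope tx) {
        InvocationHandler handler = (proxy, method, args) -> {
            if ("close".equals(method.getName()) && tx.isActive()) {
                // Do not touch the physical connection yet; release it later.
                tx.registerAfterCompletion(target::close);
                return null;
            }
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                throw e.getCause(); // rethrow the target's own exception
            }
        };
        return (SimpleConnection) Proxy.newProxyInstance(
                SimpleConnection.class.getClassLoader(),
                new Class<?>[] { SimpleConnection.class },
                handler);
    }

    public static void main(String[] args) {
        PhysicalConnection physical = new PhysicalConnection();
        TxScope tx = new TxScope();
        SimpleConnection connection = transactionAware(physical, tx);

        connection.send("hello");
        connection.close(); // deferred, the transaction is still active
        System.out.println("closed inside transaction: " + physical.closed);

        tx.complete(); // the deferred close runs here
        System.out.println("closed after completion: " + physical.closed);
    }
}
```

With a proxy like this, calling close() in the finally block of the bean method would be harmless, because the physical connection would stay open until the transaction has completed. That is the behaviour I was hoping to get from the JMS proxy.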