Thread: JMS Listening with JTA Transaction

  1. #1

    JMS Listening with JTA Transaction

    Hi,

    Is it possible to listen for JMS messages with DefaultMessageListenerContainer inside a JTA transaction? I tried to run this context:

    Code:
    <beans>
    
        <bean id="broker" class="org.apache.activemq.broker.BrokerService" init-method="start" destroy-method="stop">
            <property name="persistent" value="false"/>
            <property name="transportConnectorURIs">
                <list>
                    <value>tcp://localhost:5000</value>
                </list>
            </property>
        </bean>
    
        <bean id="jotm" class="org.springframework.transaction.jta.JotmFactoryBean"/>
    	<bean id="jotmTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
    		<property name="userTransaction" ref="jotm" />
    	</bean>	
    		
    	<bean id="jmsFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
            <property name="brokerURL" value="tcp://localhost:5000" />
        </bean>
        <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="messages.input"/>
        </bean>
    	
    	<bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    		<property name="concurrentConsumers" value="1"/>
    		<property name="maxMessagesPerTask" value="1"/>
    		<property name="connectionFactory" ref="jmsFactory"/>
    		<property name="destination" ref="destination"/>
    		<property name="messageListener" ref="myMessageProcessor"/>
    		<property name="transactionManager" ref="jotmTransactionManager"/>
    	</bean>
    	
    	<bean id="myMessageProcessor" class="messages.MyMessageProcessorSimple">
            <property name="jmsTemplate">
                <bean class="org.springframework.jms.core.JmsTemplate">
                    <property name="connectionFactory" ref="jmsFactory"/>
                </bean>
            </property>
        </bean>
    
    </beans>
    On running it I got:

    Code:
    INFO  CollectionFactory - JDK 1.4+ collections available
    INFO  XmlBeanDefinitionReader - Loading XML bean definitions from class path resource [messages/context-1.xml]
    INFO  ClassPathXmlApplicationContext - Bean factory for application context [org.springframework.context.support.ClassPathXmlApplicationContext;hashCode=30079646]: org.springframework.beans.factory.support.DefaultListableBeanFactory defining beans [broker,jotm,jotmTransactionManager,jmsFactory,destination,listenerContainer,myMessageProcessor]; root of BeanFactory hierarchy
    INFO  ClassPathXmlApplicationContext - 7 beans defined in application context [org.springframework.context.support.ClassPathXmlApplicationContext;hashCode=30079646]
    INFO  ClassPathXmlApplicationContext - Unable to locate MessageSource with name 'messageSource': using default [org.springframework.context.support.DelegatingMessageSource@18fb1f7]
    INFO  ClassPathXmlApplicationContext - Unable to locate ApplicationEventMulticaster with name 'applicationEventMulticaster': using default [org.springframework.context.event.SimpleApplicationEventMulticaster@228a02]
    INFO  DefaultListableBeanFactory - Pre-instantiating singletons in factory [org.springframework.beans.factory.support.DefaultListableBeanFactory defining beans [broker,jotm,jotmTransactionManager,jmsFactory,destination,listenerContainer,myMessageProcessor]; root of BeanFactory hierarchy]
    INFO  BrokerService - ActiveMQ null JMS Message Broker (localhost) is starting
    INFO  BrokerService - For help or more information please see: http://incubator.apache.org/activemq/
    INFO  TransportServerThreadSupport - Listening for connections at: tcp://prokopiev.stc.donpac.ru:5000
    INFO  TransportConnector - Connector tcp://prokopiev.stc.donpac.ru:5000 Started
    INFO  BrokerService - ActiveMQ JMS Message Broker (localhost, ID:prokopiev.stc.donpac.ru-36826-1155541697290-0:0) started
    INFO  jotm - JOTM started with a local transaction factory which is not bound.
    INFO  jotm - CAROL initialization
    INFO  ConfigurationRepository - No protocols were defined for property 'carol.protocols', trying with default protocol = 'jrmp'.
    INFO  jta - JOTM 2.0.10
    INFO  JtaTransactionManager - Using JTA UserTransaction: org.objectweb.jotm.Current@e95a56
    INFO  JtaTransactionManager - Using JTA TransactionManager: org.objectweb.jotm.Current@e95a56
    INFO  ManagementContext - JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
    
    
    ERROR DefaultMessageListenerContainer - Execution of JMS message listener failed
    javax.jms.JMSException: Session's XAResource has not been enlisted in a distributed transaction.
    	at org.apache.activemq.ActiveMQXASession.doStartTransaction(ActiveMQXASession.java:109)
    	at org.apache.activemq.ActiveMQMessageConsumer.ackLater(ActiveMQMessageConsumer.java:658)
    	at org.apache.activemq.ActiveMQMessageConsumer.beforeMessageIsConsumed(ActiveMQMessageConsumer.java:610)
    	at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:469)
    	at org.springframework.jms.listener.DefaultMessageListenerContainer.doExecuteListener(DefaultMessageListenerContainer.java:301)
    	at org.springframework.jms.listener.DefaultMessageListenerContainer$1.doInTransactionWithoutResult(DefaultMessageListenerContainer.java:278)
    	at org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:33)
    	at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:128)
    	at org.springframework.jms.listener.DefaultMessageListenerContainer.executeListener(DefaultMessageListenerContainer.java:275)
    	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:375)
    	at org.springframework.core.task.SimpleAsyncTaskExecutor$ConcurrencyThrottlingRunnable.run(SimpleAsyncTaskExecutor.java:203)
    	at java.lang.Thread.run(Thread.java:595)
    In DefaultMessageListenerContainer javadoc I read:

    Message reception and listener execution can automatically be wrapped in transactions through passing a Spring PlatformTransactionManager into the "transactionManager" property. This will usually be a JtaTransactionManager, in combination with a JTA-aware ConnectionFactory that this message listener container fetches its Connections from.

    What is "JTA-aware"? Is this the same as XA-enabled? I use ActiveMQXAConnectionFactory as the connection factory, so what's wrong?

  2. #2
    Join Date
    Nov 2005
    Location
    Chicago
    Posts
    122

    The problem is that the ActiveMQ connection is not registered as a resource in the XA transaction. The standard way to do this with JOTM is to use an XAPool data source, but that only works for database connections. It's possible to write a connection factory wrapper to do this, or there might be a class in ActiveMQ for it now.

    See this page for additional details.
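
    To illustrate what "registered as a resource" means at the XA level, here is a minimal, self-contained sketch. It is not ActiveMQ, JOTM or Jencks code. It assumes the usual JTA behaviour, where enlisting a resource makes the transaction manager call XAResource.start, delisting makes it call XAResource.end, and commit then runs prepare and commit. XaEnlistmentSketch, SimpleXid, recordingResource and runOneTransaction are hypothetical names; only the javax.transaction.xa types come from the JDK.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

// Hypothetical illustration, not ActiveMQ/JOTM/Jencks code.
class XaEnlistmentSketch {

    // Minimal transaction id; the values are arbitrary.
    static final class SimpleXid implements Xid {
        public int getFormatId() { return 1; }
        public byte[] getGlobalTransactionId() { return new byte[] { 1 }; }
        public byte[] getBranchQualifier() { return new byte[] { 1 }; }
    }

    // An XAResource stand-in that only records the name of each method called on it.
    static XAResource recordingResource(List<String> calls) {
        return (XAResource) Proxy.newProxyInstance(
                XaEnlistmentSketch.class.getClassLoader(),
                new Class<?>[] { XAResource.class },
                (proxy, method, args) -> {
                    calls.add(method.getName());
                    Class<?> type = method.getReturnType();
                    if (type == int.class) return XAResource.XA_OK; // e.g. the prepare() vote
                    if (type == boolean.class) return false;
                    return null; // void methods and recover()
                });
    }

    // Plays the transaction manager's part for one resource and returns the recorded calls.
    static List<String> runOneTransaction() {
        List<String> calls = new ArrayList<>();
        XAResource resource = recordingResource(calls);
        Xid xid = new SimpleXid();
        try {
            resource.start(xid, XAResource.TMNOFLAGS); // triggered by Transaction.enlistResource()
            // ... the JMS receive or send would run here, inside the transaction branch ...
            resource.end(xid, XAResource.TMSUCCESS);   // triggered by Transaction.delistResource()
            if (resource.prepare(xid) == XAResource.XA_OK) {
                resource.commit(xid, false);           // second phase of two-phase commit
            }
        } catch (XAException e) {
            throw new IllegalStateException(e);
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(runOneTransaction()); // prints [start, end, prepare, commit]
    }
}
```

    If nothing ever calls start on the session's XAResource, the session has no transaction branch to work in, which appears to be what the "has not been enlisted" exception is reporting.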

    Jess

  3. #3

    I read http://opensource.atlassian.com/conf...A+++JOTM+usage and http://issues.apache.org/activemq/browse/AMQ-303, where I can see that PooledSpringXAConnectionFactory has now been integrated into Jencks.

    So, I tried to use PooledSpringXAConnectionFactory from Jencks:

    Code:
    <beans>
    
        <bean id="broker" class="org.apache.activemq.broker.BrokerService" init-method="start" destroy-method="stop">
            <property name="persistent" value="false"/>
            <property name="useJmx" value="false"/>
            <property name="transportConnectorURIs">
                <list>
                    <value>tcp://localhost:56789</value>
                </list>
            </property>
        </bean>
    
        <bean id="jotm" class="org.springframework.transaction.jta.JotmFactoryBean"/>
    	<bean id="jotmTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
    		<property name="userTransaction" ref="jotm" />
    	</bean>	
    		
    	<bean id="jmsFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
            <property name="brokerURL" value="tcp://localhost:56789" />
        </bean>
        <bean id="jmsConnectionFactory" class="org.jencks.pool.PooledSpringXAConnectionFactory">
            <property name="connectionFactory" ref="jmsFactory"/>
            <property name="jtaTransactionManager" ref="jotmTransactionManager"/>
        </bean>
        <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="messages.input"/>
        </bean>
    	
    	<bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    		<property name="concurrentConsumers" value="1"/>
    		<property name="maxMessagesPerTask" value="1"/>
    		<property name="connectionFactory" ref="jmsConnectionFactory"/>
    		<property name="destination" ref="destination"/>
    		<property name="messageListener" ref="myMessageProcessor"/>
    		<property name="transactionManager" ref="jotmTransactionManager"/>
    	</bean>
    	
    	<bean id="myMessageProcessor" class="messages.MyMessageProcessorSimple">
            <property name="jmsTemplate">
                <bean class="org.springframework.jms.core.JmsTemplate">
                    <property name="connectionFactory" ref="jmsFactory"/>
                </bean>
            </property>
        </bean>
    
    </beans>
    On running this context I got:

    Code:
    INFO  DefaultListableBeanFactory - Pre-instantiating singletons in factory [org.springframework.beans.factory.support.DefaultListableBeanFactory defining beans [broker,jotm,jotmTransactionManager,jmsFactory,jmsConnectionFactory,destination,listenerContainer,myMessageProcessor]; root of BeanFactory hierarchy]
    INFO  BrokerService - ActiveMQ null JMS Message Broker (localhost) is starting
    INFO  BrokerService - For help or more information please see: http://incubator.apache.org/activemq/
    INFO  TransportServerThreadSupport - Listening for connections at: tcp://oracle.ewsd.donpac.ru:56789
    INFO  TransportConnector - Connector tcp://oracle.ewsd.donpac.ru:56789 Started
    INFO  BrokerService - ActiveMQ JMS Message Broker (localhost, ID:oracle.ewsd.donpac.ru-21892-1155720095236-0:0) started
    INFO  jotm - JOTM started with a local transaction factory which is not bound.
    INFO  jotm - CAROL initialization
    INFO  ConfigurationRepository - No protocols were defined for property 'carol.protocols', trying with default protocol = 'jrmp'.
    INFO  jta - JOTM 2.0.10
    INFO  JtaTransactionManager - Using JTA UserTransaction: org.objectweb.jotm.Current@104faf8
    INFO  JtaTransactionManager - Using JTA TransactionManager: org.objectweb.jotm.Current@104faf8
    DEBUG PooledSpringXAConnection - -->> ENTERING PooledSpringXAConnection.createXASession()
    DEBUG PooledSpringXAConnection - -->> THERE IS NO ACTIVE TRANSACTION, SO JUST RETURNING BORROWED SESSION...
    DEBUG XASessionPool - ---->>>>> BORROWING JMS SESSION FROM POOL...
    DEBUG XASessionPool - ---->>>>> CREATING NEW SESSION TO SATISFY REQUEST!!
    DEBUG XASessionPool - ---->>>>> BORROWED SESSION: org.jencks.pool.PooledSpringXASession@1fe1feb
    
    
    ERROR DefaultMessageListenerContainer - Execution of JMS message listener failed
    javax.jms.JMSException: Session's XAResource has not been enlisted in a distributed transaction.
    	at org.apache.activemq.ActiveMQXASession.doStartTransaction(ActiveMQXASession.java:109)
    	at org.apache.activemq.ActiveMQMessageConsumer.ackLater(ActiveMQMessageConsumer.java:658)
    	at org.apache.activemq.ActiveMQMessageConsumer.beforeMessageIsConsumed(ActiveMQMessageConsumer.java:610)
    	at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:469)
    	at org.springframework.jms.listener.DefaultMessageListenerContainer.doExecuteListener(DefaultMessageListenerContainer.java:301)
    	at org.springframework.jms.listener.DefaultMessageListenerContainer$1.doInTransactionWithoutResult(DefaultMessageListenerContainer.java:278)
    	at org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:33)
    	at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:128)
    	at org.springframework.jms.listener.DefaultMessageListenerContainer.executeListener(DefaultMessageListenerContainer.java:275)
    	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:375)
    	at org.springframework.core.task.SimpleAsyncTaskExecutor$ConcurrencyThrottlingRunnable.run(SimpleAsyncTaskExecutor.java:203)
    	at java.lang.Thread.run(Thread.java:595)
    What is wrong now?

  4. #4

    The problem is in the Jencks code: PooledSpringXAConnectionFactory does not work in a multithreaded environment. I tried to run this simple context:

    Code:
    <beans>
    
        <bean id="broker" class="org.apache.activemq.broker.BrokerService" init-method="start" destroy-method="stop">
            <property name="persistent" value="false"/>
            <property name="transportConnectorURIs">
                <list>
                    <value>tcp://localhost:5000</value>
                </list>
            </property>
        </bean>
        
        <bean id="jotm" class="org.springframework.transaction.jta.JotmFactoryBean"/>
        <bean id="jotmTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
            <property name="userTransaction" ref="jotm"/>
        </bean>
        
        <bean id="connectionFactory" class="org.jencks.pool.PooledSpringXAConnectionFactory">
            <property name="connectionFactory">
                <bean class="org.apache.activemq.ActiveMQXAConnectionFactory">
                    <property name="brokerURL" value="tcp://localhost:5000" />
                </bean>
            </property>
            <property name="transactionManager" ref="jotm"/>
        </bean>
        
        <bean id="messageReceiver" class="org.springframework.transaction.interceptor.TransactionProxyFactoryBean">
            <property name="transactionManager" ref="jotmTransactionManager"/>
            <property name="transactionAttributes">
                <props>
                    <prop key="*">PROPAGATION_REQUIRED</prop>
                </props>
            </property>
            <property name="target">
                <bean class="simple.MessageReceiver">
                    <property name="connectionFactory" ref="connectionFactory"/>
                </bean>
            </property>
            <property name="proxyTargetClass" value="true"/>
        </bean>
        
    </beans>
    with this code:

    Code:
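    package simple;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    
    public class MessageReceiver {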
    
        private Log log = LogFactory.getLog(getClass());
        
        private ConnectionFactory connectionFactory;
    
        public void setConnectionFactory(ConnectionFactory connectionFactory) {
            this.connectionFactory = connectionFactory;
        }
    
        public void receive() {
            Thread readerThread = new Thread(new Runnable(){
                public void run() {
                    try {
                        Connection connection = connectionFactory.createConnection();
                        connection.start();
                        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                        Destination destination = session.createQueue("messages.input");
                        MessageConsumer consumer = session.createConsumer(destination);
                        log.debug("receive message ...");
                        while (true) {
                            Message message = consumer.receive(Long.MAX_VALUE);
                            log.debug("received message : "+message);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }               
                }           
            });
            readerThread.start();
        }
        
    }
    and got:

    Code:
    INFO  BrokerService - ActiveMQ null JMS Message Broker (localhost) is starting
    INFO  BrokerService - For help or more information please see: http://incubator.apache.org/activemq/
    INFO  TransportServerThreadSupport - Listening for connections at: tcp://prokopiev.stc.donpac.ru:5000
    INFO  TransportConnector - Connector tcp://prokopiev.stc.donpac.ru:5000 Started
    INFO  BrokerService - ActiveMQ JMS Message Broker (localhost, ID:prokopiev.stc.donpac.ru-38166-1155892764301-0:0) started
    INFO  jotm - JOTM started with a local transaction factory which is not bound.
    INFO  jotm - CAROL initialization
    INFO  ConfigurationRepository - No protocols were defined for property 'carol.protocols', trying with default protocol = 'jrmp'.
    INFO  jta - JOTM 2.0.10
    INFO  JtaTransactionManager - Using JTA UserTransaction: org.objectweb.jotm.Current@19836ed
    INFO  JtaTransactionManager - Using JTA TransactionManager: org.objectweb.jotm.Current@19836ed
    INFO  ManagementContext - JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
    INFO  DefaultAopProxyFactory - CGLIB2 available: proxyTargetClass feature enabled
    DEBUG MessageReceiver - receive message ...
    DEBUG PooledSpringXAConnection - -->> ENTERING PooledSpringXAConnection.createXASession()
    DEBUG PooledSpringXAConnection - -->> THERE IS NO ACTIVE TRANSACTION, SO JUST RETURNING BORROWED SESSION...
    DEBUG XASessionPool - ---->>>>> BORROWING JMS SESSION FROM POOL...
    DEBUG XASessionPool - ---->>>>> CREATING NEW SESSION TO SATISFY REQUEST!!
    DEBUG XASessionPool - ---->>>>> BORROWED SESSION: org.jencks.pool.PooledSpringXASession@4c47db
    javax.jms.JMSException: Session's XAResource has not been enlisted in a distributed transaction.
        at org.apache.activemq.ActiveMQXASession.doStartTransaction(ActiveMQXASession.java:109)
        at org.apache.activemq.ActiveMQMessageConsumer.ackLater(ActiveMQMessageConsumer.java:658)
        at org.apache.activemq.ActiveMQMessageConsumer.beforeMessageIsConsumed(ActiveMQMessageConsumer.java:610)
        at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:469)
        at simple.MessageReceiver$1.run(MessageReceiver.java:35)
        at java.lang.Thread.run(Thread.java:595)
    After moving the JMS code out of the separate thread, I got:

    Code:
    INFO  BrokerService - ActiveMQ null JMS Message Broker (localhost) is starting
    INFO  BrokerService - For help or more information please see: http://incubator.apache.org/activemq/
    INFO  TransportServerThreadSupport - Listening for connections at: tcp://prokopiev.stc.donpac.ru:5000
    INFO  TransportConnector - Connector tcp://prokopiev.stc.donpac.ru:5000 Started
    INFO  BrokerService - ActiveMQ JMS Message Broker (localhost, ID:prokopiev.stc.donpac.ru-38153-1155892708903-0:0) started
    INFO  jotm - JOTM started with a local transaction factory which is not bound.
    INFO  jotm - CAROL initialization
    INFO  ConfigurationRepository - No protocols were defined for property 'carol.protocols', trying with default protocol = 'jrmp'.
    INFO  jta - JOTM 2.0.10
    INFO  JtaTransactionManager - Using JTA UserTransaction: org.objectweb.jotm.Current@949f69
    INFO  JtaTransactionManager - Using JTA TransactionManager: org.objectweb.jotm.Current@949f69
    INFO  ManagementContext - JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
    INFO  DefaultAopProxyFactory - CGLIB2 available: proxyTargetClass feature enabled
    DEBUG MessageReceiver - receive message ...
    DEBUG PooledSpringXAConnection - -->> ENTERING PooledSpringXAConnection.createXASession()
    DEBUG PooledSpringXAConnection - -->> ACTUAL TRANSACTION IS ACTIVE!
    DEBUG PooledSpringXAConnection - -->> NO ACTIVE SESSION ASSOCIATED WITH CURRENT THREAD, BORROWING...
    DEBUG XASessionPool - ---->>>>> BORROWING JMS SESSION FROM POOL...
    DEBUG XASessionPool - ---->>>>> CREATING NEW SESSION TO SATISFY REQUEST!!
    DEBUG XASessionPool - ---->>>>> BORROWED SESSION: org.jencks.pool.PooledSpringXASession@1402d5a
    DEBUG PooledSpringXAConnection - -->> ENLISTING NEW SESSION'S XAResource WITH TRANSACTION...
    DEBUG PooledSpringXAConnection - -->> BINDING NEW SESSION WITH TRANSACTION...
    DEBUG PooledSpringXAConnection - -->> REGISTERING SYNCHRONIZATION WITH TRANSACTION...
    DEBUG MessageReceiver - received message : ActiveMQObjectMessage {commandId = 5, responseRequired = true, messageId = ID:prokopiev.stc.donpac.ru-38161-1155892719980-0:0:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:prokopiev.stc.donpac.ru-38161-1155892719980-0:0:1:1, destination = queue://messages.input, transactionId = null, expiration = 0, timestamp = 1155892720450, arrival = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activeio.packet.ByteSequence@b40ec4, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true}
    So, in a single thread everything works fine.
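
    The difference between the two runs fits the way JTA associates a transaction with the thread that began it: the proxy starts the transaction on the thread that calls receive(), and a thread created inside receive() has no transaction of its own. A minimal sketch of that effect, using a plain ThreadLocal as a stand-in for the transaction manager's per-thread association (ThreadBoundTxDemo and its methods are hypothetical, not JOTM or Spring API):

```java
import java.util.concurrent.atomic.AtomicReference;

// Toy stand-in for a transaction manager that keeps the "current transaction"
// in a per-thread slot. All names here are hypothetical.
class ThreadBoundTxDemo {

    private static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    static void begin(String txId) { CURRENT_TX.set(txId); }
    static void commit() { CURRENT_TX.remove(); }
    static String currentTx() { return CURRENT_TX.get(); }

    // Begins a transaction on the calling thread, then reports what the caller
    // and a freshly started worker thread each see as the current transaction.
    static String[] observe() {
        begin("tx-1");
        try {
            String seenByCaller = currentTx();
            AtomicReference<String> seenByWorker = new AtomicReference<>("unset");
            Thread worker = new Thread(() -> seenByWorker.set(currentTx()));
            worker.start();
            worker.join();
            return new String[] { seenByCaller, seenByWorker.get() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            commit();
        }
    }

    public static void main(String[] args) {
        String[] seen = observe();
        System.out.println("caller sees: " + seen[0]); // caller sees: tx-1
        System.out.println("worker sees: " + seen[1]); // worker sees: null
    }
}
```

    This matches the two logs: "THERE IS NO ACTIVE TRANSACTION" when the session is created on the reader thread, and "ACTUAL TRANSACTION IS ACTIVE!" when it is created on the calling thread.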

    Can I enlist my XAResource (the JMS session) in the distributed transaction by hand? Can anybody give me a simple example of this?

  5. #5

    This problem can be resolved by writing another wrapper to replace org.jencks.pool.PooledSpringXAConnectionFactory, or by modifying it to support using the XA connection and the XA session in different threads.

    An example of such a wrapper can be found at http://www.nabble.com/XA-connection-...tf2139695.html
    but the ActiveMQ developers say that this approach is wrong: enlistment is the responsibility of the container, be it Jencks, an MDB container or Spring. So the DefaultMessageListenerContainer implementation must be updated to support enlisting XA resources.

    What does anybody think about this?

  6. #6

    My latest code looks like:

    context.xml:

    Code:
    <beans>
    
    	<bean id="broker" class="org.apache.activemq.broker.BrokerService" init-method="start" destroy-method="stop">
    		<property name="persistent" value="false"/>
    		<property name="transportConnectorURIs">
    			<list>
    				<value>tcp://localhost:5000</value>
    			</list>
    		</property>
    	</bean>
    	
    	<bean id="connectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
    		<property name="brokerURL" value="tcp://localhost:5000" />
    	</bean>
    	
    	<bean id="jotm" class="org.springframework.transaction.jta.JotmFactoryBean"/>
    	<bean id="jotmTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
    		<property name="userTransaction" ref="jotm"/>
    	</bean>
    	
    	<bean id="messageReceiver" class="simple.TransactionalMessageReceiver">
    		<property name="connectionFactory" ref="connectionFactory"/>
    		<property name="transactionManager" ref="jotmTransactionManager"/>
    	</bean>
    	
    </beans>
    TransactionalMessageReceiver.java

    Code:
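    package simple;
    
    import javax.jms.Destination;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.XAConnection;
    import javax.jms.XAConnectionFactory;
    import javax.jms.XASession;
    import javax.transaction.xa.XAResource;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.springframework.transaction.jta.JtaTransactionManager;
    
    public class TransactionalMessageReceiver {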
    
    	private Log log = LogFactory.getLog(getClass());
    	
    	private XAConnectionFactory connectionFactory;
    	private JtaTransactionManager transactionManager;
    
    	public void setConnectionFactory(XAConnectionFactory connectionFactory) {
    		this.connectionFactory = connectionFactory;
    	}
    
    	public void setTransactionManager(JtaTransactionManager transactionManager) {
    		this.transactionManager = transactionManager;
    	}
    
    	public void receive() throws Exception {
    		final XAConnection connection = connectionFactory.createXAConnection();
    		connection.start();
    		final XASession session = connection.createXASession();
    		final Destination destination = session.createQueue("messages.input");
    		final MessageConsumer consumer = session.createConsumer(destination);
    		Thread readerThread = new Thread(new Runnable(){
    			public void run() {
    				while(!Thread.currentThread().isInterrupted()) {
    					try {
    						transactionManager.getUserTransaction().begin();
    						transactionManager.getTransactionManager().getTransaction().enlistResource(session.getXAResource());
    						Message message = consumer.receive(Long.MAX_VALUE);
    						log.debug("received message : "+message);
    						transactionManager.getTransactionManager().getTransaction().delistResource(session.getXAResource(), XAResource.TMSUCCESS);
    						transactionManager.getUserTransaction().commit();
    					} catch (Exception e) {
    						e.printStackTrace();
    						// Roll back so a failed transaction does not stay bound to this thread.
    						try {
    							transactionManager.getUserTransaction().rollback();
    						} catch (Exception rollbackFailure) {
    							rollbackFailure.printStackTrace();
    						}
    					}
    					
    				}
    			}			
    		});
    		readerThread.start();        
    	}
    	
    }
    Is this approach right? There is no way to use DefaultMessageListenerContainer or even JmsTemplate, am I right?

    Is it possible to integrate other operations into the receive() method, for example with Hibernate? How can I create and save a Hibernate session, having only org.springframework.transaction.jta.JtaTransactionManager and org.springframework.orm.hibernate3.LocalSessionFactoryBean?

  7. #7

    A more convenient workaround is to use TransactionTemplate:

    context.xml:

    Code:
    <beans>
    
        <bean id="broker" class="org.apache.activemq.broker.BrokerService" init-method="start" destroy-method="stop">
            <property name="persistent" value="false"/>
            <property name="transportConnectorURIs">
                <list>
                    <value>tcp://localhost:5000</value>
                </list>
            </property>
        </bean>
        
        <bean id="jotm" class="org.springframework.transaction.jta.JotmFactoryBean"/>
        <bean id="jotmTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
            <property name="userTransaction" ref="jotm"/>
        </bean>
        <bean id="jotmTransactionTemplate" class="org.springframework.transaction.support.TransactionTemplate">
            <property name="transactionManager" ref="jotmTransactionManager"/>
        </bean>
        
        <bean id="jmsConnectionFactory" class="org.jencks.pool.PooledSpringXAConnectionFactory">
            <property name="connectionFactory">
                <bean class="org.apache.activemq.ActiveMQXAConnectionFactory">
                    <property name="brokerURL" value="tcp://localhost:5000" />
                </bean>
            </property>
            <property name="transactionManager" ref="jotm"/>
        </bean>
        
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="jmsConnectionFactory"/>
            <property name="defaultDestinationName" value="messages.input"/>
        </bean>
        
        <bean id="messageReceiver" class="simple.MessageReceiver" init-method="receive">
            <property name="transactionTemplate" ref="jotmTransactionTemplate"/>
            <property name="jmsTemplate" ref="jmsTemplate"/>
        </bean>
        
    </beans>
    MessageReceiver.java:

    Code:
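    package simple;
    
    import javax.jms.Message;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.transaction.TransactionStatus;
    import org.springframework.transaction.support.TransactionCallbackWithoutResult;
    import org.springframework.transaction.support.TransactionTemplate;
    
    public class MessageReceiver {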
    
        private Log log = LogFactory.getLog(getClass());
        
        private JmsTemplate jmsTemplate;
        private TransactionTemplate transactionTemplate;
    
        public void setJmsTemplate(JmsTemplate jmsTemplate) {
            this.jmsTemplate = jmsTemplate;
        }
        
        public void setTransactionTemplate(TransactionTemplate transactionTemplate) {
            this.transactionTemplate = transactionTemplate;
        }
    
        public void receive() {
            Thread readerThread = new Thread(new Runnable(){
                public void run() {
                    while(!Thread.currentThread().isInterrupted()) {
                        transactionTemplate.execute(
                            new TransactionCallbackWithoutResult() {
                                public void doInTransactionWithoutResult(TransactionStatus status) {
                                    Message message = jmsTemplate.receive();
                                    log.debug(message);
                                }
                            }
                        );   
                    }
                }           
            });
            readerThread.start();
        }
        
    }
    After starting it and sending a message to queue://messages.input I see:

    Code:
    INFO  BrokerService - ActiveMQ null JMS Message Broker (localhost) is starting
    INFO  BrokerService - For help or more information please see: http://incubator.apache.org/activemq/
    INFO  TransportServerThreadSupport - Listening for connections at: tcp://prokopiev.stc.donpac.ru:5000
    INFO  TransportConnector - Connector tcp://prokopiev.stc.donpac.ru:5000 Started
    INFO  BrokerService - ActiveMQ JMS Message Broker (localhost, ID:prokopiev.stc.donpac.ru-42291-1156317141577-0:0) started
    INFO  jotm - JOTM started with a local transaction factory which is not bound.
    INFO  jotm - CAROL initialization
    INFO  ConfigurationRepository - No protocols were defined for property 'carol.protocols', trying with default protocol = 'jrmp'.
    INFO  jta - JOTM 2.0.10
    INFO  JtaTransactionManager - Using JTA UserTransaction: org.objectweb.jotm.Current@d9660d
    INFO  JtaTransactionManager - Using JTA TransactionManager: org.objectweb.jotm.Current@d9660d
    INFO  ManagementContext - JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
    DEBUG PooledSpringXAConnection - -->> ENTERING PooledSpringXAConnection.createXASession()
    DEBUG PooledSpringXAConnection - -->> ACTUAL TRANSACTION IS ACTIVE!
    DEBUG PooledSpringXAConnection - -->> NO ACTIVE SESSION ASSOCIATED WITH CURRENT THREAD, BORROWING...
    DEBUG XASessionPool - ---->>>>> BORROWING JMS SESSION FROM POOL...
    DEBUG XASessionPool - ---->>>>> CREATING NEW SESSION TO SATISFY REQUEST!!
    DEBUG XASessionPool - ---->>>>> BORROWED SESSION: org.jencks.pool.PooledSpringXASession@b60b93
    DEBUG PooledSpringXAConnection - -->> ENLISTING NEW SESSION'S XAResource WITH TRANSACTION...
    DEBUG PooledSpringXAConnection - -->> BINDING NEW SESSION WITH TRANSACTION...
    DEBUG PooledSpringXAConnection - -->> REGISTERING SYNCHRONIZATION WITH TRANSACTION...
    
    DEBUG PooledSpringXASession - ---->>>>> PooledSpringXASession.close() called
    DEBUG PooledSpringXASession - ---->>>>> ignoreClose IS TRUE!  KEEPING SESSION OPEN!
    DEBUG MessageReceiver - ActiveMQObjectMessage {commandId = 5, responseRequired = true, messageId = ID:prokopiev.stc.donpac.ru-42299-1156317149577-0:0:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:prokopiev.stc.donpac.ru-42299-1156317149577-0:0:1:1, destination = queue://messages.input, transactionId = null, expiration = 0, timestamp = 1156317150047, arrival = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activeio.packet.ByteSequence@14835fb, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true}
    DEBUG PooledSpringXAConnection - -->> PooledSpringXAConnection.[synchronization].afterCompletion() CALLED...
    DEBUG PooledSpringXAConnection - -->> RETURNING JMS SESSION TO POOL...
    DEBUG PooledSpringXASession - ---->>>>> PooledSpringXASession.close() called
    DEBUG PooledSpringXASession - ---->>>>> ignoreClose = false, so returning session pool...
    DEBUG XASessionPool - ---->>>>> SESSION HAS BEEN RETURNED TO POOL: org.jencks.pool.PooledSpringXASession@b60b93
    DEBUG PooledSpringXAConnection - -->> ENTERING PooledSpringXAConnection.createXASession()
    DEBUG PooledSpringXAConnection - -->> ACTUAL TRANSACTION IS ACTIVE!
    DEBUG PooledSpringXAConnection - -->> NO ACTIVE SESSION ASSOCIATED WITH CURRENT THREAD, BORROWING...
    DEBUG XASessionPool - ---->>>>> BORROWING JMS SESSION FROM POOL...
    DEBUG XASessionPool - ---->>>>> BORROWED SESSION: org.jencks.pool.PooledSpringXASession@b60b93
    DEBUG PooledSpringXAConnection - -->> ENLISTING NEW SESSION'S XAResource WITH TRANSACTION...
    DEBUG PooledSpringXAConnection - -->> BINDING NEW SESSION WITH TRANSACTION...
    DEBUG PooledSpringXAConnection - -->> REGISTERING SYNCHRONIZATION WITH TRANSACTION...
    
    INFO  jotm - set rollback only (tx=bb14:38:0:0148841f24dbed9394...eebc02:)
    I am confused by the last message, but the transaction is committed: queue://messages.input is empty after the message has been read from it.

  8. #8

    It would be more useful to use a message-driven POJO, so I filed a bug at http://opensource.atlassian.com/proj...rowse/SPR-2461

  9. #9
    Join Date
    Oct 2006
    Posts
    21

    Very similar to what you have.

    The Spring transaction demarcation will start the transaction for you.
    Attached Files
