Thread: Spring integration flow and XA transactions

  1. #11
    Join Date
    Jan 2009
    Location
    Ukraine, Kharkov
    Posts
    637

    Hello,

    I don't see any place in your code where you start a transaction.
    How do you place the Message into the "in" channel? That should be the point where the transaction starts.
    The transaction will then end when the Message is sent to the "out" channel, or be rolled back as you do in the transformer method.

    Please provide more information about your code.

    Cheers,
    Artem Bilan

  2. #12
    Join Date
    Feb 2010
    Posts
    22

    That is exactly my question. Who starts the transaction if my flow doesn't have any custom code?

  3. #13
    Join Date
    Jan 2009
    Location
    Ukraine, Kharkov
    Posts
    637

    Who starts the transaction if my flow doesn't have any custom code?
    Hmm.
    Sorry, I'll ask again: how do you send the Message into the "in" channel, or who does it?

    There are several ways to start the transaction:
    1. from a <poller> with a <transactional> sub-element on a queue channel
    2. from a @Transactional @Gateway method
    3. from a transactional <int-jms:message-driven-channel-adapter>, as Mark said above
    4. from a transactional POJO service that sends via a @Gateway
    5. before Spring Integration 2.2 I used this trick:
    HTML Code:
    <tx:advice id="messageProcessorRequiredNewTxAdvice">
    		<tx:attributes>
    			<tx:method name="processMessage" propagation="REQUIRES_NEW"/>
    		</tx:attributes>
    	</tx:advice>
    
    	<bean id="mergeEntityMessageProcessor" class="org.springframework.aop.framework.ProxyFactoryBean">
    		<property name="target">
    			<bean class="org.springframework.integration.handler.MethodInvokingMessageProcessor"
    				  c:targetObject-ref="entityManager" c:methodName="merge"/>
    		</property>
    		<property name="interceptorNames" value="messageProcessorRequiredNewTxAdvice"/>
    	</bean>
    Since 2.2 we can use the <transactional> sub-element on the new JPA adapters.
    Unfortunately, that doesn't apply to your case.
    So please show the upstream message flow: the point where the Message is placed into the "in" channel.
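
    As a minimal sketch of option 1 above: the poller thread starts the transaction before it receives the Message from the queue channel, and whatever runs downstream on direct channels stays inside that transaction. The bean names messageHandler and transactionManager are hypothetical, not taken from your config.

    ```xml
    <int:channel id="in">
        <int:queue capacity="10"/>
    </int:channel>

    <!-- messageHandler and transactionManager are hypothetical bean names -->
    <int:service-activator input-channel="in" output-channel="out"
                           ref="messageHandler" method="handle">
        <int:poller fixed-delay="1000">
            <!-- the transaction begins on the poller thread, before the receive -->
            <int:transactional transaction-manager="transactionManager"/>
        </int:poller>
    </int:service-activator>
    ```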
    Last edited by Cleric; Aug 1st, 2012 at 06:50 AM.

  4. #14
    Join Date
    Feb 2010
    Posts
    22

    I get messages from an int-jms:message-driven-channel-adapter. That is the point where I have to start my XA transaction.

  5. #15
    Join Date
    Jan 2009
    Location
    Ukraine, Kharkov
    Posts
    637

    That is the point where I have to start my XA transaction
    Please show your config, and keep in mind that DataSourceTransactionManager isn't an XA manager. Please read its JavaDoc.
    You should use JtaTransactionManager here, but it needs a JTA provider behind it: typically an application server container (JBoss, WebSphere, WebLogic, etc.) or a standalone JTA implementation.
    Or read more of Dave's article ;-)
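
    To make the difference concrete, here is a rough sketch of the two bean definitions side by side. The ids and the dataSource reference are placeholders, not taken from your config.

    ```xml
    <!-- Local transactions for a single JDBC DataSource only; it cannot coordinate JMS and a database -->
    <bean id="localTransactionManager"
          class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource"/>
    </bean>

    <!-- Delegates to a JTA provider; with no properties set it looks up the
         UserTransaction from JNDI (java:comp/UserTransaction) -->
    <bean id="jtaTransactionManager"
          class="org.springframework.transaction.jta.JtaTransactionManager"/>
    ```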

  6. #16
    Join Date
    Feb 2010
    Posts
    22

    I am using Atomikos with JtaTransactionManager.
    Here is my full config.

    I am not sure whether the JMS transaction and the DB transactions run under one XA transaction.

    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:int="http://www.springframework.org/schema/integration"
           xmlns:jdbc="http://www.springframework.org/schema/jdbc"
           xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
           xmlns:tx="http://www.springframework.org/schema/tx"
           xmlns:p="http://www.springframework.org/schema/p"
           xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    		http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
    		http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
    		http://www.springframework.org/schema/integration/jdbc http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd
    		http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
    		http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd">
    
    
        <int-jms:message-driven-channel-adapter id="autoRollLocatesJmsIn"
                                                channel="in"
                                                container="xaQueueListener"
                                                acknowledge="transacted"/>
        <bean id="xaQueueListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer"
              p:connectionFactory-ref="connectionFactory" p:destination-ref="destination"
              p:concurrentConsumers="5" p:maxConcurrentConsumers="10" p:transactionManager-ref="jtaTransactionManager"/>
    
        <bean id="jmsConnection" class="com.tibco.tibjms.TibjmsQueueConnectionFactory"
              p:serverUrl="tcp://localhost:7222" p:userName="admin"
              p:userPassword=""/>
    
    
        <bean id="xaJmsConnection" class="com.tibco.tibjms.TibjmsXAConnectionFactory"
              p:serverUrl="tcp://localhost:7222" p:userName="admin"
              p:userPassword=""/>
    
    
        <bean id="destination" class="com.tibco.tibjms.TibjmsQueue">
            <constructor-arg value="myQueue"/>
        </bean>
    
        <bean id="resultQueue" class="com.tibco.tibjms.TibjmsQueue">
            <constructor-arg value="resultQueue"/>
        </bean>
    
        <bean id="atomicosConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init">
            <property name="uniqueResourceName" value="tibcoEMSConnectionFactory"/>
            <property name="xaConnectionFactory" ref="xaJmsConnection"/>
            <property name="poolSize" value="5"/>
    
        </bean>
    
        <bean id="connectionFactory"
              class="org.springframework.jms.connection.SingleConnectionFactory">
            <property name="targetConnectionFactory" ref="atomicosConnectionFactory"/>
        </bean>
    
    
        <bean id="jtaTransactionManager"
              class="org.springframework.transaction.jta.JtaTransactionManager" depends-on="userTransactionService">
            <property name="transactionManager" ref="atomikosTransactionManager"/>
            <property name="userTransaction" ref="atomikosUserTransaction"/>
        </bean>
    
        <bean id="atomikosTransactionManager"
              class="com.atomikos.icatch.jta.UserTransactionManager"
              init-method="init" destroy-method="close" depends-on="userTransactionService">
            <property name="forceShutdown" value="false"/>
    
        </bean>
    
        <bean id="atomikosUserTransaction"
              class="com.atomikos.icatch.jta.UserTransactionImp" depends-on="userTransactionService">
            <property name="transactionTimeout" value="300"/>
    
        </bean>
    
    
        <bean id="userTransactionService" class="com.atomikos.icatch.config.UserTransactionServiceImp"
              init-method="init" destroy-method="shutdownForce">
            <constructor-arg>
                <!-- IMPORTANT: specify all Atomikos properties here -->
                <props>
                    <prop key="com.atomikos.icatch.service">com.atomikos.icatch.standalone.UserTransactionServiceFactory
                    </prop>
                </props>
            </constructor-arg>
        </bean>
    
    
        <tx:annotation-driven transaction-manager="jtaTransactionManager"/>
    
    
        <bean id="jdbcDataSource1" class="org.h2.jdbcx.JdbcDataSource">
            <property name="URL" value="jdbc:h2:mem:xa_test1"/>
            <property name="user" value="sa"/>
            <property name="password" value=""/>
        </bean>
    
        <bean id="dataSource1" class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init"
              destroy-method="close">
            <property name="xaDataSource" ref="jdbcDataSource1"/>
            <property name="uniqueResourceName" value="database1"/>
        </bean>
    
        <bean id="jdbcDataSource2" class="org.h2.jdbcx.JdbcDataSource">
            <property name="URL" value="jdbc:h2:mem:xa_test2"/>
            <property name="user" value="sa"/>
            <property name="password" value=""/>
        </bean>
    
        <bean id="dataSource2" class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init"
              destroy-method="close">
            <property name="xaDataSource" ref="jdbcDataSource2"/>
            <property name="uniqueResourceName" value="database2"/>
        </bean>
    
    
        <int:channel id="in">
            <int:queue capacity="10"/>
        </int:channel>
    
        <int:channel id="transformChannel"/>
    
        <int:channel id="internalChannel"/>
    
        <int:channel id="internalChannel2">
            <int:queue capacity="10"/>
        </int:channel>
    
        <int:channel id="out">
            <int:queue capacity="10"/>
        </int:channel>
    
        <int-jdbc:outbound-gateway
                update="insert into message (id, message) values (1,'Hello')"
                request-channel="in" reply-channel="internalChannel" data-source="dataSource1"/>
    
        <int-jdbc:outbound-gateway
                update="insert into message (id, message) values (1,'Hello')"
                request-channel="internalChannel" reply-channel="transformChannel" data-source="dataSource2"/>
    
        <int:transformer input-channel="transformChannel" output-channel="out" ref="myTransformer"/>
    
        <bean id="myTransformer" class="xa.MyTransformer">
        </bean>
    
    
        <int:poller id="transactionalPoller" default="true" max-messages-per-poll="1" fixed-rate="10">
            <int:transactional transaction-manager="jtaTransactionManager"
                               isolation="DEFAULT"
                               propagation="REQUIRED"
                               read-only="true"
                               timeout="1000"/>
    
        </int:poller>
    
    
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="jmsConnection"/>
    
            <property name="sessionTransacted" value="true"/>
    
        </bean>
    
    
    </beans>

    I also get this log message:
    Code:
    14:17:13.101 [xaQueueListener-4] INFO  c.a.jms.AtomikosJmsNonXaSessionProxy - atomikos non-xa session proxy for vendor instance com.tibco.tibjms.TibjmsxSessionImp@1549ceb: WARNING - detected JTA transaction context while using non-transactional session.
    Beware that any JMS operations you perform are NOT part of the JTA transaction.
    To enable JTA, make sure to do all of the following:
    1. Make sure that the AtomikosConnectionFactoryBean is configured with localTransactionMode=false, and
    2. Make sure to call create JMS sessions with the transacted flag set to true.

    If I switch the int-jms:message-driven-channel-adapter to

    Code:
        <int-jms:message-driven-channel-adapter id="autoRollLocatesJmsIn"
                                                channel="in"
                                                destination="destination"
                                                connection-factory="connectionFactory"
                                                acknowledge="transacted"
                                                transaction-manager="jtaTransactionManager"/>
    I get this log message:

    Code:
    14:23:32.054 [org.springframework.jms.listener.DefaultMessageListenerContainer#0-1] INFO  c.a.jms.AtomikosJmsXaSessionProxy - atomikos xa session proxy for resource XAactiveMQ: Calling commit/rollback is not allowed on a managed session!
    Is this an issue with how I use Atomikos, or is it a mistake in my configuration?

  7. #17
    Join Date
    Jan 2009
    Location
    Ukraine, Kharkov
    Posts
    637

    detected JTA transaction context while using non-transactional session
    Just follow the advice in that message and set sessionTransacted on the listener container:
    HTML Code:
        <bean id="xaQueueListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer"
              p:connectionFactory-ref="connectionFactory" p:destination-ref="destination"
              p:concurrentConsumers="5" p:maxConcurrentConsumers="10" p:transactionManager-ref="jtaTransactionManager"
              p:sessionTransacted="true"/> <!-- HERE: the added attribute -->
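
    The log message lists two steps, and sessionTransacted only covers the second one. As a sketch of the first step, assuming the Atomikos bean from your config stays otherwise unchanged, localTransactionMode can be set explicitly:

    ```xml
    <bean id="atomicosConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init">
        <property name="uniqueResourceName" value="tibcoEMSConnectionFactory"/>
        <property name="xaConnectionFactory" ref="xaJmsConnection"/>
        <property name="poolSize" value="5"/>
        <!-- step 1 from the log message: keep the factory out of local transaction mode -->
        <property name="localTransactionMode" value="false"/>
    </bean>
    ```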
    <bean id="jdbcDataSource1" class="org.h2.jdbcx.JdbcDataSource">
    H2 doesn't fully support XA transactions: http://g.zeos.in/?q=h2%20xa%20support ;-)

  8. #18
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,840

    Is there a reason you are defining an explicit container bean? You *should* be able to specify everything you need on the "message-driven-channel-adapter" element, including the "transaction-manager" attribute.
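
    Something along these lines might work as a starting point. It is only a sketch that moves the settings from the xaQueueListener bean onto the adapter element, reusing the bean names from the posted config, and it has not been tested against that setup.

    ```xml
    <int-jms:message-driven-channel-adapter id="autoRollLocatesJmsIn"
            channel="in"
            connection-factory="connectionFactory"
            destination="destination"
            transaction-manager="jtaTransactionManager"
            acknowledge="transacted"
            concurrent-consumers="5"
            max-concurrent-consumers="10"/>
    ```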

  9. #19
    Join Date
    Feb 2010
    Posts
    22

    To Cleric:

    H2 supports XA well enough; my test shows it. My problem is with the JMS transaction in the message-driven-channel-adapter. Is that transaction included in my global XA transaction? If yes, why do I get messages like
    atomikos xa session proxy for resource XAactiveMQ: Calling commit/rollback is not allowed on a managed session!
    and
    14:17:13.101 [xaQueueListener-4] INFO c.a.jms.AtomikosJmsNonXaSessionProxy - atomikos non-xa session proxy for vendor instance com.tibco.tibjms.TibjmsxSessionImp@1549ceb: WARNING - detected JTA transaction context while using non-transactional session.
    Beware that any JMS operations you perform are NOT part of the JTA transaction.
    To enable JTA, make sure to do all of the following:
    1. Make sure that the AtomikosConnectionFactoryBean is configured with localTransactionMode=false, and
    2. Make sure to call create JMS sessions with the transacted flag set to true.
    To Mark Fisher:
    I used this approach because my real application has to use a messageSelector, which is set on the container.

  10. #20
    Join Date
    Feb 2010
    Posts
    22

    One more thing:

    I can't start a transaction without a poller. Is that the correct behaviour, or is it a mistake in my configuration?
