Thread: Configure two different datasources ( one for reading and another for writing )

  1. #1
    Join Date
    Jan 2012
    Posts
    4

    Question Configure two different datasources ( one for reading and another for writing )

    Hi,

    I am very new to Spring Batch. I am trying to set up an ETL process in which I read data from one database, apply some transformations to it, and then write it to another database.

    So basically I should be able to configure two datasources along with their respective transaction management beans.

    Is this possible?


    Thanks in Advance,
    Vamsi Annem
    Last edited by vamsiannem; Jan 30th, 2012 at 08:49 AM. Reason: Adding thanks note

  2. #2
    Join Date
    Jan 2012
    Posts
    4

    Just to be very clear, I have an example below:

    I have two different datasources, one to read from and another to write results to. The job configuration is as follows:

    <job id="sampleJob" job-repository="jobRepository">
        <step id="step1">
            <!-- transaction-manager is an attribute of <tasklet>, not <step> -->
            <tasklet transaction-manager="myTransactionManager">
                <chunk reader="itemReader" processor="itemProcessor" writer="itemWriter" commit-interval="10"/>
            </tasklet>
        </step>
    </job>

    ItemReader should get data from dataSource_1.
    ItemWriter should write data to dataSource_2.

    As per the documentation, we can configure only a single transaction manager at the tasklet level.


    In this scenario, how do I use the transaction manager?


    Thanks,
    Vamsi.
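
    One arrangement that may fit this scenario without XA, sketched under assumptions rather than taken from the thread: the step is given a plain DataSourceTransactionManager bound to dataSource_2, so each chunk's writes commit or roll back together, while the reader uses its own connection to dataSource_1 and its reads are not part of that transaction. The bean id writeTransactionManager is hypothetical, and the beans: prefix assumes the batch namespace is the default one in the file.

    ```xml
    <!-- Hypothetical bean id; transactions cover only the database being written to -->
    <beans:bean id="writeTransactionManager"
                class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <beans:property name="dataSource" ref="dataSource_2" />
    </beans:bean>

    <job id="sampleJob" job-repository="jobRepository">
        <step id="step1">
            <!-- Each chunk of 10 items is written and committed against dataSource_2 -->
            <tasklet transaction-manager="writeTransactionManager">
                <chunk reader="itemReader" processor="itemProcessor" writer="itemWriter" commit-interval="10" />
            </tasklet>
        </step>
    </job>
    ```

    This only gives atomicity on the write side. If reads and writes must commit as a single unit across both databases, a JTA transaction manager with XA datasources is the usual route.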

  3. #3
    Join Date
    Jan 2012
    Posts
    4

    Error: resource 'mysql' cannot be used outside XA transaction scope for Global trx

    I am having an issue with the transaction manager configuration for XA transactions. Please help me.

    Here is the log:
    ----------------------------------------------------------------------------------------------------------
    DEBUG main org.springframework.batch.core.repository.dao.JdbcStepExecutionDao - Truncating long message before update of StepExecution, original message is: org.springframework.batch.item.ItemStreamException: Failed to initialize the reader
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:137)
    at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:93)
    at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:301)
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:192)
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:135)
    at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:61)
    at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:60)
    at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:144)
    at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:124)
    at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:135)
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:281)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:120)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:49)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:114)

    Caused by: org.springframework.jdbc.UncategorizedSQLException: Executing query; uncategorized SQLException for SQL [SELECT * FROM mmfdb.transaction limit 1, 10]; SQL state [null]; error code [0]; error enlisting a JdbcConnectionHandle of a JdbcPooledConnection from datasource mysql in state ACCESSIBLE with usage count 1 wrapping com.mysql.jdbc.jdbc2.optional.JDBC4MysqlXAConnection@78dc4c on com.mysql.jdbc.jdbc2.optional.JDBC4ConnectionWrapper@c70b0d; nested exception is java.sql.SQLException: error enlisting a JdbcConnectionHandle of a JdbcPooledConnection from datasource mysql in state ACCESSIBLE with usage count 1 wrapping com.mysql.jdbc.jdbc2.optional.JDBC4MysqlXAConnection@78dc4c on com.mysql.jdbc.jdbc2.optional.JDBC4ConnectionWrapper@c70b0d
    at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:83)
    at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:80)
    at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:80)
    at org.springframework.batch.item.database.JdbcCursorItemReader.openCursor(JdbcCursorItemReader.java:130)
    at org.springframework.batch.item.database.AbstractCursorItemReader.doOpen(AbstractCursorItemReader.java:401)
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:134)
    ... 24 more
    Caused by: bitronix.tm.internal.BitronixSystemException: resource 'mysql' cannot be used outside XA transaction scope. Set allowLocalTransactions to true if you want to allow this and you know your resource supports this.
    at bitronix.tm.resource.common.TransactionContextHelper.enlistInCurrentTransaction(TransactionContextHelper.java:79)
    at bitronix.tm.resource.jdbc.JdbcConnectionHandle.enlistResource(JdbcConnectionHandle.java:84)
    ... 34 more
    ----------------------------------------------------------------------------------------------------------

    I was able to configure the Bitronix JTA transaction manager for XA resources.
    1. I have configured a transaction manager for Spring Batch's internal usage, and a JTA transaction manager at the tasklet level.
    2. Inside the tasklet I read the data from dataSource_1, process it, and then write it to dataSource_2.

    The code is shared below:
    1. launch-context.xml [ Configuration to create job repository and job launcher ....]
    <bean id="jobLauncher"
          class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
    </bean>

    <bean id="jobRepository"
          class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean"
          p:dataSource-ref="dataSource" p:transactionManager-ref="transactionManager" p:databaseType="POSTGRES" />

    <bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource">
        <property name="driverClassName" value="${batch.jdbc.driver}" />
        <property name="url" value="${batch.jdbc.url}" />
        <property name="username" value="${batch.jdbc.user}" />
        <property name="password" value="${batch.jdbc.password}" />
    </bean>

    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager" lazy-init="true">
        <property name="dataSource" ref="dataSource" />
    </bean>

    <bean id="placeholderProperties" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="location" value="classpath:batch.properties" />
        <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE" />
        <property name="ignoreUnresolvablePlaceholders" value="true" />
        <property name="order" value="1" />
    </bean>
    ......
    ----------------------------------------------------------------------------------------------------
    2. readWriteJob-db.xml [two datasources and trx manager configuration using bitronix ]

    <bean id="dataSource_1" class="bitronix.tm.resource.jdbc.PoolingDataSource"
          init-method="init" destroy-method="close">
        <property name="className" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
        <property name="uniqueName" value="mysql" />
        <property name="minPoolSize" value="0" />
        <property name="maxPoolSize" value="10" />
        <property name="driverProperties">
            <props>
                <prop key="url">jdbc:mysql://localhost:3306/testdb</prop>
                <prop key="user">root</prop>
                <prop key="password">root</prop>
            </props>
        </property>
    </bean>

    <bean id="dataSource_2" class="bitronix.tm.resource.jdbc.PoolingDataSource"
          init-method="init" destroy-method="close">
        <property name="className" value="org.postgresql.xa.PGXADataSource" />
        <property name="uniqueName" value="postgres" />
        <property name="minPoolSize" value="0" />
        <property name="maxPoolSize" value="10" />
        <property name="driverProperties">
            <props>
                <prop key="user">postgres</prop>
                <prop key="password">postgres</prop>
                <prop key="portNumber">5432</prop>
                <prop key="serverName">localhost</prop>
                <prop key="databaseName">testdb</prop>
            </props>
        </property>
    </bean>

    <bean id="btmConfig" factory-method="getConfiguration"
          class="bitronix.tm.TransactionManagerServices">
        <property name="serverId" value="spring-btm" />
    </bean>

    <bean id="bitronixTrxMgr" class="bitronix.tm.TransactionManagerServices"
          factory-method="getTransactionManager"
          depends-on="btmConfig,dataSource_1,dataSource_2" destroy-method="shutdown">
    </bean>

    <bean id="JtaTransactionManagers" class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="transactionManager" ref="bitronixTrxMgr"/>
        <property name="userTransaction" ref="bitronixTrxMgr"/>
    </bean>
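
    In the stack trace above, the reader's cursor is opened from TaskletStep.open, which appears to run before any chunk transaction has started, and the final Bitronix message names the allowLocalTransactions setting. A minimal sketch of applying that hint to the read-side pool follows. Whether it is appropriate here is an assumption: with it enabled, connections from this pool can be used when no JTA transaction is active, so reads from dataSource_1 done that way are not enlisted in the XA transaction.

    ```xml
    <!-- Sketch: dataSource_1 as configured above, with allowLocalTransactions added -->
    <bean id="dataSource_1" class="bitronix.tm.resource.jdbc.PoolingDataSource"
          init-method="init" destroy-method="close">
        <property name="className" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
        <property name="uniqueName" value="mysql" />
        <property name="minPoolSize" value="0" />
        <property name="maxPoolSize" value="10" />
        <!-- Setting named in the Bitronix error message; permits use outside a JTA transaction -->
        <property name="allowLocalTransactions" value="true" />
        <property name="driverProperties">
            <props>
                <prop key="url">jdbc:mysql://localhost:3306/testdb</prop>
                <prop key="user">root</prop>
                <prop key="password">root</prop>
            </props>
        </property>
    </bean>
    ```

    The write-side pool (dataSource_2) is left unchanged in this sketch, since the writer runs inside the chunk and so within the JTA transaction started by the tasklet's transaction manager.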
    -----------------------------------------------------------------------------------------------------------
    3. readWriteJob.xml

    <job id="readWriteJob" incrementer="timeStampIncrement">
        <step id="handleTrxsStep">
            <tasklet transaction-manager="JtaTransactionManagers">
                <chunk commit-interval="1" reader="transactionsReader"
                       processor="transactionProcessor" writer="transactionsWriter">
                </chunk>
            </tasklet>
        </step>
    </job>

    <!-- Item Reader -->
    <beans:bean id="transactionsReader"
                class="org.springframework.batch.item.database.JdbcCursorItemReader">
        <beans:property name="dataSource" ref="dataSource_1"/>
        <beans:property name="sql" value="${query.trx.read}"/>
        <beans:property name="rowMapper">
            <beans:bean
                class="com.mmf.batch.examples.dao.rowmapper.TransactionsReaderRowMapper"/>
        </beans:property>
    </beans:bean>

    <!-- Item Processor -->
    <beans:bean id="transactionProcessor" class="com.mmf.batch.examples.dao.TrxItemProcessor"/>

    <!-- Item Writer -->
    <beans:bean id="transactionsWriter" class="com.mmf.batch.examples.dao.TransactionItemWriterImpl">
        <beans:constructor-arg index="0" ref="dataSource_2" />
        <beans:property name="queries" ref="queries"/>
    </beans:bean>
    <util:properties id="queries">
        <beans:prop key="TRX_GROUP_INSERT">${query.trx.group.insert}</beans:prop>
        <beans:prop key="TRX_LOAN_INSERT">${query.trx.loan.insert}</beans:prop>
        <beans:prop key="TRX_READ">${query.trx.read}</beans:prop>
    </util:properties>
