
Originally Posted by
mpish
We are trying to prove out a prototype ETL service application using Spring Batch. It is wrapped in a web service and deployed in a WebLogic container.
The ETL data obtained through the Spring Batch service is then posted to a Solr index for querying.
Essentially, the job has two multi-threaded Spring Batch steps that run in parallel. The first step uses a JdbcCursorItemReader
(wrapped in a synchronizing delegator) to read ID data from an Oracle DB, in chunks of 50.
The processor in the first step then uses these IDs to query detail data
from the DB (through a custom DAO) and generates a POJO, which the custom item writer puts on a queue (a Commons Collections UnboundedFifoBuffer).
In the second Spring Batch step (running in parallel with the first through split/flow), a custom item reader reads the POJOs from the queue, and the item writer
calls the Solr server via the SolrJ API to write them to the index. Here is the Spring Batch XML:
<!--***************************************************************-->
<!--************************Job Definition*************************-->
<!--***************************************************************-->
<batch:job id="ETLService">
<batch:split id="etlService.split" task-executor="split.taskExecutor">
<batch:flow>
<batch:step id="etlService.getClaimBIDWList">
<batch:tasklet task-executor="etlService.step.pooledTaskExecutor" throttle-limit="25">
<batch:chunk
reader="claimBIDWDBItemReader"
processor="claimBIDWDetailItemProcessor"
writer="claimBIDWQItemWriter"
commit-interval="50"/>
<batch:listeners>
<batch:listener ref="claimBIDWQItemWriter" />
</batch:listeners>
</batch:tasklet>
</batch:step>
</batch:flow>
<batch:flow>
<batch:step id="etlService.indexClaimBIDWData">
<batch:tasklet task-executor="etlService.step.pooledTaskExecutor" throttle-limit="25">
<batch:chunk
reader="claimBIDWQItemReader"
writer="claimBIDWSolrItemWriter"
commit-interval="50"/>
<batch:listeners>
<batch:listener ref="claimBIDWQItemReader"/>
</batch:listeners>
</batch:tasklet>
</batch:step>
</batch:flow>
</batch:split>
<batch:listeners>
<batch:listener ref="qWrapper"/>
</batch:listeners>
</batch:job>
<!-- Multithreaded support -->
<bean id="split.taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor">
<property name="concurrencyLimit" value="2"/>
</bean>
<bean id="etlService.step.pooledTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="200" />
<property name="maxPoolSize" value="200" />
</bean>
<!--***************************************************************-->
<!--************************Item Readers***************************-->
<!--***************************************************************-->
<bean id="claimBIDWDBItemReader" class="com.esuite.batch.itemreader.SynchronizingItemReaderDelegator">
<property name="itemReader">
<bean class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
<property name="dataSource" ref="ETL_SVC_BIDW.ClaimDataSource"/>
<property name="sql">
<value>
SELECT clmdim.CLAIM_NUM FROM DATAMART.CLM_DIM clmdim WHERE clmdim.RECEIVED_DT = '#{jobParameters['etl.claims.for.date']}'
</value>
</property>
<property name="rowMapper">
<bean class="com.esuite.batch.itemreader.row.ClaimBIDWETLRowMapper"/>
</property>
</bean>
</property>
</bean>
<bean id="claimBIDWQItemReader" class="com.esuite.batch.itemreader.etl.ClaimBIDWQItemReader" scope="step">
<property name="qWrapper" ref="qWrapper"/>
</bean>
<!--***************************************************************-->
<!--************************Item Writers***************************-->
<!--***************************************************************-->
<bean id="claimBIDWQItemWriter" class="com.esuite.batch.itemwriter.etl.ClaimBIDWQItemWriter" scope="step">
<property name="qWrapper" ref="qWrapper"/>
</bean>
<bean id="claimBIDWSolrItemWriter" class="com.esuite.batch.itemwriter.etl.ClaimBIDWSolrItemWriter" scope="step">
<property name="indexerSvc" ref="BATCH_SVC_BIDW.indexerSvc"/>
<property name="solrURL" value="#{jobParameters['etl.service.solr.url']}"/>
<property name="jobId" value="#{stepExecution.jobExecution.jobId}"/>
</bean>
<!--***************************************************************-->
<!--************************Item Processors************************-->
<!--***************************************************************-->
<bean id="claimBIDWDetailItemProcessor" class="com.esuite.batch.itemprocessor.ClaimBIDWDetailItemProcessor">
<property name="bidwDao" ref="dao1"/>
<property name="viantDao" ref="dao2"/>
</bean>
<bean id="dao1" class="com.esuite.batch.pojo.ClaimBIDWDAO">
<property name="datasource" ref="ETL_SVC_BIDW.ClaimDataSource"/> <!-- WebLogic datasources -->
</bean>
<bean id="dao2" class="com.esuite.batch.pojo.ClaimDAO">
<property name="datasource" ref="ETL_SVC.ClaimDataSource"/>
</bean>
<!--***************************************************************-->
<!--********************** Queue Resources *************************-->
<!--***************************************************************-->
<bean id="qWrapper" class="com.esuite.batch.services.etl.QWrapper"/>
</beans>
This job is then run from JUnit client code, which invokes the web service and passes in the '#{jobParameters['etl.claims.for.date']}' job parameter that the
first step's JDBC cursor item reader uses to retrieve the IDs. I am also able to run multiple instances of this job at the same time, passing in a different
'#{jobParameters['etl.claims.for.date']}' value for each (two different dates with distinct rows in the DB, so no overlap). What I am seeing when I run multiple
instances is that:
(1) I have to set the concurrency limit to 2 on the split-level SimpleAsyncTaskExecutor (when I run 2 instances of the same job with different bind variables).
(2) The second instance of the job (kicked off concurrently with the first instance, with a different bind variable value) waits to
execute its steps until the steps from the first job have completed, instead of running in parallel with the first job instance. In other words,
the instances are running sequentially.
I am not able to figure out why the second instance of the job is not executing in parallel with the first instance. The idea is to have multiple instances
of this job run concurrently for different dates (the bind variable), extracting data to the Solr index. If the jobs run sequentially, we
will not be able to use the full capabilities of the app server (such as connection pools and concurrency). Also, since we have to load a lot of
data for the initial bulk population of the Solr index, we want to minimize the time the ETL process takes, and running the jobs sequentially
doesn't help. In the actual implementation we will use selected date ranges as the query criteria instead of single dates.
Do we have our Spring XML correctly configured for this scenario? FYI: the two steps within the job are running in parallel through the split/flow configuration.
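Two things may be worth checking in the configuration. The following is a sketch under assumptions, not a confirmed fix. First, split.taskExecutor is a singleton shared by every job instance, and each running instance occupies two of its slots (one per flow), so a concurrencyLimit of 2 could make a second instance wait until the first one finishes. Second, the job launcher wiring is not shown above; if it is a SimpleJobLauncher with no task executor set, each job runs on the calling thread. The bean ids jobLauncher, jobRepository and launcher.taskExecutor below are hypothetical, and the limit of 4 assumes at most two concurrent job instances.

```xml
<!-- Sketch only: "jobLauncher", "jobRepository" and "launcher.taskExecutor" are hypothetical ids. -->

<!-- The split executor is shared by all job instances. Each running instance
     needs two slots (one per flow), so a limit of 2 leaves none free for a
     second instance while the first is still running. -->
<bean id="split.taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor">
    <!-- Assumption: at most 2 concurrent job instances x 2 flows each -->
    <property name="concurrencyLimit" value="4"/>
</bean>

<!-- SimpleJobLauncher executes the job on the calling thread unless a
     TaskExecutor is supplied. -->
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository"/>
    <property name="taskExecutor" ref="launcher.taskExecutor"/>
</bean>

<bean id="launcher.taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
```

If the web service already handles each request on its own thread, the launcher change may make no difference, and the shared concurrencyLimit would be the more likely place to look.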