First of all, my apologies for this lengthy post.
I have a batch job that reads from a database (Oracle 10g) table, processes the records, and writes them to a flat file. The data volume is 1.5 million records.
I took the partitioning approach and was able to create the partitions. However, when I run the job, only the master step and one slave step (myStep : partition0)
actually read, process and write data. The remaining steps complete with 0 reads/writes. The reader, processor and writer are all step-scoped.
Here is a sample scenario (with 2715 records) that I tested before running against the actual 1.5 million records:
1. Created a custom partitioner similar to the Spring Batch sample ColumnRangePartitioner, with gridSize = 4, and used SimpleAsyncTaskExecutor.
2. Used JdbcCursorItemReader to read from the table, filtering by rownum.
3. The BATCH_STEP_EXECUTION table contains myStepMaster, myStep : partition0, myStep : partition1, myStep : partition2 and myStep : partition3.
4. Both myStepMaster and myStep : partition0 read 679 records. Only partition0's 679 records were written to the flat file, and these steps are in COMPLETED status.
5. myStep : partition1, myStep : partition2 and myStep : partition3 completed with 0 reads/writes.
6. Even though each partition has a separate executionContext, I am still not able to read, process and write all 2715 records with multiple threads. I also tried this scenario with the three other task executors (commented out in the config) with no luck.
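To make the intended split concrete, here is a minimal, stdlib-only sketch of the range logic a ColumnRangePartitioner-style partitioner performs. It is an assumption-laden stand-in, not the actual `com.xxx.CustomPartitioner`: the class name `RangePartitionSketch` is hypothetical, each inner `Map` plays the role of one partition's `ExecutionContext`, and the real partitioner would look up the min/max values with SQL rather than take them as arguments. The `minValue`/`maxValue` keys match the `#{stepExecutionContext[minValue]}` and `#{stepExecutionContext[maxValue]}` expressions in the job XML.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a ColumnRangePartitioner-style partitioner.
// Each inner map represents one partition's ExecutionContext.
class RangePartitionSketch {

    // Splits the inclusive range [min, max] into at most gridSize contiguous
    // ranges, keyed "partition0", "partition1", ...
    static Map<String, Map<String, Object>> partition(long min, long max, int gridSize) {
        if (gridSize < 1 || max < min) {
            throw new IllegalArgumentException("need gridSize >= 1 and max >= min");
        }
        // (max - min) / gridSize + 1 guarantees gridSize ranges cover [min, max].
        long targetSize = (max - min) / gridSize + 1;
        Map<String, Map<String, Object>> result = new LinkedHashMap<>();
        long start = min;
        int number = 0;
        while (start <= max) {
            // The last range is clipped to max, so it may be slightly smaller.
            long end = Math.min(start + targetSize - 1, max);
            Map<String, Object> context = new LinkedHashMap<>();
            context.put("minValue", start);
            context.put("maxValue", end);
            result.put("partition" + number, context);
            start = end + 1;
            number++;
        }
        return result;
    }

    public static void main(String[] args) {
        // The 2715-record sample scenario with gridSize = 4.
        partition(1, 2715, 4).forEach((name, ctx) -> System.out.println(name + " -> " + ctx));
    }
}
```

For the sample scenario this prints four ranges: partition0 covers 1 to 679, partition1 680 to 1358, partition2 1359 to 2037, and partition3 2038 to 2715, so each slave step is expected to receive its own non-overlapping minValue/maxValue pair.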
Please see my job.xml below and help me read, process and write all 2715 records concurrently with 4 partitions of about 679 records each.
Thanks in advance.

Code:

    <job id="myJob" xmlns="http://www.springframework.org/schema/batch">
        <step id="myStepMaster">
            <partition step="myStep" partitioner="partitioner">
                <handler grid-size="4" task-executor="taskExecutor" />
            </partition>
        </step>
    </job>

    <beans:bean id="partitioner" class="com.xxx.CustomPartitioner">
        <beans:property name="dataSource" ref="dataSource" />
        <beans:property name="table" value="TAB1" />
        <beans:property name="column" value="rownum" />
    </beans:bean>

    <beans:bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
    <!--
    <beans:bean id="taskExecutor" class="org.springframework.core.task.SyncTaskExecutor" />
    <beans:bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" />
    <beans:bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ConcurrentTaskExecutor" />
    -->

    <step id="myStep" xmlns="http://www.springframework.org/schema/batch">
        <tasklet>
            <chunk reader="myItemReader" processor="myProcessor" writer="myItemWriter" commit-interval="30" />
            <listeners>
                <listener ref="fileNameListener" />
            </listeners>
        </tasklet>
    </step>

    <beans:bean id="fileNameListener" class="com.xxx.OutputFileListener" scope="step">
        <beans:property name="path" value="file:./temp/partitions/" />
    </beans:bean>

    <beans:bean id="myItemReader" scope="step" autowire-candidate="false" parent="itemReaderParent">
        <beans:property name="sql">
            <beans:value>
                <![CDATA[
                    select * from TAB1 where rownum >= ? and rownum <= ?
                ]]>
            </beans:value>
        </beans:property>
        <beans:property name="preparedStatementSetter">
            <beans:bean class="org.springframework.batch.core.resource.ListPreparedStatementSetter">
                <beans:property name="parameters">
                    <beans:list>
                        <beans:value>#{stepExecutionContext[minValue]}</beans:value>
                        <beans:value>#{stepExecutionContext[maxValue]}</beans:value>
                    </beans:list>
                </beans:property>
            </beans:bean>
        </beans:property>
    </beans:bean>

    <beans:bean id="itemReaderParent" class="org.springframework.batch.item.database.JdbcCursorItemReader" abstract="true">
        <beans:property name="dataSource" ref="dataSource" />
        <beans:property name="rowMapper">
            <beans:bean class="com.xxx.MyRowMapper" />
        </beans:property>
    </beans:bean>

    <beans:bean id="myProcessor" class="com.xxx.MyProcessor" scope="step" />

    <beans:bean id="myItemWriter" class="com.xxx.MyItemWriter" scope="step">
        <beans:property name="myDao" ref="myDaoImpl" />
    </beans:bean>

    <beans:bean id="myDaoImpl" class="com.xxx.myDaoImpl" scope="step">
        <beans:property name="itemWriter">
            <beans:bean class="org.springframework.batch.item.file.FlatFileItemWriter">
                <beans:property name="resource" value="#{stepExecutionContext[outputFile]}" />
                <beans:property name="lineAggregator">
                    <beans:bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
                        <beans:property name="delimiter" value="," />
                        <beans:property name="fieldExtractor">
                            <beans:bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
                                <beans:property name="names" value="p1,p2,p3,p4" />
                            </beans:bean>
                        </beans:property>
                    </beans:bean>
                </beans:property>
            </beans:bean>
        </beans:property>
    </beans:bean>

