This may be a simple problem or I may be completely missing something in my configuration. I am trying to implement a parallel batch job based on the partitioning example provided with the Spring Batch release. My understanding of the partitioning example is that as is, it is not multi-threaded, but single threaded due to the use of the SynchTaskExecutor. In order to make it multi-threaded I changed the configuration to use the ThreadPoolTaskExecutor. While this change succeeded in making the partitioned steps run in parallel I have run into a couple of problems.
The first problem is when I run the job using a non-dataSource job repository and transaction manager, i.e., MapJobRepositoryFactoryBean and ResourcelessTransactionManager (for testing), one or more of the partitioned steps fails with an OptimisticLockingFailureException:
This problem seems to go away when I change the configuration to point to a database backed data source and transaction manager. However, when I increase the number of threads (or files in MultiResource...) it appears to cause a dead-lock scenario. The steps execute once and then hang without any further execution.Code:2009-07-13 17:17:33,209 ERROR [org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor#19ecd80-4] - Rollback caused by fatal failure failed org.springframework.transaction.IllegalTransactionStateException: Transaction is already completed - do not call commit or rollback more than once per transaction at org.springframework.transaction.support.AbstractPlatformTransactionManager.rollback(AbstractPlatformTransactionManager.java:795) at org.springframework.batch.core.step.tasklet.TaskletStep.rollback(TaskletStep.java:368) ... at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:197) at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:108) at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:106) at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) ... at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:619) 2009-07-13 17:17:33,225 ERROR [org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor#19ecd80-4] - Encountered an error executing the step: class org.springframework.batch.core.step.AbstractStep$FatalException: Fatal failure detected org.springframework.batch.core.step.AbstractStep$FatalException: Fatal failure detected at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:303) ... at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:197) at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:108) at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:106) at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) ... at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:619) Caused by: org.springframework.dao.OptimisticLockingFailureException: Attempt to update step execution id=3 with wrong version (1), where current version is 0 at org.springframework.batch.core.repository.dao.MapStepExecutionDao.updateStepExecution(MapStepExecutionDao.java:86) at org.springframework.batch.core.repository.support.SimpleJobRepository.update(SimpleJobRepository.java:167) ... at $Proxy37.update(Unknown Source) at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:300) ... 13 more
I have tried multiple configurations, from using my own custom partitioner to using the MultiResourcePartitioner, in both cases they react the same.
Here is a partial of my batch configuration file:
Note, the LoggingItemWriter above is a stateless writer that just writes the Item received to the console.Code:<job id="sample.parallelJob"> <step id="sample.parallelJob.step" parent="sample.parallelJob.step1:master"/> </job> <beans:bean name="sample.parallelJob.step1:master" class="org.springframework.batch.core.partition.support.PartitionStep"> <beans:property name="jobRepository" ref="jobRepository" /> <beans:property name="stepExecutionSplitter"> <beans:bean class="org.springframework.batch.core.partition.support.SimpleStepExecutionSplitter"> <beans:constructor-arg ref="jobRepository" /> <beans:constructor-arg ref="sample.parallelJob.step1" /> <beans:constructor-arg> <beans:bean class="org.springframework.batch.core.partition.support.MultiResourcePartitioner"> <beans:property name="resources" value="classpath:sample/data/*.txt" /> </beans:bean> </beans:constructor-arg> </beans:bean> </beans:property> <beans:property name="partitionHandler"> <beans:bean class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler"> <beans:property name="taskExecutor"> <!-- --> <beans:bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" p:queueCapacity="20" p:corePoolSize="20" p:maxPoolSize="20"/> <!-- --> </beans:property> <beans:property name="step" ref="sample.parallelJob.step1" /> </beans:bean> </beans:property> </beans:bean> <step id="sample.parallelJob.step1"> <tasklet job-repository="jobRepository" transaction-manager="transactionManager"> <chunk writer="sample.parallelJob.writer" reader="sample.parallelJob.reader" commit-interval="1" > </chunk> </tasklet> </step> <beans:bean id="sample.parallelJob.reader" scope="step" autowire-candidate="false" class="org.springframework.batch.item.file.FlatFileItemReader"> <beans:property name="resource" value="#{stepExecutionContext[fileName]}" /> <beans:property name="lineMapper"> <beans:bean class="org.springframework.batch.item.file.mapping.PassThroughLineMapper"/> </beans:property> </beans:bean> <beans:bean id="sample.parallelJob.writer" class="LoggingItemWriter"/>
I've been banging my head against this for a while now to no avail. Any insight into the problem would be greatly appreciated.


Reply With Quote
