In pseudo code, I was trying to achieve the following:
Code:
for each Step
{
    for each chunk
    {
        TX Start
        for each item in Chunk
        {
            // asynchronous chunk operation:
            taskExecutor.run(
                Object o = readFromProvider()
                processor.process( o )
            )
        }
        Wait for all task executors to complete
        Update Repository with RESTARTDATA etc
        TX END
    }
}
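As a rough illustration of the "wait for all task executors to complete" part, here is a minimal plain-Java sketch. It does not use any Spring Batch classes; `ChunkSketch`, `process` and `processChunk` are made-up names, the provider read is simplified to iterating over a list, and the transaction boundaries are only marked by comments.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ChunkSketch {

    // Hypothetical stand-in for processor.process(o): it just doubles the item.
    static int process(int item) {
        return item * 2;
    }

    // Hands every item of one chunk to the pool and returns only
    // after all of the submitted tasks have finished.
    static List<Integer> processChunk(List<Integer> chunk, ExecutorService pool) throws Exception {
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (Integer item : chunk) {
            tasks.add(() -> process(item));
        }
        List<Integer> results = new ArrayList<>();
        // invokeAll blocks until every task has completed.
        for (Future<Integer> future : pool.invokeAll(tasks)) {
            // get() rethrows a task failure wrapped in an ExecutionException.
            results.add(future.get());
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // "TX Start" would go here (assumption: one transaction per chunk).
            List<Integer> out = processChunk(Arrays.asList(1, 2, 3), pool);
            // "Update Repository with RESTARTDATA" and "TX END" would go here.
            System.out.println(out);
        } finally {
            pool.shutdown();
        }
    }
}
```

Note that the worker threads in this sketch do not take part in the transaction opened on the calling thread, which is exactly the limitation discussed in this thread.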
But your observation is completely correct: 'transactions generally - they are normally single threaded'. I've now tried setting up an asynchronous step operation via the StepExecutor, as suggested. There were a number of things I had to do before it started working:
1. Remove scope="step" from SimpleStepConfiguration
2. Change from hibernateJobDao to SqlJobDao. This was because Hibernate was failing with a StaleObjectStateException when it tried to update a batch_step record that had already been updated by another thread. It would be nice to have this working with Hibernate.
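For anyone wondering why the second point happens: a StaleObjectStateException is what Hibernate raises when a version-checked update finds that the record has changed since it was loaded. The sketch below imitates that check in plain Java. It is only an illustration, not Hibernate code; `VersionCheckSketch`, `rowVersion` and `update` are made-up names, and I am assuming the batch_step mapping is versioned.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class VersionCheckSketch {

    // Hypothetical in-memory stand-in for the version column of one batch_step record.
    static final AtomicInteger rowVersion = new AtomicInteger(0);

    // Imitates "UPDATE ... SET version = version + 1 WHERE version = ?":
    // it succeeds only if nobody changed the record since we read it.
    static boolean update(int versionReadEarlier) {
        return rowVersion.compareAndSet(versionReadEarlier, versionReadEarlier + 1);
    }

    public static void main(String[] args) {
        // Two workers load the same record and both remember its version.
        int seenByA = rowVersion.get();
        int seenByB = rowVersion.get();

        // Worker A writes first and bumps the version.
        System.out.println("A updated: " + update(seenByA));
        // Worker B still holds the old version, so its update is refused.
        // This is the situation Hibernate reports as a stale object.
        System.out.println("B updated: " + update(seenByB));
    }
}
```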
After I got it working I noticed that it was not saving any restart data.
For reference:
Code:
<bean id="jobExecutor" class="org.springframework.batch.execution.job.DefaultJobExecutor">
    <property name="jobRepository" ref="simpleJobRepository" />
    <property name="stepExecutorFactory">
        <bean class="org.springframework.batch.execution.step.PrototypeBeanStepExecutorFactory">
            <property name="stepExecutorName" value="myStepExecutor" />
        </bean>
    </property>
</bean>
<bean id="jobExecutorFacade" class="org.springframework.batch.execution.facade.SimpleJobExecutorFacade">
    <property name="jobRepository" ref="simpleJobRepository" />
    <property name="jobConfigurationLocator" ref="jobConfigurationRegistry"/>
    <property name="jobExecutor" ref="jobExecutor" />
</bean>
<bean id="myStepExecutor" class="org.springframework.batch.execution.step.simple.SimpleStepExecutor" scope="prototype">
    <property name="transactionManager" ref="txManager" />
    <property name="repository" ref="simpleJobRepository" />
    <property name="stepOperations">
        <bean class="org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate">
            <property name="taskExecutor" ref="threadPoolTaskExecutor"/>
        </bean>
    </property>
</bean>
<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor">
</bean>
<bean id="threadPoolTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="5" />
    <property name="maxPoolSize" value="10" />
    <property name="queueCapacity" value="0" />
</bean>
<!-- Job Launcher 1 -->
<bean id="jobLauncher_1" class="org.springframework.batch.execution.bootstrap.TaskExecutorJobLauncher">
    <property name="jobExecutorFacade" ref="jobExecutorFacade" />
    <property name="taskExecutor" ref="taskExecutor"/>
    <property name="autoStart" value="false" />
    <property name="jobConfigurationName" value="jobConfiguration_1"/>
</bean>
<bean id="jobConfiguration_1" class="org.springframework.batch.core.configuration.JobConfiguration">
    <property name="steps">
        <list>
            <bean id="jc_1_step_1" class="org.springframework.batch.execution.step.SimpleStepConfiguration">
                <constructor-arg>
                    <bean id="jc_1_module1" class="org.springframework.batch.execution.tasklet.RestartableItemProviderTasklet">
                        <property name="itemProcessor">
                            <bean class="com.zzz.springdemo.batch.processor.MyProcessor">
                            </bean>
                        </property>
                        <property name="itemProvider">
                            <bean class="com.zzz.springdemo.batch.provider.MyProvider">
                            </bean>
                        </property>
                    </bean>
                </constructor-arg>
                <property name="commitInterval" value="10"/>
                <property name="allowStartIfComplete" value="true" />
                <property name="saveRestartData" value="true" />
                <property name="exceptionHandler">
                    <bean class="org.springframework.batch.repeat.exception.handler.SimpleLimitExceptionHandler">
                        <property name="limit" value="5" />
                        <property name="useParent" value="true"/>
                    </bean>
                </property>
            </bean>
        </list>
    </property>
</bean>
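One side note on the threadPoolTaskExecutor settings above: with a queueCapacity of 0, Spring's ThreadPoolTaskExecutor uses a SynchronousQueue, so work is never queued and, with the default rejection policy, a submission is rejected once all 10 threads are busy. The sketch below reproduces the same sizing with a plain java.util.concurrent.ThreadPoolExecutor. `PoolSaturationSketch` and `countRejected` are made-up names, and whether the step ever has more than 10 tasks in flight is an assumption I have not verified.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolSaturationSketch {

    // Submits `count` tasks that all block until released and
    // returns how many of those submissions were rejected.
    static int countRejected(int count) throws InterruptedException {
        // Same sizing as the XML: core 5, max 10, no queueing (SynchronousQueue).
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                5, 10, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < count; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // keep the thread busy
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // default AbortPolicy: pool is saturated
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return rejected;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("rejected: " + countRejected(12));
    }
}
```

If that ever becomes a problem, a positive queueCapacity or a different rejection policy would be the usual knobs to look at.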