Ok so it works. Kinda.... It makes a lot of sense now and looking back I don't know how I did not think that the commit interval will make sure the threads do not do too much.
I have a different issue now, which I do not know where to begin solving. If I stop the job and restart it the readers fail because they cannot start where they left off. To me it looks like the parameters are not replaced back into the query when restarting. The exception:
Code:
bad SQL grammar [SELECT SORT_KEY FROM ( SELECT ID AS SORT_KEY, ROW_NUMBER() OVER (ORDER BY ID ASC) AS ROW_NUMBER FROM TABLE WHERE ID >= :min and ID <= :max) WHERE ROW_NUMBER = 30]; nested exception is com.ibm.db2.jcc.b.SqlException: DB2 SQL error: SQLCODE: -312, SQLSTATE: 42618, SQLERRMC: min
Step:
Code:
<beans:bean name="extractStep:master" class="org.springframework.batch.core.partition.support.PartitionStep">
<beans:property name="jobRepository" ref="jobRepository" />
<beans:property name="stepExecutionSplitter">
<beans:bean name="stepExecutionSplitter" class="org.springframework.batch.core.partition.support.SimpleStepExecutionSplitter">
<beans:constructor-arg ref="jobRepository" />
<beans:constructor-arg ref="extractStep" />
<beans:constructor-arg ref="extractIdPartitioner"/>
</beans:bean>
</beans:property>
<beans:property name="partitionHandler">
<beans:bean class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler">
<beans:property name="taskExecutor" ref="taskExecutor"/>
<beans:property name="step" ref="extractStep" />
<beans:property name="gridSize" value="10" />
</beans:bean>
</beans:property>
<beans:property name="stepExecutionListeners">
<beans:list>
<beans:ref bean="extractIdPartitioner"/> <!-- Get the JobId -->
</beans:list>
</beans:property>
</beans:bean>
<beans:bean id="extractIdPartitioner" class="part.IDPartitioner">
<beans:property name="dataSource" ref="dataSource"/>
<beans:property name="tableName" value="TABLENAME"/>
<beans:property name="where" value="PROCESSED = 0"/>
</beans:bean>
<step id="extractStep">
<tasklet allow-start-if-complete="true" >
<chunk reader="extractReader" writer="extractWriter" commit-interval="10" skip-limit="10">
<skippable-exception-classes>
java.lang.Exception
</skippable-exception-classes>
</chunk>
<listeners>
<listener ref="extractStepListener"/>
</listeners>
</tasklet>
</step>
Reader:
Code:
<beans:bean id="extractReader" scope="step" autowire-candidate="false" class="org.springframework.batch.item.database.JdbcPagingItemReader">
<beans:property name="dataSource" ref="dataSource" />
<beans:property name="rowMapper">
<beans:bean class="xx.MyRowMapper" />
</beans:property>
<beans:property name="queryProvider">
<beans:bean class="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean">
<beans:property name="dataSource" ref="dataSource"/>
<beans:property name="fromClause" value="TABLENAME"/>
<beans:property name="selectClause" value="*"/>
<beans:property name="sortKey" value="ID"/>
<beans:property name="whereClause" value="ID >= :min and ID <= :max"/>
</beans:bean>
</beans:property>
<beans:property name="parameterValues">
<beans:map>
<beans:entry key="min" value="#{stepExecutionContext[MIN]}"/>
<beans:entry key="max" value="#{stepExecutionContext[MAX]}"/>
</beans:map>
</beans:property>
</beans:bean>
Looking at the stepExecutionContext of one of the threads:
Code:
{"map":{"entry":[{"string":"itemCount","long":177},{"string":"org.springframework.batch.core.step.item.ChunkMonitor.OFFSET","int":3},{"string":"MIN","long":18132},{"string":"JdbcPagingItemReader.read.count","int":90},{"string":"MAX","long":18308}]}}