We use JdbcTemplate to do the batch update (I have omitted some special logic that splits the data into INSERTs and UPDATEs, but here is some pseudocode using a JDBC batch update):
Code:
public class XXXItemWriter implements ItemWriter<X>, InitializingBean {
    ...
    public void afterPropertiesSet() throws Exception {
        jdbcTemplate = new JdbcTemplate(dataSource);
    }
    // ItemWriter.write returns void and takes List<? extends X>
    @Override
    public void write(final List<? extends X> items) throws Exception {
        String sql = ...
        int[] ret = getJdbcTemplate().batchUpdate(sql, new BatchPreparedStatementSetter() {
            @Override
            public void setValues(PreparedStatement ps, int i) throws SQLException {
                X row = items.get(i);
                // ps.setXXX(row.getAbc());
            }
            @Override
            public int getBatchSize() {
                return items.size();
            }
        });
        // check ret is correct...
    }
}
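The omitted INSERT/UPDATE split could look roughly like the following sketch. It is not the actual logic from the writer above: the class InsertUpdateSplitter, the Split holder, and the idea of passing in a set of keys that already exist in the table are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical helper (not part of Spring Batch): splits one chunk into
// rows to INSERT and rows to UPDATE, based on a set of known keys.
final class InsertUpdateSplitter {

    /** Holds the two lists; each would feed its own batchUpdate call. */
    static final class Split<T> {
        final List<T> inserts = new ArrayList<T>();
        final List<T> updates = new ArrayList<T>();
    }

    /**
     * @param items        the chunk handed to the writer
     * @param existingKeys keys assumed to be present in the table already
     * @param keyOf        extracts the key from an item
     */
    static <T, K> Split<T> split(List<? extends T> items,
                                 Set<K> existingKeys,
                                 Function<? super T, K> keyOf) {
        Split<T> result = new Split<T>();
        for (T item : items) {
            if (existingKeys.contains(keyOf.apply(item))) {
                result.updates.add(item);   // row exists: UPDATE
            } else {
                result.inserts.add(item);   // row is new: INSERT
            }
        }
        return result;
    }
}
```

Each list would then go to its own batchUpdate call with the matching SQL. Note that this sketch does not handle a new key that appears twice in the same chunk; both occurrences would end up in the insert list.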
So what we are considering currently is a Tasklet that does a lot of the logic Spring Batch does for chunks (full code is given):
Code:
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.PassThroughItemProcessor;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.InitializingBean;
public class SingleCommitChunkOrientedTasklet<I,O> implements Tasklet, InitializingBean {
    private static final Logger LOGGER = LoggerFactory.getLogger(SingleCommitChunkOrientedTasklet.class);
    private ItemReader<I> reader;
    private ItemProcessor<I, O> processor;
    private ItemWriter<O> writer;
    private Tasklet preTasklet;
    private Tasklet postTasklet;
    private int chunkSize = 1000;
    private int count = 0; // number of items handed to the writer so far
    public void setReader(ItemReader<I> reader) {
        this.reader = reader;
    }
    public void setProcessor(ItemProcessor<I, O> processor) {
        this.processor = processor;
    }
    public void setWriter(ItemWriter<O> writer) {
        this.writer = writer;
    }
    public void setChunkSize(int chunkSize) {
        this.chunkSize = chunkSize;
    }
    public void setPreTasklet(Tasklet preTasklet) {
        this.preTasklet = preTasklet;
    }
    public void setPostTasklet(Tasklet postTasklet) {
        this.postTasklet = postTasklet;
    }
    @SuppressWarnings("unchecked")
    @Override
    public void afterPropertiesSet() throws Exception {
        if (processor == null) {
            // default to a processor that returns each item unchanged
            processor = (ItemProcessor<I, O>) new PassThroughItemProcessor<I>();
        }
    }
    @Override
    public RepeatStatus execute(StepContribution contribution,
            ChunkContext chunkContext) throws Exception {
        StepExecution stepExecution = prepare(contribution, chunkContext);
        try {
            execute(stepExecution);
            return finish(contribution, chunkContext);
        } finally {
            // close the reader even if a read, process or write call throws
            closeReader();
        }
    }
    private StepExecution prepare(StepContribution contribution,
            ChunkContext chunkContext) throws Exception {
        StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();
        if (preTasklet != null) {
            while (preTasklet.execute(contribution, chunkContext).isContinuable()) {
                // repeat until the pre-tasklet reports FINISHED
            }
        }
        if (reader instanceof ItemStream) {
            LOGGER.debug("Opening ItemStream...");
            ExecutionContext executionContext = stepExecution.getExecutionContext();
            ((ItemStream) reader).open(executionContext);
            ((ItemStream) reader).update(executionContext);
        }
        return stepExecution;
    }
    private void execute(StepExecution stepExecution) throws Exception {
        I item;
        int readCount = 0; // counted separately so filtered items still count as read
        List<O> items = new ArrayList<O>();
        while ((item = reader.read()) != null) {
            stepExecution.setReadCount(++readCount);
            O processedItem = processor.process(item);
            if (processedItem == null) {
                // a null result means the processor filtered the item out
                continue;
            }
            items.add(processedItem);
            if (++count % chunkSize == 0) {
                LOGGER.debug("Writing item numbers {}-{} ({} items)", new Object[]{count-items.size()+1, count, items.size()});
                writer.write(items);
                stepExecution.setWriteCount(count);
                items = new ArrayList<O>();
            }
        }
        // write the last, partially filled chunk
        if (!items.isEmpty()) {
            LOGGER.debug("Writing item numbers {}-{} ({} items)", new Object[]{count-items.size()+1, count, items.size()});
            writer.write(items);
            stepExecution.setWriteCount(count);
        }
    }
    private RepeatStatus finish(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        if (postTasklet != null) {
            while (postTasklet.execute(contribution, chunkContext).isContinuable()) {
                // repeat until the post-tasklet reports FINISHED
            }
        }
        return RepeatStatus.FINISHED;
    }
    private void closeReader() {
        if (reader instanceof ItemStream) {
            LOGGER.debug("Closing ItemStream...");
            ((ItemStream) reader).close();
        }
    }
}
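To make the read/process/write loop easier to follow in isolation, here is a minimal plain-Java sketch of the same idea with the Spring Batch types replaced by stand-ins: an Iterator for the reader, a Function for the processor (returning null to filter an item) and a Consumer for the writer. The class name ChunkLoopSketch and the Counts holder are hypothetical and not Spring Batch API. It keeps separate counters for items read, filtered and written, which is one way to keep the counts meaningful when the processor filters items.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for the tasklet's loop; no Spring types involved.
final class ChunkLoopSketch {

    /** Simple counters, analogous to the read/filter/write counts of a step. */
    static final class Counts {
        int read;
        int filtered;
        int written;
    }

    static <I, O> Counts run(Iterator<I> reader,
                             Function<I, O> processor,
                             Consumer<List<O>> writer,
                             int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        Counts counts = new Counts();
        List<O> chunk = new ArrayList<O>();
        while (reader.hasNext()) {
            I item = reader.next();
            counts.read++;
            O processed = processor.apply(item);
            if (processed == null) {
                counts.filtered++;          // filtered items are read but never written
                continue;
            }
            chunk.add(processed);
            if (chunk.size() == chunkSize) {
                writer.accept(chunk);       // hand over a full chunk
                counts.written += chunk.size();
                chunk = new ArrayList<O>(); // start a fresh list; the writer may keep the old one
            }
        }
        if (!chunk.isEmpty()) {
            writer.accept(chunk);           // flush the last, partially filled chunk
            counts.written += chunk.size();
        }
        return counts;
    }
}
```

In the real tasklet all of these writes happen inside the single transaction that wraps the tasklet's execute call, so nothing is committed until the whole loop has finished.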
Then instead of using
Code:
<job id="myJob" xmlns="http://www.springframework.org/schema/batch">
    <step id="prepare" next="step1">
        <tasklet ref="myTasklet" />
    </step>
    <step id="step1">
        <tasklet>
            <chunk reader="myItemReader" processor="myItemProcessor" writer="myItemWriter" commit-interval="1000" />
        </tasklet>
    </step>
</job>
we have
Code:
<bean id="singleCommitTasklet" class="x.y.z.SingleCommitChunkOrientedTasklet" scope="step">
    <property name="preTasklet" ref="myTasklet" />
    <property name="reader" ref="myItemReader" />
    <property name="processor" ref="myItemProcessor" />
    <property name="writer" ref="myItemWriter" />
    <property name="chunkSize" value="1000" />
</bean>
<job id="myJob" xmlns="http://www.springframework.org/schema/batch">
    <step id="step1">
        <tasklet ref="singleCommitTasklet" />
    </step>
</job>
It seems to work OK, but of course we will have to add more logic as needed (different listeners and so forth - stuff that is already implemented in Spring Batch as it is). It would be nice if Spring Batch had something like:
Code:
<atomic-chunk reader="myItemReader" processor="myItemProcessor" writer="myItemWriter" chunk-size="1000" />
Or
Code:
<step ... postponeCommit="true">
...
</step>
I realize such jobs won't be resumable, but to meet the task's requirements and keep the programming pattern as similar as possible, I feel 'postponeCommit' would be the easiest to implement and would give users a lot of flexibility.
I know others have had trouble with unit tests and the locking semaphore when using @Transactional. We have had trouble with this too: we would like to use DbUnit to sync/populate a table, run the job, check the results, and then roll back the entire transaction (the job's changes and the DbUnit data alike) so that the database is in exactly the same state as when the test started. It would be easy to write a job configuration with postponeCommit="true" for everything and then use @TransactionConfiguration(defaultRollback=true) and @Transactional in the unit test.
Spring Batch has a lot of positive features, but we feel we need more control over our transactions.