Hi
I have a file that contains millions of records; I need to process each record and store it in a database. It's working, but it's slow: I can store approximately 250 records a minute in the database on my development machine. I was using FlatFileItemReader and a custom writer to an Oracle database.
I read JIRA issue BATCH-1301, which suggests that FlatFileItemReader can be used with a task-executor, but I was wondering whether it is OK to use it with a task-executor and in step scope. I tried it, and it seemed to work: I can store approximately 1000 records a minute in the database. Only one job was running at the time. However, I'm seeing the following warnings in the log file:
Log output:
2012-02-15 11:24:38,739 WARN [ChunkMonitor ][taskExecutor-2 ] No ItemReader set (must be concurrent step), so ignoring offset data.
2012-02-15 11:24:38,970 WARN [ChunkMonitor ][taskExecutor-7 ] ItemStream was opened in a different thread. Restart data could be compromised.
2012-02-15 11:24:39,110 INFO [StandardQueryCache ][taskExecutor-7 ] starting query cache at region: query.findByDetachedCriteria.uk.co.and.domain.Customer
2012-02-15 11:24:39,215 WARN [ChunkMonitor ][taskExecutor-6 ] ItemStream was opened in a different thread. Restart data could be compromised.
2012-02-15 11:24:39,227 WARN [ChunkMonitor ][taskExecutor-3 ] ItemStream was opened in a different thread. Restart data could be compromised.
2012-02-15 11:24:39,274 WARN [ChunkMonitor ][taskExecutor-5 ] ItemStream was opened in a different thread. Restart data could be compromised.
2012-02-15 11:24:42,249 WARN [ChunkMonitor ][taskExecutor-1 ] ItemStream was opened in a different thread. Restart data could be compromised.
2012-02-15 11:24:42,257 WARN [ChunkMonitor ][taskExecutor-9 ] ItemStream was opened in a different thread. Restart data could be compromised.
2012-02-15 11:24:42,257 WARN [ChunkMonitor ][taskExecutor-8 ] ItemStream was opened in a different thread. Restart data could be compromised.
2012-02-15 11:24:42,264 WARN [ChunkMonitor ][taskExecutor-10 ] ItemStream was opened in a different thread. Restart data could be compromised.
2012-02-15 11:24:45,219 WARN [ChunkMonitor ][taskExecutor-4 ] ItemStream was opened in a different thread. Restart data could be compromised.
Basically, I set saveState = false and made the job non-restartable. I inject a task executor into the tasklet.
I would be grateful if someone could advise on the best approach.
Configuration (XML):
<!-- Step-scoped flat-file reader for the uploadRecords step.
     The input file is late-bound from the step execution context key 'input.file.url' (hence scope="step").
     Lines beginning with "#" are treated as comments; each remaining line is mapped by a DefaultLineMapper
     (custom tokenizer plus custom field set mapper).
     saveState=false: no restart data is stored, matching the job's restartable="false".
     NOTE(review): uploadRecords uses a task-executor, so this single reader instance is called from several
     threads at once. Spring Batch's reference docs state that most of its readers are not designed for
     multi-threaded use; the ChunkMonitor "opened in a different thread" warnings are a symptom. Consider
     serializing read() through a synchronized delegating reader, or partitioning the input file instead. -->
<bean id="myItemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
<property name="comments" value="#" />
<property name="saveState" value="false" />
<property name="lineMapper" >
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<property name="lineTokenizer">
<bean class="....MyCustomLineTokenizer" />
</property>
<property name="fieldSetMapper">
<bean class="...MyCustomFieldSetMapper" />
</property>
</bean>
</property>
<!-- input location is read from the step execution context at step start -->
<property name="resource" value="#{stepExecutionContext['input.file.url']}" />
</bean>
<!-- Step-scoped item processor used by the uploadRecords chunk; one instance is shared by the step's task-executor threads. -->
<bean id="myItemProcessor" class="....MyCustomItemProcessor" scope="step" />
<!-- Step-scoped database writer used by the uploadRecords chunk.
     fileId is late-bound from the job execution context key 'fileId'. -->
<bean id="myItemWriter" class="....MyCustomItemWriter" scope="step">
<property name="fileId" value="#{jobExecutionContext['fileId']}" />
</bean>
<!-- If an input record is bad or a database error occurs, write the record to the rejected-records (error) file so it can be e-mailed later -->
<!-- Writer for the rejected-records file; its location is late-bound from the step execution context key 'rejected.file.url'.
     It is written to by myStepListener's skip callbacks and registered as a stream on uploadRecords.
     NOTE(review): skip callbacks in the multi-threaded uploadRecords step can run on several threads at once,
     so calls into this writer should be serialized by the caller. -->
<bean id="myErrorItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step">
<property name="resource" value="#{stepExecutionContext['rejected.file.url']}" />
<!-- transactional=false: lines are written immediately instead of waiting for the chunk transaction to commit,
     so rejected records survive a chunk rollback -->
<property name="transactional" value="false" />
<property name="lineAggregator">
<bean class="..CustomLineAggreator" />
</property>
<!-- do not delete a pre-existing file at this location on open; TODO confirm the intended behaviour when it already exists -->
<property name="shouldDeleteIfExists" value="false" />
</bean>
<!-- Step-scoped listener registered on uploadRecords; it is given myErrorItemWriter so it can write rejected records
     (presumably the MyStepExecutionListener class shown below; confirm the class name). -->
<bean id="myStepListener" class="....CustomStepExecutionListener" scope="step">
<property name="writer" ref="myErrorItemWriter" />
</bean>
<!-- The job: set up, read the input file, create database records, then e-mail the rejected list to the business -->
<!-- restartable="false": the job cannot be restarted, which is why myItemReader does not save state -->
<batch:job id="myJob" restartable="false">
<!-- 1. run the setup tasklet, then let the decider choose whether to upload -->
<batch:step id="setupStep" next="uploadDecider">
<batch:tasklet ref="mySetupTasklet" />
</batch:step>
<!-- 2. end the job if the decider returns STOPPED; for any other status, go on to uploadRecords -->
<batch:decision id="uploadDecider" decider="myUploadDecider">
<batch:end on="STOPPED"/>
<batch:next on="*" to="uploadRecords"/>
</batch:decision>
<!-- 3. multi-threaded chunk step: chunks are processed concurrently on taskExecutor threads, so the reader,
     processor, writer, error writer and listener below are each a single instance shared by those threads.
     NOTE(review): Spring Batch's reference docs warn that most of its readers and writers are not designed for
     multi-threaded use; reads from myItemReader and writes to myErrorItemWriter should be serialized, or the
     input should be partitioned so each thread has its own reader. -->
<batch:step id="uploadRecords">
<batch:tasklet task-executor="taskExecutor">
<!-- commit-interval=10: each transaction covers 10 items; skip-limit=100: at most 100 items may be skipped -->
<batch:chunk reader="myItemReader" processor="myItemProcessor" writer="myItemWriter" commit-interval="10" skip-limit="100" >
<!-- any Exception is skippable except FileNotFoundException -->
<batch:skippable-exception-classes>
<batch:include class="java.lang.Exception"/>
<batch:exclude class="java.io.FileNotFoundException"/>
</batch:skippable-exception-classes>
<batch:streams>
<!-- essential for skip listener, write bad records to error file: registers the error writer as a stream of this step -->
<batch:stream ref="myErrorItemWriter"/>
</batch:streams>
</batch:chunk>
<batch:listeners>
<!-- setup the step and perform actions after the step completes -->
<batch:listener ref="myStepListener" />
</batch:listeners>
</batch:tasklet>
</batch:step>
</batch:job>
Java code (item processor):
public class MyItemProcessor implements ItemProcessor<MyJobVO, MyJobVO> {
/**
* Takes in a MyJobVO, checks email and then passes on to the writer.
*/
@Override
public MyJobVO process(MyJobVO myJobVO)) throws Exception {
if(!EmailValidator.getInstance().isValid(myJobVO.getEmail())) {
throw new FlatFileParseException("Not an email address", myJobVO.getEmail());
}
return myJobVO;
}
}
Java code (item writer):
/**
 * Writes each chunk of validated {@code MyJobVO} records to the database.
 *
 * <p>NOTE(review): if this is the class behind the {@code myItemWriter} bean, {@code write} is
 * called concurrently from the task-executor threads of the multi-threaded step, so it should not
 * keep per-chunk mutable state in fields. The persistence code and the rest of the class are
 * elided in this snippet.
 */
public class MyItemWriter implements ItemWriter<MyJobVO> {
// presumably injected from the bean property "fileId" (late-bound from the job execution context); confirm
private Long fileId;
/**
 * Persists every record of the chunk.
 *
 * @param myJobVOs the records of one chunk
 */
@Override
public void write(List<? extends MyJobVO> myJobVOs) {
for(MyJobVO myJobVO: myJobVOs) {
// database write elided in the original snippet
.. write to database...
}
}
...
Java code (step and skip listener):
/**
 * Step listener that records rejected (skipped) items in a rejected-records file and closes
 * that file when the step ends, so it can be e-mailed.
 *
 * <p>In a multi-threaded step the skip callbacks run on the task-executor threads. Writes to the
 * shared {@link #writer} are therefore serialized through a synchronized helper. {@link #hasSkips}
 * is {@code volatile} so that {@link #afterStep} sees updates made by those threads.
 *
 * <p>NOTE(review): {@code logger} is not declared in this snippet; presumably an SLF4J
 * {@code Logger} field was elided from the post. Confirm.
 */
public class MyStepExecutionListener implements StepExecutionListener, SkipListener<MyJobVO, MyJobVO> {

    /** Writer for the rejected-records file; injected via {@link #setWriter}. */
    private ResourceAwareItemWriterItemStream<String> writer;

    /** Becomes {@code true} once at least one rejected record has been written successfully. */
    private volatile boolean hasSkips = false;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        // body elided in the original snippet
        ...
    }

    /**
     * Closes the rejected-records file. When the step completed and at least one record was
     * rejected, this is where the file gets e-mailed (e-mail code elided here).
     *
     * @param stepExecution the finished step execution
     * @return the step's exit status, unchanged
     */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        // essential to close the rejected file, or nothing will get emailed!
        writer.close();
        final ExitStatus exitStatus = stepExecution.getExitStatus();
        // Compare exit codes only: ExitStatus.equals() also compares the exit description.
        final boolean completed = ExitStatus.COMPLETED.getExitCode().equals(exitStatus.getExitCode());
        if (completed && hasSkips) {
            // then email rejected file
        }
        return exitStatus;
    }

    // ------------ START SKIP LISTENER IMPLEMENTATION ----------------------

    /** Records an item rejected by the processor. */
    @Override
    public void onSkipInProcess(MyJobVO item, Throwable t) {
        logger.debug("onSkipInProcess {} {}", item, t.getMessage());
        writeRejected(item, t);
    }

    /** Logs a read failure; there is no item to record, so nothing is written to the file. */
    @Override
    public void onSkipInRead(Throwable t) {
        logger.error("onSkipInRead {}", t.getMessage());
    }

    /** Records an item rejected by the writer (e.g. a database error). */
    @Override
    public void onSkipInWrite(MyJobVO item, Throwable t) {
        logger.debug("onSkipInWrite {} {}", item, t.getMessage());
        writeRejected(item, t);
    }

    /**
     * Appends the e-mail address of a skipped item to the rejected-records file.
     *
     * <p>Synchronized because skip callbacks can run concurrently on the step's task-executor
     * threads, and the injected writer is a single shared instance. Failures are logged, not
     * rethrown, so a problem with the rejected file does not fail the step.
     *
     * @param item the skipped item
     * @param skipCause the exception that caused the skip
     */
    private synchronized void writeRejected(MyJobVO item, Throwable skipCause) {
        final List<String> items = new ArrayList<String>(1);
        items.add(item.getEmail());
        try {
            writer.write(items);
            hasSkips = true;
        } catch (Exception e) {
            // Trailing Throwable argument makes the logger print the stack trace as well.
            logger.error("Failed to write skip record: {}, {}",
                    new Object[] {skipCause.getMessage(), e.getMessage(), e});
        }
    }

    // ------------ END SKIP LISTENER IMPLEMENTATION ----------------------

    /**
     * @param writer the writer for the rejected-records file
     */
    @Required
    public void setWriter(ResourceAwareItemWriterItemStream<String> writer) {
        this.writer = writer;
    }
}