PDA

View Full Version : Issue with chunk processing when configured with task-executor



rspatyal
Jul 12th, 2012, 07:31 AM
Hi,

I have customer table with group column (1,2,3). I have 3 records for each group.

CUSTOMER_ID GROUP_ID
----------- ---------
1 -> 1
2 -> 2
3 -> 3
4 -> 1
5 -> 2
6 -> 3
7 -> 1
8 -> 2

From my main class, I am calling 'jobLauncher.run(job, params)' in loop each time different value for the group.



for (int igroupID = 1; igroupID <= 3; igroupID++)
{
params = new JobParametersBuilder()
.addLong("date", new Date().getTime())
.addLong("groupid", Long.valueOf(igroupID) )
.toJobParameters();

log.info("----------------- groupid=" + igroupID + ", Starting Spring Batch job [JobName=" + JOB_NAME + "]");

jobExecution = jobLauncher.run(job, params);

log.info("================= groupid=" + igroupID + ", Job ended with status: " + jobExecution.getStatus());
}


The job configuration is as below:


<b:job id="simpleJob" job-repository="jobRepository">
<b:step id="processInput">
<b:tasklet>
<b:chunk
reader="step2Reader"
processor="step2Processor"
writer="step2Writer"
commit-interval="2"
task-executor="chunkTaskExecutor"
/>
</b:tasklet>
</b:step>
</b:job>

<bean id="chunkTaskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecu tor" >
<property name="concurrencyLimit" value="5"/>
</bean>


Where step2Reader ( implements ItemReader ) is custom reader class that extends ItemReader


Following are declared as instance variables:
private int currentIndex = 0;
pivate List<Customer> colCustomers;

following is the implementation in read() method:
if(colCustomers==null || colCustomers.size()==0)
{
fetchRecordsFromDb();
}

if (currentIndex < colCustomers.size()) {
Customer cust = colCustomers.get(currentIndex);
currentIndex++;
logger.info("Read[ " + currentIndex + " ] = " + cust);

return cust;
}
else {
return null;
}


It works fine for the 1st thread (group-id=1), but it don't return any records for group-id 2 and 3 even though there are records in database for those groups.

If I change the else clause as:


else {
currentIndex=0;
colCustomers.clear();

return null;
}


It results in invoking read() method 3 times for each group.

Please advise.

visualjeff
Jul 12th, 2012, 06:31 PM
for (int igroupID = 1; igroupID <= 2; igroupID++) <== Did you mean to iterate only twice? Isn't there 3 GROUP_ID's?

Jeff

rspatyal
Jul 12th, 2012, 09:51 PM
for (int igroupID = 1; igroupID <= 2; igroupID++) <== Did you mean to iterate only twice? Isn't there 3 GROUP_ID's?

Jeff

Hey Jeff,
You are correct, it will be 'igroupID <= 3'. I had set it to 2 while debugging to avoid extra iterations(for 3rd group id).
Will update the post.

visualjeff
Jul 16th, 2012, 12:59 PM
I'm going to assume your using a JDBC compliant datasource so why not make use a JdbcCursorItemReader instead of a plain ItemReader?

<bean id="jdbcCustomerItemReader" class="org.springframework.batch.item.database.JdbcCursor ItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="sql" value="select * from customer where city = ?"/>
<property name="rowMapper" ref="customerRowMapper"/>
<property name="preparedStatementSetter" ref="citySetter"/>
</bean>

Just a suggestion.

Jeff

rspatyal
Jul 17th, 2012, 10:00 PM
I'm going to assume your using a JDBC compliant datasource so why not make use a JdbcCursorItemReader instead of a plain ItemReader?

<bean id="jdbcCustomerItemReader" class="org.springframework.batch.item.database.JdbcCursor ItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="sql" value="select * from customer where city = ?"/>
<property name="rowMapper" ref="customerRowMapper"/>
<property name="preparedStatementSetter" ref="citySetter"/>
</bean>

Just a suggestion.

Jeff

Jeff,
I wanted to experiment with Custom Reader.

visualjeff
Jul 18th, 2012, 11:24 AM
Very well. Have fun...

Jeff