Launching multi-threaded job by JMS
Hi All,
I have encountered a problem when launching a multi-threaded job triggered by JMS.
Here is my scenario:
The client pushes a message onto the queue, an MDB listening on the queue picks up the message, and the MDB then launches a multi-threaded job.
After the job had started, the first time the Step committed its changes to the table BATCH_STEP_EXECUTION according to the Step's ‘commitInterval’ property (that is, when the first thread finished the Tasklet), it threw the following exception:
Code:
SystemErr R org.springframework.batch.core.UnexpectedJobExecutionException: Encountered an error saving batch meta data.
SystemErr R at org.springframework.batch.core.step.item.ItemOrientedStep.execute(ItemOrientedStep.java:415)
SystemErr R at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
SystemErr R at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64)
SystemErr R at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
SystemErr R at java.lang.reflect.Method.invoke(Method.java:615)
SystemErr R at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:296)
SystemErr R at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:177)
...
SystemErr R at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
SystemErr R at $Proxy9.execute(Unknown Source)
SystemErr R at org.springframework.batch.core.job.SimpleJob.execute(SimpleJob.java:121)
SystemErr R at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:90)
SystemErr R at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:49)
SystemErr R at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:85)
SystemErr R at com.test.batch.sample.JobLauncherService.launchJob(JobLauncherService.java:54)
...
SystemErr R at com.ibm.ejs.container.MDBWrapper.onMessage(MDBWrapper.java:132)
SystemErr R at com.ibm.ejs.jms.listener.ServerSession.run(ServerSession.java:481)
SystemErr R at com.ibm.ws.util.ThreadPool$Worker.run(ThreadPool.java:1469)
SystemErr R Caused by: org.springframework.dao.EmptyResultDataAccessException: Incorrect result size: expected 1, actual 0
at org.springframework.dao.support.DataAccessUtils.requiredSingleResult(DataAccessUtils.java:71)
at org.springframework.jdbc.core.JdbcTemplate.queryForObject(JdbcTemplate.java:668)
at org.springframework.jdbc.core.JdbcTemplate.queryForObject(JdbcTemplate.java:678)
at org.springframework.jdbc.core.JdbcTemplate.queryForInt(JdbcTemplate.java:705)
at org.springframework.batch.core.repository.dao.JdbcStepExecutionDao.updateStepExecution(JdbcStepExecutionDao.java:327)
at org.springframework.batch.core.repository.support.SimpleJobRepository.saveOrUpdate(SimpleJobRepository.java:238)
at org.springframework.batch.core.repository.support.SimpleJobRepository.saveOrUpdateExecutionContext(SimpleJobRepository.java:247)
at org.springframework.batch.core.step.item.ItemOrientedStep$1.doInIteration(ItemOrientedStep.java:315)
at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:215)
at org.springframework.scheduling.commonj.DelegatingWork.run(DelegatingWork.java:61)
at com.ibm.ws.asynchbeans.J2EEContext.run(J2EEContext.java:1114)
at com.ibm.ws.asynchbeans.WorkWithExecutionContextImpl.go(WorkWithExecutionContextImpl.java:195)
at com.ibm.ws.asynchbeans.CJWorkItemImpl.run(CJWorkItemImpl.java:187)
at com.ibm.ws.util.ThreadPool$Worker.run(ThreadPool.java:1469)
SystemErr R at org.springframework.dao.support.DataAccessUtils.requiredSingleResult(DataAccessUtils.java:71)
SystemErr R at org.springframework.jdbc.core.JdbcTemplate.queryForObject(JdbcTemplate.java:668)
SystemErr R at org.springframework.jdbc.core.JdbcTemplate.queryForObject(JdbcTemplate.java:678)
SystemErr R at org.springframework.jdbc.core.JdbcTemplate.queryForInt(JdbcTemplate.java:705)
SystemErr R at org.springframework.batch.core.repository.dao.JdbcStepExecutionDao.updateStepExecution(JdbcStepExecutionDao.java:327)
SystemErr R at org.springframework.batch.core.repository.support.SimpleJobRepository.saveOrUpdate(SimpleJobRepository.java:238)
SystemErr R at org.springframework.batch.core.repository.support.SimpleJobRepository.saveOrUpdateExecutionContext(SimpleJobRepository.java:247)
SystemErr R at org.springframework.batch.core.step.item.ItemOrientedStep$1.doInIteration(ItemOrientedStep.java:315)
SystemErr R at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:215)
SystemErr R at org.springframework.scheduling.commonj.DelegatingWork.run(DelegatingWork.java:61)
SystemErr R at com.ibm.ws.asynchbeans.J2EEContext.run(J2EEContext.java:1114)
SystemErr R at com.ibm.ws.asynchbeans.WorkWithExecutionContextImpl.go(WorkWithExecutionContextImpl.java:195)
SystemErr R at com.ibm.ws.asynchbeans.CJWorkItemImpl.run(CJWorkItemImpl.java:187)
SystemErr R ... 1 more
I then debugged into the following code:
org.springframework.batch.core.repository.support.SimpleJobRepository
Code:
public void saveOrUpdate(StepExecution stepExecution) {
    ...
    if (stepExecution.getId() == null) {
        stepExecutionDao.saveStepExecution(stepExecution);
    }
    else {
        // existing execution, update
        stepExecutionDao.updateStepExecution(stepExecution);
    }
}
As expected, stepExecution.getId() is not null once stepExecutionDao.saveStepExecution(stepExecution) has been executed for the first time, but the corresponding record is not found in table BATCH_STEP_EXECUTION. As a result, count is 0 below, and the follow-up queryForInt call in the following snippet fails with the EmptyResultDataAccessException shown in the trace:
org.springframework.batch.core.repository.dao.JdbcStepExecutionDao
Code:
public void updateStepExecution(StepExecution stepExecution) {
    ...
    int count = getJdbcTemplate().update(
            getQuery(UPDATE_STEP_EXECUTION),
            parameters,
            new int[] { Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.INTEGER, Types.INTEGER,
                    Types.CHAR, Types.VARCHAR, Types.VARCHAR, Types.INTEGER, Types.INTEGER, Types.INTEGER });
    // Avoid concurrent modifications...
    if (count == 0) {
        int curentVersion = getJdbcTemplate().queryForInt(
                getQuery(CURRENT_VERSION_STEP_EXECUTION),
                new Object[] { stepExecution.getId() });
        throw new OptimisticLockingFailureException("Attempt to update step execution id="
                + stepExecution.getId() + " with wrong version (" + stepExecution.getVersion() + "), where current version is "+curentVersion);
    }
    stepExecution.incrementVersion();
    ...
}
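To make the failure path above easier to follow, here is a minimal toy model of it in plain Java. It is not Spring Batch code: the class and method names are hypothetical, the in-memory map stands in for BATCH_STEP_EXECUTION, and it assumes (the query text is not shown above) that the UPDATE matches on both id and version. It only illustrates why a missing row would surface as "expected 1, actual 0" from the version lookup rather than as an OptimisticLockingFailureException.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of the update path quoted above; names are hypothetical, not Spring Batch API. */
public class UpdateFlowSketch {

    /** Stand-in for BATCH_STEP_EXECUTION: step execution id -> version. */
    static final Map<Long, Integer> table = new HashMap<>();

    /** Mimics an UPDATE that matches on id AND version (assumed); returns the affected row count. */
    static int update(long id, int expectedVersion) {
        Integer current = table.get(id);
        if (current == null || current != expectedVersion) {
            return 0; // no row matched
        }
        table.put(id, current + 1);
        return 1;
    }

    /** Mimics the follow-up version lookup, which requires exactly one row. */
    static int currentVersion(long id) {
        Integer current = table.get(id);
        if (current == null) {
            // Stand-in for EmptyResultDataAccessException
            throw new IllegalStateException("Incorrect result size: expected 1, actual 0");
        }
        return current;
    }

    /** Returns which of the three outcomes the update path ends in. */
    static String updateStepExecution(long id, int version) {
        if (update(id, version) == 0) {
            try {
                currentVersion(id);
            } catch (IllegalStateException e) {
                return "ROW_MISSING"; // the case seen in the stack trace
            }
            return "STALE_VERSION"; // the case the optimistic-lock check is meant for
        }
        return "UPDATED";
    }

    public static void main(String[] args) {
        System.out.println(updateStepExecution(1L, 0)); // row not visible
        table.put(1L, 3);
        System.out.println(updateStepExecution(1L, 0)); // row visible, wrong version
        System.out.println(updateStepExecution(1L, 3)); // row visible, matching version
    }
}
```

In this model the first call corresponds to the situation described here: the id is set on the object, but no row is visible to the thread performing the update.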
This looks like a transaction issue: the insert does not seem to be committed at the DB level before the update is attempted.
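The suspicion can be pictured with another small toy model. This is only a sketch of one possible explanation, assuming the step execution row is inserted inside a transaction that has not committed yet while the worker thread runs in a different transaction. Nothing here is WebSphere or Spring API; the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of transaction visibility; names are hypothetical, not container API. */
public class VisibilitySketch {

    /** Rows that every transaction can see. */
    static final Map<Long, String> committed = new HashMap<>();

    /** A transaction sees committed rows plus its own uncommitted writes. */
    static class Tx {
        private final Map<Long, String> pending = new HashMap<>();

        void insert(long id, String status) {
            pending.put(id, status);
        }

        boolean canSee(long id) {
            return pending.containsKey(id) || committed.containsKey(id);
        }

        void commit() {
            committed.putAll(pending);
            pending.clear();
        }
    }

    public static void main(String[] args) {
        Tx listenerTx = new Tx(); // assumed: transaction of the thread that launched the job
        Tx workerTx = new Tx();   // assumed: separate transaction of a pooled worker thread

        listenerTx.insert(1L, "STARTED");
        System.out.println(workerTx.canSee(1L)); // false: insert not committed yet

        listenerTx.commit();
        System.out.println(workerTx.canSee(1L)); // true: row visible after commit
    }
}
```

If this is what happens, it would also be consistent with the single-threaded JMS case working, since there the insert and the update would run in the same transaction.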
Here’s my environment:
Spring Batch 1.0.0.rc1
WebSphere Application Server 6.1.0.11
Oracle 10g
MQ 6.0
XA support is enabled for both the DB and MDB resources, and both are configured on WAS 6.1.
I use ‘JtaTransactionManager’ as my transaction manager, and ‘WorkManagerTaskExecutor’ as the TaskExecutor for thread pool management.
Could anyone give me a solution? Thank you!
BTW, the two scenarios below, which I have also tried, work fine:
A JMS-triggered single-threaded job. (JMS resource + JDBC resource)
A multi-threaded job run in the web container. (JDBC resource)