Hi all. I'm in desperate need of some help. (This is a long one, so prepare yourself...)
We have a flow that we're trying to implement. All the pieces work correctly, but we're looking for places where we can speed it up. The flow calls a series of web services, and we would like to make those steps multi-threaded.
Here's the step def:
Code:
<batch:step id="listenersParentStep" abstract="true">
    <!-- these listeners will be used for each child step -->
    <batch:tasklet>
        <batch:listeners>
            <batch:listener ref="detailLogStepExecutionListener"/>
            <batch:listener ref="detailLogSkipListener"/>
        </batch:listeners>
    </batch:tasklet>
</batch:step>

<batch:step id="validation" parent="listenersParentStep">
    <batch:tasklet task-executor="taskExecutor">
        <batch:chunk commit-interval="50" skip-limit="${skip.limit}"
                     reader="threadSafeValidationReader"
                     processor="cdfItemValidateProcessor"
                     writer="threadSafeValidationWriter">
            <batch:skippable-exception-classes>
                <batch:include class="org.oclcwcp.batch.commons.exception.ValidationServiceException"/>
            </batch:skippable-exception-classes>
        </batch:chunk>
    </batch:tasklet>
    <batch:next on="*" to="tlInstructions"/>
    <batch:next on="FAILED" to="tlOrderClose"/>
</batch:step>

Here are the bean defs:
Code:
<bean id="threadSafeValidationReader" class="org.oclcwcp.batch.commons.reader.ThreadSafeItemStreamReader" scope="step">
    <property name="delegate" ref="cdfXmlItemReader"/>
</bean>

<bean id="threadSafeValidationWriter" class="org.oclcwcp.batch.commons.writer.ThreadSafeItemStreamWriter" scope="step">
    <property name="delegate" ref="cdfItemValidateWriter"/>
</bean>

<bean id="cdfItemValidateWriter" scope="step" parent="txtWriterParent">
    <property name="resource" value="file:#{jobParameters['toolf.out.working.directory']}#{jobParameters['toolf.out.input.file.name.without.path']}.WCPVALIDATE"/>
    <property name="saveState" value="false"/>
</bean>

<bean id="cdfUnmarshaller" class="org.oclcwcp.batch.commons.xml.CDFUnmarshaller"/>

<bean id="cdfItemReaderParent" class="org.springframework.batch.item.xml.StaxEventItemReader" scope="prototype">
    <property name="fragmentRootElementName" value="CDFRec"/>
    <property name="unmarshaller" ref="cdfUnmarshaller"/>
</bean>

<bean id="cdfXmlItemReader" parent="cdfItemReaderParent" scope="step">
    <property name="resource" value="file:#{jobParameters['toolf.input.file.name']}"/>
    <property name="saveState" value="false"/>
</bean>

<bean id="cdfItemValidateProcessor" class="org.oclcwcp.batch.commons.processor.CDFItemProcessor" scope="step">
    <property name="wcpValidationServiceClient" ref="validationServiceCommonClient"/>
    <property name="orderId" value="#{jobParameters['toolf.orderId']}"/>
    <property name="vendorInstitutionSymbol" value="#{jobParameters['toolf.institutionSymbol']}"/>
</bean>

<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="10"/>
    <property name="maxPoolSize" value="20"/>
</bean>

The reader and writer are basically the same:
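Code:
import org.springframework.batch.core.ExitStatus
import org.springframework.batch.core.StepExecution
import org.springframework.batch.core.StepExecutionListener
import org.springframework.batch.item.ExecutionContext
import org.springframework.batch.item.ItemReader
import org.springframework.batch.item.ItemStream
import org.springframework.batch.item.ItemStreamReader
import org.springframework.beans.factory.InitializingBean
import org.springframework.util.Assert

class ThreadSafeItemStreamReader<T> implements ItemStreamReader<T>, InitializingBean, StepExecutionListener {

    ItemReader<T> delegate

    // Only read() is synchronized; the stream and listener callbacks just pass through.
    T read() throws Exception {
        synchronized (this) {
            return delegate.read()
        }
    }

    void open(ExecutionContext executionContext) {
        if (delegate instanceof ItemStream) {
            ((ItemStream) delegate).open(executionContext)
        }
    }

    void update(ExecutionContext executionContext) {
        if (delegate instanceof ItemStream) {
            ((ItemStream) delegate).update(executionContext)
        }
    }

    void close() {
        if (delegate instanceof ItemStream) {
            ((ItemStream) delegate).close()
        }
    }

    void afterPropertiesSet() throws Exception {
        Assert.notNull(delegate, "delegate must be set")
    }

    void beforeStep(StepExecution stepExecution) {
        if (delegate instanceof StepExecutionListener) {
            ((StepExecutionListener) delegate).beforeStep(stepExecution)
        }
    }

    ExitStatus afterStep(StepExecution stepExecution) {
        if (delegate instanceof StepExecutionListener) {
            return ((StepExecutionListener) delegate).afterStep(stepExecution)
        }
        return null
    }
}

This is a snippet of the exception: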
Code:
org.springframework.dao.OptimisticLockingFailureException: Attempt to update step execution id=10844 with wrong version (4), where current version is 5
    at org.springframework.batch.core.repository.dao.JdbcStepExecutionDao.updateStepExecution(JdbcStepExecutionDao.java:185)
    at org.springframework.batch.core.repository.support.SimpleJobRepository.update(SimpleJobRepository.java:171)
    at sun.reflect.GeneratedMethodAccessor122.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:309)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:110)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
    at $Proxy34.update(Unknown Source)
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:432)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:130)
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:264)
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:76)
    at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:258)
    ...
Caused by: java.util.ConcurrentModificationException
    at java.util.AbstractList$Itr.checkForComodification(AbstractList.java:372)
    at java.util.AbstractList$Itr.next(AbstractList.java:343)
    at com.thoughtworks.xstream.converters.collections.CollectionConverter.marshal(CollectionConverter.java:54)
    ....
    at com.thoughtworks.xstream.XStream.toXML(XStream.java:805)
    at org.springframework.batch.core.repository.dao.XStreamExecutionContextStringSerializer.serialize(XStreamExecutionContextStringSerializer.java:43)
    at org.springframework.batch.core.repository.dao.JdbcExecutionContextDao.serializeContext(JdbcExecutionContextDao.java:212)
    at org.springframework.batch.core.repository.dao.JdbcExecutionContextDao.updateExecutionContext(JdbcExecutionContextDao.java:134)
    at org.springframework.batch.core.repository.support.SimpleJobRepository.updateExecutionContext(SimpleJobRepository.java:184)
    at sun.reflect.GeneratedMethodAccessor121.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:309)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:110)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
    at $Proxy34.updateExecutionContext(Unknown Source)
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:429)

We are using the JobRepositoryFactoryBean, not the MapJobRepositoryFactoryBean that others who have hit a similar issue were using.
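For anyone less familiar with the "Caused by" part: the trace shows the ConcurrentModificationException coming from a list iterator while XStream serializes the execution context. Here is a minimal, plain-Java sketch of how that exception type arises in general. It is not taken from our job; the list, its contents, and the class name ComodificationSketch are hypothetical stand-ins, and I'm only assuming (not asserting) that something similar happens to a collection held in our step's execution context.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;

public class ComodificationSketch {

    // Returns true if continuing an iteration after another thread has
    // structurally modified the list throws ConcurrentModificationException.
    static boolean iterationFailsAfterModification() {
        // Hypothetical stand-in for a collection stored in an execution context.
        final List<String> shared = new ArrayList<String>();
        shared.add("item-1");
        shared.add("item-2");

        // One thread (think: the serializer) starts walking the list...
        Iterator<String> it = shared.iterator();
        it.next();

        // ...and a second thread adds to it. join() keeps the ordering deterministic.
        Thread worker = new Thread(new Runnable() {
            public void run() {
                shared.add("item-3");
            }
        });
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        try {
            it.next(); // the fail-fast iterator notices the modification here
            return false;
        } catch (ConcurrentModificationException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("CME thrown: " + iterationFailsAfterModification());
    }
}
```

In a real multi-threaded step the timing would not be forced with join(), so a failure of this kind would presumably show up only intermittently.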
Ideas? Anyone?

