Multithreading with Stax input files exception?



gcollin
Dec 3rd, 2007, 04:31 AM
Hi,

I'm defining a batch solution for our product, which is used in insurance and banking.
I'm currently investigating Spring Batch, which seems a near-perfect fit for our needs. ;)

Using trunk or 1.0m2, when I try to set up multithreaded processing of an XML input file using StaxEventReaderInputSource and TaskExecutorRepeatTemplate with SimpleAsyncTaskExecutor, I get the following error:


10:09:56,863 DEBUG SimpleAsyncTaskExecutor-1 TransactionTemplate:151 - Initiating transaction rollback on application exception
org.springframework.dao.DataAccessResourceFailureException: Error while reading from event reader; nested exception is javax.xml.stream.XMLStreamException: ParseError at [row,col]:[1,1]
Message: Content is not allowed in prolog.
at org.springframework.batch.io.file.support.StaxEventReaderInputSource.moveCursorToNextFragment(StaxEventReaderInputSource.java:261)
at org.springframework.batch.io.file.support.StaxEventReaderInputSource.read(StaxEventReaderInputSource.java:88)
at org.springframework.batch.io.file.support.StaxEventReaderInputSource$$FastClassByCGLIB$$db438604.invoke(<generated>)
at net.sf.cglib.proxy.MethodProxy.invoke(MethodProxy.java:149)
at org.springframework.aop.framework.Cglib2AopProxy$CglibMethodInvocation.invokeJoinpoint(Cglib2AopProxy.java:694)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:149)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:131)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:119)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:171)
at org.springframework.aop.framework.Cglib2AopProxy$DynamicAdvisedInterceptor.intercept(Cglib2AopProxy.java:629)
at org.springframework.batch.io.file.support.StaxEventReaderInputSource$$EnhancerByCGLIB$$94aa0c51.read(<generated>)
at org.springframework.batch.item.provider.InputSourceItemProvider.next(InputSourceItemProvider.java:48)
at org.springframework.batch.execution.tasklet.ItemProviderProcessTasklet.execute(ItemProviderProcessTasklet.java:141)
at org.springframework.batch.execution.step.simple.SimpleStepExecutor.doTaskletProcessing(SimpleStepExecutor.java:372)
at org.springframework.batch.execution.step.simple.DefaultStepExecutor.doTaskletProcessing(DefaultStepExecutor.java:61)
at org.springframework.batch.execution.step.simple.SimpleStepExecutor$2.doInIteration(SimpleStepExecutor.java:347)
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:324)
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:201)
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:131)
at org.springframework.batch.execution.step.simple.SimpleStepExecutor.processChunk(SimpleStepExecutor.java:334)
at org.springframework.batch.execution.step.simple.SimpleStepExecutor$1$2.doInTransaction(SimpleStepExecutor.java:220)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:127)
at org.springframework.batch.execution.step.simple.SimpleStepExecutor$1.doInIteration(SimpleStepExecutor.java:208)
at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:227)
at java.lang.Thread.run(Thread.java:619)
Caused by: javax.xml.stream.XMLStreamException: ParseError at [row,col]:[1,1]
Message: Content is not allowed in prolog.
at com.sun.org.apache.xerces.internal.impl.XMLStreamReaderImpl.next(XMLStreamReaderImpl.java:588)
at com.sun.xml.internal.stream.XMLEventReaderImpl.peek(XMLEventReaderImpl.java:271)
at org.springframework.batch.io.file.support.stax.DefaultTransactionalEventReader.peek(DefaultTransactionalEventReader.java:86)
at org.springframework.batch.io.file.support.stax.DefaultFragmentEventReader.peek(DefaultFragmentEventReader.java:152)
at org.springframework.batch.io.file.support.StaxEventReaderInputSource.moveCursorToNextFragment(StaxEventReaderInputSource.java:246)
... 24 more



Maybe I did something wrong. Does anybody know whether this should work?

G.C.

Dave Syer
Dec 3rd, 2007, 09:54 AM
You can't use a multithreaded repeat template for reading a file (at least not yet) because the InputSource does transaction synchronization and a transaction is single threaded. I assume it is probably the processing that you want to parallelise anyway, not the reading. This should be easier in m4, but will still require some additional work if you need restartability. N.B. we only plan to support parallel processing with zero effort after 1.0.
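
For readers who want to see the "read on one thread, parallelise only the processing" idea in isolation, here is a minimal sketch in plain java.util.concurrent. It is not Spring Batch API: the class SingleReaderSketch and the method readThenProcess are hypothetical names invented for illustration, and the sketch ignores transactions and restartability entirely.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical illustration only; none of these names come from Spring Batch.
public class SingleReaderSketch {

    /** Reads on the calling thread only; each item is processed on a pool thread. */
    static <I, O> List<O> readThenProcess(Iterator<I> reader, Function<I, O> processor, int threads)
            throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<O>> pending = new ArrayList<>();
            while (reader.hasNext()) {
                I item = reader.next(); // the read stays single-threaded
                pending.add(pool.submit(() -> processor.apply(item))); // processing runs in parallel
            }
            List<O> results = new ArrayList<>();
            for (Future<O> future : pending) {
                results.add(future.get()); // rethrows any processing failure
            }
            return results; // same order as the items were read
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for XML fragments coming from a reader
        List<String> fragments = Arrays.asList("<a/>", "<b/>", "<c/>");
        List<Integer> lengths = readThenProcess(fragments.iterator(), String::length, 2);
        System.out.println(lengths);
    }
}
```

Because only the calling thread ever touches the reader, the reader itself does not have to be thread-safe in this arrangement.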

gcollin
Dec 3rd, 2007, 11:16 AM
Hi,

Thanks for your reply.

You are right, I would like to parallelise the processing. But the only sample with anything parallel in it is adhoc.xml, and I more or less copied its config.

Where can I find an example of parallel processing?
Should I wait for m4 for that?

G. C.

Dave Syer
Dec 3rd, 2007, 12:35 PM
The adhoc job is not parallel processing inside the job - just launching the job asynchronously from the JobLauncher.

Actually, thinking about it, the Stax input source buffers Stax events in its TX synchronization, so you might actually be able to use it with a TaskExecutorRepeatTemplate - but make sure it is in the stepOperations, not the chunkOperations.

You will still lose Restartable behaviour, until we go beyond 1.0.

gcollin
Dec 4th, 2007, 03:24 AM
Looking at my config, I see that I did put the parallel part in stepOperations.
But I think my mistake is that the inputSource was not registered with step scope, so two instances of it were created in different threads.

I'm currently fighting with my Spring config to change that.
I'll keep you posted.

Thx

Gérard

ramkris
Dec 7th, 2007, 02:27 AM
Hi Gerard,
did you get the parallel processing working? Can you give an example here with xml snippets?

regards,
Ramkumar

gcollin
Dec 7th, 2007, 02:58 AM
In fact I switched to something else.
I have almost convinced my clients that we should do multiprocessing instead of multi-tasking, at least for now.

Anyway, I think we can try to wrap any InputSource in a "synchronized input source" so that threads access it one at a time.

We will lose the parallel processing on data access, but we'll have multi-tasking on processing.
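
The "synchronized input source" idea above can be sketched roughly as follows. This is only an illustration under assumptions: ItemSource is a hypothetical stand-in interface (not the real InputSource or ItemProvider contract, whose exact signatures in 1.0m2 are not reproduced here), and SynchronizedItemSource and drainConcurrently are made-up names. It says nothing about restart behaviour.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; none of these names come from Spring Batch.
public class SynchronizedProviderSketch {

    /** Assumed stand-in for the framework contract: returns null when exhausted. */
    interface ItemSource {
        Object next() throws Exception;
    }

    /** Decorator that lets only one thread at a time call the delegate. */
    static final class SynchronizedItemSource implements ItemSource {
        private final ItemSource delegate;

        SynchronizedItemSource(ItemSource delegate) {
            this.delegate = delegate;
        }

        @Override
        public synchronized Object next() throws Exception {
            return delegate.next();
        }
    }

    /** Drains the source from several threads and returns every item that was read. */
    static List<Object> drainConcurrently(ItemSource source, int threads) throws InterruptedException {
        List<Object> seen = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.submit(() -> {
                Object item;
                while ((item = source.next()) != null) {
                    seen.add(item); // stands in for processing, which happens outside the lock
                }
                return null;
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return seen;
    }

    public static void main(String[] args) throws Exception {
        Iterator<String> records = Arrays.asList("a", "b", "c", "d").iterator();
        // A deliberately non-thread-safe source
        ItemSource raw = () -> records.hasNext() ? records.next() : null;
        List<Object> seen = drainConcurrently(new SynchronizedItemSource(raw), 2);
        System.out.println(seen.size());
    }
}
```

The reads are serialized by the lock, so each record is handed to exactly one thread, while whatever the threads do with a record afterwards can overlap.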

Regards,

Gérard

ramkris
Dec 7th, 2007, 03:49 AM
Hi Gerard,
Thanks for your quick reply.
"We will lose the parallel processing on data access, but we'll have multi-tasking on processing."
Can you tell me how you did this? I too am only interested in parallelising the processing, not the reading of the data.

gcollin
Dec 7th, 2007, 04:01 AM
Here is a snippet of the Spring Batch config I used for parallel processing.
The thing is, it does not work for now, because the itemProvider is accessed concurrently by the parallelized itemProcessors, and that is not supported yet.
In my view, you need to create your own ItemProvider wrapper that synchronizes all access to it.
I'm not sure whether you would lose other Spring Batch features that way, such as cold or hot restart.





<bean name="jobLauncher" class="org.springframework.batch.execution.launch.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
    <property name="jobConfigurationLocator" ref="jobConfigurationRegistry"/>
    <property name="jobExecutor" ref="jobExecutor" />
    <property name="jobIdentifierFactory" ref="jobIdentifierFactory"/>
</bean>

<bean id="jobConfigurationRegistry" class="org.springframework.batch.execution.configuration.MapJobConfigurationRegistry"/>

<bean class="org.springframework.batch.execution.configuration.JobConfigurationRegistryBeanPostProcessor">
    <property name="jobConfigurationRegistry" ref="jobConfigurationRegistry"/>
</bean>


<!--aop:config>
    <aop:advisor pointcut="execution(* org.springframework.batch.execution..*Repository+.*(..))"
        advice-ref="txAdvice" />
</aop:config>
<tx:advice id="txAdvice" transaction-manager="transactionManager">
    <tx:attributes>
        <tx:method name="*" />
    </tx:attributes>
</tx:advice-->

<bean id="jobExecutor" class="org.springframework.batch.execution.job.DefaultJobExecutor">
    <property name="jobRepository" ref="jobRepository" />
    <property name="stepExecutorFactory">
        <bean class="org.springframework.batch.execution.step.PrototypeBeanStepExecutorFactory">
            <property name="stepExecutorName" value="stepExecutor" />
        </bean>
    </property>
</bean>

<bean id="stepExecutor" class="org.springframework.batch.execution.step.simple.DefaultStepExecutor"
    scope="prototype">
    <property name="transactionManager" ref="sqlTransactionManager" />
    <property name="repository" ref="jobRepository" />
    <!-- Support for concurrent execution before the transaction -->
    <property name="stepOperations" ref="repeater" />

</bean>

<bean id="sqlTransactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager" lazy-init="true">
    <property name="dataSource" ref="businessDB" />
</bean>

<bean id="repeater" class="org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate">
    <property name="taskExecutor" ref="asyncExecutor" />
</bean>

<bean id="asyncExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor">
    <property name="concurrencyLimit" value="2"/>
</bean>


<bean id="simpleJob" class="org.springframework.batch.core.configuration.JobConfiguration"
    abstract="true">
    <property name="restartable" value="true" />
</bean>

<bean id="simpleStep" class="org.springframework.batch.execution.step.SimpleStepConfiguration"
    abstract="true">
    <property name="allowStartIfComplete" value="true" />
    <property name="saveRestartData" value="true" />
    <property name="exceptionHandler">
        <bean
            class="org.springframework.batch.repeat.exception.handler.SimpleLimitExceptionHandler">
            <property name="limit" value="5" />
            <property name="useParent" value="true"/>
        </bean>
    </property>
    <property name="commitInterval" value="1" />
</bean>

<bean id="jobConfiguration" parent="simpleJob">
    <property name="name" value="xmlStaxJob" />
    <property name="steps">
        <bean id="step1" parent="simpleStep">
            <constructor-arg>
                <bean
                    class="org.springframework.batch.execution.tasklet.RestartableItemProviderTasklet">
                    <property name="itemProvider">
                        <bean class="org.springframework.batch.item.provider.InputSourceItemProvider">
                            <property name="inputSource">
                                <ref bean="fileInputSource"/>
                            </property>
                        </bean>
                    </property>

                    <property name="itemProcessor">
                        <ref bean="itemProcessor" />
                    </property>
                </bean>
            </constructor-arg>
        </bean>
    </property>
</bean>



Hope this helps,

Gérard