Hello everybody,
Before I explain the problem, here is a brief description of the architecture:
I'm using Spring Integration to poll for files (referred to as trigger-files), which are processed by a class called FileReceiver. The FileReceiver uses a JobLauncher to launch a SimpleJob. The job basically creates one object per record (usually one line of the file) and sends it to a channel (referred to as trigger-channel); from there it is finally stored on a JMS topic. The system that reads from the topic needs to aggregate the messages again so that it can forward the entire content of one trigger-file to another system in a single message.
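To illustrate what the consuming side needs, here is a minimal plain-Java sketch of the aggregation idea. It does not use Spring Integration; the class names TriggerMessage and SimpleAggregator and their fields are hypothetical and only show why every message has to carry an ID for its trigger-file and the expected number of messages.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AggregationSketch {

    /** Hypothetical message shape: payload plus the two values the aggregator needs. */
    static final class TriggerMessage {
        final String correlationId; // assumed: name of the trigger-file
        final int sequenceSize;     // assumed: number of records in that file
        final String payload;

        TriggerMessage(String correlationId, int sequenceSize, String payload) {
            this.correlationId = correlationId;
            this.sequenceSize = sequenceSize;
            this.payload = payload;
        }
    }

    /** Collects payloads per correlation ID and releases a group once it is complete. */
    static final class SimpleAggregator {
        private final Map<String, List<String>> groups = new HashMap<>();

        /** Returns the complete group when its last message arrives, otherwise null. */
        List<String> add(TriggerMessage message) {
            List<String> group =
                    groups.computeIfAbsent(message.correlationId, key -> new ArrayList<>());
            group.add(message.payload);
            if (group.size() < message.sequenceSize) {
                return null; // still waiting for more messages of this file
            }
            groups.remove(message.correlationId);
            return group;
        }
    }

    public static void main(String[] args) {
        SimpleAggregator aggregator = new SimpleAggregator();
        // Messages of two files may arrive interleaved on the topic.
        aggregator.add(new TriggerMessage("a.trg", 2, "a-line-1"));
        aggregator.add(new TriggerMessage("b.trg", 1, "b-line-1"));
        List<String> completed = aggregator.add(new TriggerMessage("a.trg", 2, "a-line-2"));
        System.out.println(completed);
    }
}
```

If either value is wrong on a message, a group is released too early, too late, or mixed with records of another file.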
Now the problem:
For the aggregator, the messages that were initially sent to the trigger-channel need to have a correlation ID and sequence size. The sequence size (line count of the trigger-file) and the correlation id (name of the trigger-file) are set as JobParameters in the FileReceiver. Here is the code extract:
Code:
// Build the job parameters from the trigger-file and launch the job
JobParametersBuilder paramBuilder = new JobParametersBuilder();
paramBuilder.addLong("line.count", lineCount);
paramBuilder.addString("file.name", fileContents.getName());
JobParameters params = paramBuilder.toJobParameters();
jobLauncher.run(job, params);

I wanted to use the job parameters to set the message headers accordingly in the TriggerWriter (the itemWriter property in the code below).

Code:
<bean id="simpleJob" class="org.springframework.batch.core.job.SimpleJob" abstract="true">
    <property name="jobRepository" ref="jobRepository" />
    <property name="restartable" value="false" />
</bean>

<bean id="fixedLengthImportJob" parent="simpleJob">
    <property name="steps">
        <bean id="step1" parent="simpleStep">
            <property name="commitInterval" value="3" />
            <property name="listeners">
                <list>
                    <ref bean="inputFile" />
                    <bean class="...CorrelationAwareTriggerListener" />
                </list>
            </property>
            <property name="streams" ref="fileItemReader" />
            <property name="itemReader" ref="flatFileItemReader" />
            <property name="itemWriter">
                <bean class="...TriggerWriter">
                    <property name="channel" ref="eventChannel" />
                </bean>
            </property>
        </bean>
    </property>
    <property name="restartable" value="true" />
</bean>

For the inputFile (see code above), I could reference the parameter like this: %file.name%

Code:
<bean id="inputFile" class="org.springframework.batch.core.resource.StepExecutionResourceProxy">
    <property name="filePattern" value="${trigger.file.location}/%file.name%" />
</bean>

It seems to me that this mechanism was not intended to be used for the itemWriter (something like <property name="linecount" value="%line.count%" /> does not work). What I am doing instead is using an additional listener (CorrelationAwareTriggerListener, also referenced in the job configuration above), which is both an ItemWriteListener and a StepExecutionListener. The most important methods are shown below.

Code:
public void beforeWrite(Object trigger) {
    if (!(trigger instanceof CorrelationAwareTrigger)) {
        LOG.warn("Received object of type " + trigger.getClass() + " in step " + stepName);
        return;
    }
    // Copy the values remembered in beforeStep onto the outgoing trigger
    CorrelationAwareTrigger correlated = (CorrelationAwareTrigger) trigger;
    correlated.setCorrelationId(params.getString("correlation.id"));
    correlated.setTriggerCount(params.getLong("line.count").intValue());
    correlated.setIndex(counter++);
}

public void beforeStep(StepExecution stepExecution) {
    // State is kept in instance fields of the listener
    stepName = stepExecution.getStepName();
    params = stepExecution.getJobParameters();
    counter = 1;
}

In beforeStep, the job parameters etc. are stored in fields of the listener and then used again in beforeWrite. This works as long as only one file is processed at a time. As soon as two files arrive at approximately the same time, both job executions use the same listener instance, so the job parameters (and the counter) of the first file are overwritten by those of the second.
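To make the failure concrete, here is a minimal plain-Java sketch of the overwrite. It contains no Spring Batch classes; StatefulListener and its method signatures are hypothetical stand-ins for my listener, and the interleaving is simulated by call order instead of real threads. The second half only shows that the overwrite disappears when each execution has its own listener instance; whether the step configuration can provide that is not something the sketch answers.

```java
public class ListenerStateSketch {

    /** Hypothetical stand-in for the listener: state lives in instance fields. */
    static final class StatefulListener {
        private String correlationId;
        private int lineCount;
        private int counter;

        void beforeStep(String fileName, int lineCount) {
            this.correlationId = fileName;
            this.lineCount = lineCount;
            this.counter = 1;
        }

        /** Returns the tag a record would get: file name, index and expected count. */
        String beforeWrite() {
            return correlationId + "#" + (counter++) + "/" + lineCount;
        }
    }

    public static void main(String[] args) {
        // One shared instance: the second file starts before the first one writes.
        StatefulListener shared = new StatefulListener();
        shared.beforeStep("a.trg", 3);
        shared.beforeStep("b.trg", 5);
        // This record belongs to a.trg but is tagged with the values of b.trg.
        System.out.println("shared:   " + shared.beforeWrite());

        // One instance per execution: each file keeps its own values.
        StatefulListener forA = new StatefulListener();
        StatefulListener forB = new StatefulListener();
        forA.beforeStep("a.trg", 3);
        forB.beforeStep("b.trg", 5);
        System.out.println("separate: " + forA.beforeWrite() + " and " + forB.beforeWrite());
    }
}
```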
Is there a better way to solve this?


