
Thread: Using JobParameters for itemWriter

  1. #1
    Join Date
    Jun 2009
    Posts
    16

    Using JobParameters for itemWriter

    Hello everybody,

    Before I explain the problem, here is a brief description of the architecture:

    I'm using Spring Integration to poll for files (referred to as trigger-files), which are processed by a class called FileReceiver. The FileReceiver uses a JobLauncher to launch a SimpleJob. The job basically creates one object per record (usually one line of the file) and sends it to a channel (referred to as trigger-channel); the object is finally stored on a JMS topic. The system that reads from the topic needs to aggregate the messages again in order to forward the entire content of one trigger-file in a single message to another system.

    Now the problem:
    For the aggregator, the messages that were initially sent to the trigger-channel need to have a correlation ID and sequence size. The sequence size (line count of the trigger-file) and the correlation id (name of the trigger-file) are set as JobParameters in the FileReceiver. Here is the code extract:

    Code:
    JobParametersBuilder paramBuilder = new JobParametersBuilder();
    paramBuilder.addLong("line.count", lineCount);
    paramBuilder.addString("file.name", fileContents.getName());
    JobParameters params = paramBuilder.toJobParameters();
    jobLauncher.run(job, params);
    I wanted to use the job parameters to set the message headers accordingly in the TriggerWriter (the itemWriter property in the code below).

    Code:
    <bean id="simpleJob" class="org.springframework.batch.core.job.SimpleJob" abstract="true">
    	<property name="jobRepository" ref="jobRepository" />
    	<property name="restartable" value="false" />
    </bean>
    	
    <bean id="fixedLengthImportJob" parent="simpleJob">
    	<property name="steps">
    		<bean id="step1" parent="simpleStep">
    			<property name="commitInterval" value="3" />
    			<property name="listeners">
    				<list>
    					<ref bean="inputFile" />
    					<bean class="...CorrelationAwareTriggerListener" />
    				</list>
    			</property>
    			<property name="streams" ref="fileItemReader" />
    			<property name="itemReader" ref="flatFileItemReader" />
    			<property name="itemWriter">
    				<bean class="...TriggerWriter">
    					<property name="channel" ref="eventChannel" />
    				</bean>
    			</property>
    		</bean>
    	</property>
    	<property name="restartable" value="true" />
    </bean>
    For the inputFile (see code above), I could reference the parameter like this: %file.name%

    Code:
    <bean id="inputFile" class="org.springframework.batch.core.resource.StepExecutionResourceProxy">
    	<property name="filePattern" value="${trigger.file.location}/%file.name%" />
    </bean>
    It seems to me that this mechanism was not intended to be used for the itemWriter (something like <property name="linecount" value="%line.count%" /> does not work). What I am doing instead is using an additional listener (CorrelationAwareTriggerListener, also used in the second code fragment), which is both an ItemWriteListener and a StepExecutionListener. The most important methods are shown below.

    Code:
    	public void beforeWrite(Object trigger) {
    		if (!(trigger instanceof CorrelationAwareTrigger)) {
    			LOG.warn("Received object of type " + trigger.getClass() + " in step " + stepName);
    			return;
    		} else {
    			((CorrelationAwareTrigger) trigger).setCorrelationId(params.getString("correlation.id"));
    			((CorrelationAwareTrigger) trigger).setTriggerCount(params.getLong("line.count").intValue());
    			((CorrelationAwareTrigger) trigger).setIndex(counter++);
    		}
    	}
    	
    	public void beforeStep(StepExecution stepExecution) {
    		stepName = stepExecution.getStepName();
    		params = stepExecution.getJobParameters();
    		counter = 1;
    	}
    In beforeStep, the job parameters etc. are stored and then used again in beforeWrite. This works as long as there is only one file processed at a time. As soon as two files arrive at approx. the same time, the job parameters of the first file will be overwritten.
    Is there a better way to solve this?

  2. #2
    Join Date
    Mar 2006
    Posts
    312


    Have you tried step scoping the itemWriter & passing the job params with late binding?

    Code:
    <bean class="...TriggerWriter" scope="step">
    	<property name="channel" ref="eventChannel" />
    	<property name="lineCount" value="#{jobParameters[line.count]}"/>
    	<property name="fileName" value="#{jobParameters[file.name]}"/>
    </bean>

  3. #3


    I've found that the best way to get parameters to your Reader and Writer is the @BeforeStep annotation, which gives you access to the StepExecution. Then you can get the job parameters with: stepExecution.getJobParameters().

    Late binding doesn't play nice with task executors which I use a lot of.

  4. #4
    Join Date
    Jun 2009
    Posts
    16


    Hi, thanks for the replies.

    I don't think they work for me; let me know if I'm wrong. The problem is that once I set the instance fields in the beforeStep method, they are overwritten when beforeStep is called again, because there is only one instance of the listener. If the first step execution has not finished yet, its beforeWrite calls then set the wrong values.

    I solved the problem by using a semaphore that is acquired in the beforeStep method and released in the afterStep method. This works fine so far.

    This is my solution for the listener:

    Code:
    package ...;
    
    import java.util.concurrent.Semaphore;
    
    import org.apache.log4j.Logger;
    import ...CorrelationAwareTrigger;
    import org.springframework.batch.core.ItemWriteListener;
    import org.springframework.batch.core.StepExecution;
    import org.springframework.batch.core.StepExecutionListener;
    import org.springframework.batch.repeat.ExitStatus;
    
    
    public class CorrelationAwareTriggerListener implements ItemWriteListener, StepExecutionListener {
    	
    	private static final Logger LOG = Logger.getLogger(CorrelationAwareTriggerListener.class);
    	
    	private Semaphore semaphore;
    	
    	private long sequenceSize;
    	
    	private String correlationId;
    	
    	private String stepName;
    	
    	private int counter;
    	
    	public CorrelationAwareTriggerListener() {
    		semaphore = new Semaphore(1);
    	}
    
    	public void afterWrite(Object arg0) {
    		// Do nothing
    	}
    	
    	public void beforeWrite(Object trigger) {
    		if (!(trigger instanceof CorrelationAwareTrigger)) {
    			LOG.warn("Received object of type " + trigger.getClass() + " in step " + stepName);
    			return;
    		} else {
    			((CorrelationAwareTrigger) trigger).setCorrelationId(correlationId);
    			((CorrelationAwareTrigger) trigger).setTriggerCount((int) sequenceSize);
    			((CorrelationAwareTrigger) trigger).setIndex(counter++);
    		}
    	}
    	
    	public void onWriteError(Exception arg0, Object arg1) {
    		LOG.error("Write error " + arg1, arg0);
    	}
    	
    	public ExitStatus afterStep(StepExecution arg0) {
    		semaphore.release();
    		return arg0.getExitStatus();
    	}
    	
    	public void beforeStep(StepExecution stepExecution) {
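    		try {
    			semaphore.acquire();
    		} catch (InterruptedException e) {
    			// Restore the interrupt flag so callers can still see the interruption
    			Thread.currentThread().interrupt();
    			LOG.error("Error while waiting for import job to end", e);
    		}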
    		stepName = stepExecution.getStepName();
    		correlationId = stepExecution.getJobParameters().getString("correlation.id");
    		sequenceSize = stepExecution.getJobParameters().getLong("line.count");
    		counter = 1;
    	}
    	
    	public ExitStatus onErrorInStep(StepExecution arg0, Throwable arg1) {
    		LOG.error("Error in execution of step " + arg0.getStepName());
    		return arg0.getExitStatus();
    	}
    	
    }
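
An editorial sketch of a non-blocking alternative to the semaphore, in plain Java with no Spring classes. It assumes that beforeStep, every beforeWrite and afterStep of one step execution all run on the same thread, which would not hold if the step hands work to a task executor. The names PerThreadCorrelationState, begin, stamp and end are hypothetical, and the string returned by stamp only stands in for the three setter calls on CorrelationAwareTrigger.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java sketch; all names here are hypothetical and not part of Spring Batch.
class PerThreadCorrelationState {

    /** Values that belong to exactly one running step. */
    static final class State {
        final String correlationId;
        final int sequenceSize;
        int counter = 1;

        State(String correlationId, int sequenceSize) {
            this.correlationId = correlationId;
            this.sequenceSize = sequenceSize;
        }
    }

    // Each thread only ever sees the State it installed itself.
    private final ThreadLocal<State> current = new ThreadLocal<>();

    /** Call where beforeStep would run. */
    void begin(String correlationId, int sequenceSize) {
        current.set(new State(correlationId, sequenceSize));
    }

    /** Call where beforeWrite would run; returns "correlationId:index/sequenceSize". */
    String stamp() {
        State s = current.get();
        if (s == null) {
            throw new IllegalStateException("begin() was not called on this thread");
        }
        return s.correlationId + ":" + (s.counter++) + "/" + s.sequenceSize;
    }

    /** Call where afterStep would run, so pooled threads do not keep stale state. */
    void end() {
        current.remove();
    }

    /** Runs two simulated steps on separate threads against one shared instance. */
    static List<String> demo() {
        PerThreadCorrelationState shared = new PerThreadCorrelationState();
        List<String> out = Collections.synchronizedList(new ArrayList<>());
        Thread a = new Thread(() -> run(shared, "a.txt", 2, out));
        Thread b = new Thread(() -> run(shared, "b.txt", 3, out));
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(out);
    }

    private static void run(PerThreadCorrelationState shared, String file, int lines, List<String> out) {
        shared.begin(file, lines);
        try {
            for (int i = 0; i < lines; i++) {
                out.add(shared.stamp());
            }
        } finally {
            shared.end();
        }
    }
}
```

Unlike the semaphore, this does not make the second file wait until the first one has finished; the price is the same-thread assumption stated above.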

  5. #5
    Join Date
    Mar 2006
    Posts
    312


    Ah, I see -- typically I use the Step Execution context for storing stateful values.
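
To illustrate the idea in plain Java (this is not the Spring Batch API): the running index and the parameters live in an object that belongs to one execution, and the component that stamps the triggers keeps no fields of its own. Execution, Stamper and the key "index" are hypothetical names for this sketch. In Spring Batch the corresponding object would be the ExecutionContext of the StepExecution; how a listener gets hold of the right StepExecution is not shown here.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the idea; Execution and Stamper are hypothetical names,
// not Spring Batch types.
class ExecutionContextSketch {

    /** Stands in for one step execution: its parameters plus a mutable context. */
    static final class Execution {
        final Map<String, Object> params = new HashMap<>();
        final Map<String, Object> context = new HashMap<>();

        Execution(String fileName, long lineCount) {
            params.put("file.name", fileName);
            params.put("line.count", lineCount);
        }
    }

    /** Holds no fields, so one instance can serve any number of executions. */
    static final class Stamper {
        String stamp(Execution execution) {
            // The running index is stored in the execution's own context.
            int index = (Integer) execution.context.getOrDefault("index", 1);
            execution.context.put("index", index + 1);
            return execution.params.get("file.name") + ":" + index
                    + "/" + execution.params.get("line.count");
        }
    }
}
```

Interleaving calls for two Execution objects through the same Stamper keeps their indexes separate, because nothing is stored on the Stamper itself.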

  6. #6
    Join Date
    Jan 2008
    Location
    San Diego
    Posts
    780


    If I understand your problem...you are sharing stateful objects between jobs?

    That's why I don't do that. The only 'shared' beans that I have defined are the stateless ones, or abstract parent beans.

    Everything else is done using stateful prototype beans using the ClassPathXmlJobRegistry. When a job is started, it gets a fresh context with its own beans. All the stateful beans are defined in their own bean file and bound to the job name in the registry:

    Code:
    <bean id="jobLocator" class="org.springframework.batch.core.configuration.support.ClassPathXmlJobRegistry">
          <property name="jobPaths">
            <list>
              <value>classpath:spring/batch-jobtype1-prototype-beans.xml</value>
              <value>classpath:spring/batch-jobtype2-prototype-beans.xml</value>
            </list>
          </property>
        </bean>
    There are jobs defined in each file:

    jobtype1
    Code:
    <bean id="jobtype1" parent="simpleJob">
    ...
    jobtype2
    Code:
    <bean id="jobtype2" parent="simpleJob">
    ...
    Then, when I get a job to start it, it has a fresh context with bean instances that aren't shared with other jobs:

    Code:
    Job batchJob = jobLocator.getJob("jobtype1");
    // add parameters and start the job
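
The effect of handing out fresh stateful instances on every lookup can be sketched in plain Java without Spring. FreshInstanceRegistry and Counter are hypothetical names; the sketch only mirrors the idea described in this post and is not how ClassPathXmlJobRegistry is implemented.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Plain-Java sketch; FreshInstanceRegistry and Counter are hypothetical names.
class FreshInstanceRegistry {

    /** A stateful collaborator that must not be shared between runs. */
    static final class Counter {
        private int value = 1;

        int next() {
            return value++;
        }
    }

    // Maps a job name to a factory rather than to a ready-made instance.
    private final Map<String, Supplier<Counter>> factories = new HashMap<>();

    void register(String jobName, Supplier<Counter> factory) {
        factories.put(jobName, factory);
    }

    /** Every lookup builds a new instance instead of returning a cached one. */
    Counter get(String jobName) {
        Supplier<Counter> factory = factories.get(jobName);
        if (factory == null) {
            throw new IllegalArgumentException("Unknown job: " + jobName);
        }
        return factory.get();
    }
}
```

Two lookups of the same job name return distinct Counter objects, so state written during one run cannot leak into another.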

  7. #7
    Join Date
    Jun 2009
    Posts
    16


    Oh yeah, the step execution context could be the solution I was looking for. I configured the StepListener like this:

    Code:
        <bean class="org.springframework.batch.core.scope.StepScope" />
        <bean id="correlationTriggerListener" class="...CorrelationAwareTriggerListener" scope="step">
            <property name="sequenceSize" value="#{stepExecutionContext['line.count']}"/>
        </bean>
    I've read in the Spring Batch manual that the StepScope has to be imported in order to use the scope attribute for the correlationTriggerListener. Anyway, this is the job config:

    Code:
    <bean id="fixedLengthImportJob" parent="simpleJob">
      <property name="steps">
        <bean id="step1" parent="simpleStep">
          <property name="commitInterval" value="3" />
          <property name="listeners">
            <list>
              <ref bean="inputFile" />
              <ref bean="correlationTriggerListener" />
            </list>
          </property>
          <property name="streams" ref="fileItemReader" />
          <property name="itemReader" ref="flatFileItemReader">
          </property>
          <property name="itemWriter">
            <bean class="...TriggerWriter">
              <property name="channel" ref="eventChannel" />
            </bean>
          </property>
        </bean>
      </property>
      <property name="restartable" value="true" />
    </bean>
    And when I run the application I get the following exception:

    Code:
    Exception in thread "main" org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.config.ConsumerEndpointFactoryBean#1': Cannot resolve reference to bean 'org.springframework.integration.handler.ServiceActivatingHandler#0' while setting constructor argument; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.handler.ServiceActivatingHandler#0': Cannot resolve reference to bean 'applicationFileReceiver' while setting constructor argument; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'applicationFileReceiver' defined in class path resource [integration.xml]: Cannot resolve reference to bean 'fixedLengthImportJob' while setting bean property 'job'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'fixedLengthImportJob' defined in class path resource [batch.xml]: Cannot create inner bean 'step1' of type [org.springframework.batch.core.step.item.SimpleStepFactoryBean] while setting bean property 'steps'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'step1' defined in class path resource [batch.xml]: Initialization of bean failed; nested exception is java.lang.IllegalStateException: No Scope registered for scope 'step'
    	at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveReference(BeanDefinitionValueResolver.java:275)
    	at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveValueIfNecessary(BeanDefinitionValueResolver.java:104)
    	at org.springframework.beans.factory.support.ConstructorResolver.resolveConstructorArguments(ConstructorResolver.java:479)
    ...
    
    Caused by: java.lang.IllegalStateException: No Scope registered for scope 'step'
    	at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:295)
    	at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:185)
    	at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:164)
    	at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveReference(BeanDefinitionValueResolver.java:269)
    	at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveValueIfNecessary(BeanDefinitionValueResolver.java:104)
    	at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveManagedList(BeanDefinitionValueResolver.java:287)
    	at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveValueIfNecessary(BeanDefinitionValueResolver.java:126)
    	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyPropertyValues(AbstractAutowireCapableBeanFactory.java:1245)
    	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.populateBean(AbstractAutowireCapableBeanFactory.java:1010)
    	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:472)
    	... 66 more
    I had to remove some parts of the stack trace, because it would have exceeded the maximum length for a thread.

  8. #8
    Join Date
    Jun 2009
    Posts
    16


    Oh sorry, I didn't see chudak's post. I'll try this tomorrow. Just ignore my last post for now.

  9. #9
    Join Date
    Jun 2009
    Posts
    16


    Ok, using a JobLocator makes sense.
    I'd still like to get this step execution stuff running, but I'll try that myself when I have more time.
    Thanks to both of you.

  10. #10
    Join Date
    Apr 2010
    Posts
    6


    From your config '${trigger.file.location}/%file.name%', it seems you are using Spring Batch 1.x instead of 2.0. scope="step" can only be used in Spring Batch 2.
