Thread: Multi-file input and output

  1. #1
    Join Date
    Feb 2009
    Location
    Montreal, Qc, Canada
    Posts
    23

    Multi-file input and output

    Hello,

    I have a requirement that I don't know how to handle.

    I have to read from multiple input files. For each input file, I have to write to a different output file (with the same name but a different suffix). During processing, if a record is identified as one to be skipped, it has to be written to a separate output file too (with the same name as the input file but with a .rejected suffix).

    Example:

    File pattern to read: /input/file.csv.*
    Files in the input directory: file.csv.001, file.csv.002
    If during processing, at least 1 record has been skipped in each file, the result should be something like:

    In the output directory: file.csv.001.out, file.csv.002.out
    In the reject directory: file.csv.001.rejected, file.csv.002.rejected
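    The naming scheme in this example can be sketched in plain Java. This is only an illustration: the class name `OutputPaths`, its two helper methods, and the `/output` and `/reject` directories are assumptions, not part of Spring Batch or of the original requirement.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class OutputPaths {

    // Hypothetical helper: "<outputDir>/<input file name>.out"
    static Path outputFor(Path input, Path outputDir) {
        return outputDir.resolve(input.getFileName().toString() + ".out");
    }

    // Hypothetical helper: "<rejectDir>/<input file name>.rejected"
    static Path rejectedFor(Path input, Path rejectDir) {
        return rejectDir.resolve(input.getFileName().toString() + ".rejected");
    }

    public static void main(String[] args) {
        // Directory names below are assumptions made for the example.
        Path input = Paths.get("/input/file.csv.001");
        System.out.println(outputFor(input, Paths.get("/output")));
        System.out.println(rejectedFor(input, Paths.get("/reject")));
    }
}
```

    Deriving both targets from the input file name keeps the one-to-one mapping between each input file and its two result files explicit.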

    That being said, my first guess was to subclass 'FaultTolerantStepFactoryBean' and 'TaskletStep' in order to be able to iterate through the itemReader-itemProcessor-itemWriter-skipListener cycle for each input file.

    I'm about to start coding, but I keep telling myself that I'm probably not the first person with this kind of requirement. That's why I'm turning to the Spring Batch community for some insight.

    Regards,

    Gino.

  2. #2

    We had exactly the same requirement: reading from and writing to multiple files. We achieved this by extending SimpleJobLauncher and giving the launcher an extra attribute, as shown below.

    <code>
    <bean id="jobLauncher" class="=.....CustomJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
    <property name="resources" value="D:/folder" />
    </bean>

    </code>

    In the overridden run method, we loop through each file found via the resources attribute and process it accordingly.

  3. #3
    Join Date
    Feb 2008
    Posts
    488

    Gino,

    You should solve this problem by creating a composite ItemWriter that has two FlatFileItemWriters as properties. The write() method of the composite will determine to which file the record must be written.

    As for processing multiple input files, if they must map directly to output files (instead of treating all the input as one big file), then you should wrap the job launcher so that it loops through the files and passes each filename as a parameter to the job being launched.
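    A minimal sketch of the "wrap the launcher" idea, in plain Java so that it runs without Spring Batch on the classpath. `PerFileLauncher` and `JobRunner` are hypothetical stand-ins for the wrapper and for whatever actually launches the job; the only point illustrated is that each file gets its own, freshly built set of parameters.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PerFileLauncher {

    /** Hypothetical stand-in for "launch the job once with these parameters". */
    interface JobRunner {
        void run(Map<String, String> parameters);
    }

    /** Launches the job once per file name and returns the number of launches. */
    static int launchAll(List<String> fileNames, JobRunner runner) {
        int launched = 0;
        for (String fileName : fileNames) {
            // Build a new parameter map for every file so that one launch
            // never sees the file name left over from a previous one.
            Map<String, String> parameters = new LinkedHashMap<>();
            parameters.put("file.name", fileName);
            runner.run(parameters);
            launched++;
        }
        return launched;
    }
}
```

    In a real wrapper the `JobRunner` call would delegate to the job launcher, and the map would be replaced by the framework's own job parameters object.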

  4. #4
    Join Date
    Feb 2009
    Location
    Montreal, Qc, Canada
    Posts
    23

    Thanks for the hints, guys!

    I've implemented the "composite item writer" solution and it works fine! I only had to add a flag to the "item" class that indicates which file the item has to be written to (valid or rejected record).
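    The flag-based routing described here might look roughly like the following. This is a sketch, not the code actually used: `Record`, `Sink` and `RoutingWriter` are hypothetical names, and `Sink` is a simplified stand-in for an item writer rather than Spring Batch's `ItemWriter` interface.

```java
import java.util.ArrayList;
import java.util.List;

public class FlagRoutingExample {

    /** Hypothetical item carrying the "valid or rejected" flag. */
    static class Record {
        final String line;
        final boolean rejected;

        Record(String line, boolean rejected) {
            this.line = line;
            this.rejected = rejected;
        }
    }

    /** Simplified stand-in for an item writer. */
    interface Sink {
        void write(List<Record> items);
    }

    /** Splits each chunk by the flag and hands each part to its own delegate. */
    static class RoutingWriter implements Sink {
        private final Sink validDelegate;
        private final Sink rejectedDelegate;

        RoutingWriter(Sink validDelegate, Sink rejectedDelegate) {
            this.validDelegate = validDelegate;
            this.rejectedDelegate = rejectedDelegate;
        }

        @Override
        public void write(List<Record> items) {
            List<Record> valid = new ArrayList<>();
            List<Record> rejected = new ArrayList<>();
            for (Record item : items) {
                if (item.rejected) {
                    rejected.add(item);
                } else {
                    valid.add(item);
                }
            }
            // Skip a delegate entirely when it has nothing to write.
            if (!valid.isEmpty()) {
                validDelegate.write(valid);
            }
            if (!rejected.isEmpty()) {
                rejectedDelegate.write(rejected);
            }
        }
    }
}
```

    Keeping the decision in a flag on the item means the writer only routes; whatever sets the flag (for example a processor) owns the validation rules.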

    Tomorrow, I'll try the "job launcher" solution to manage the one-to-one mapping between multiple input and output files.

    I'll let you know.

    Regards,

    Gino.

  5. #5
    Join Date
    Feb 2009
    Location
    Montreal, Qc, Canada
    Posts
    23

    It is almost working now...

    The JobLauncher implementation is a great idea and is pretty simple to implement!

    But I still have an issue with passing each file to be processed as a JobParameter.

    Here's the code from the JobLauncher's run method:

    Code:
    // some code to manage state vs restartable, etc...
    
    for (Resource resource : resources) {
        JobParameter param = new JobParameter(resource.getFilename());
        jobParameters.getParameters().put("file.name", param);
        taskExecutor.execute(new MyRunnable(job, jobParameters, jobExecution));
        numberOfFilesProcessed++;
    }
    		
    return jobExecution;
    }

    Here's the step's config:

    Code:
    <bean id="myStep" parent="simpleStep">
        <property name="itemReader">
            <bean parent="myFileItemReader" scope="step"/>
        </property>
        <property name="itemWriter">
            <bean parent="myCompositeItemWriter" scope="step"/>
        </property>
    </bean>

    The reader:

    Code:
    <bean id="myFileItemReader" parent="flatFileItemReader">
        <property name="resource" value="file:${data.root}/${batch.name}/input/#{jobParameters[file.name]}" />
    
     ... other bean properties...
    </bean>

    The composite writer:

    Code:
    <bean id="myCompositeItemWriter" class="MyCompositeItemWriter">
        <property name="validDelegateWriter" ref="validItemWriter"/>
        <property name="rejectedDelegateWriter" ref="rejectedItemWriter"/>
    </bean>

    The delegate writers:

    Code:
    <bean id="validItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
        <property name="resource" value="file:${data.root}/${batch.name}/output/#{jobParameters[file.name]}" />
    ... other properties...
    </bean>
    	
    <bean id="rejectedItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
        <property name="resource" value="file:${data.root}/${batch.name}/rejected/#{jobParameters[file.name]}"/>
    </bean>

    Then the step scope:

    Code:
    <bean class="org.springframework.batch.core.scope.StepScope" />

    And now, the console output:

    Job: [SimpleJob: [name=myJob]] launched with the following parameters: [{param=Gino-45}]

    There's no "file.name" parameter...
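    One possible explanation, offered as an assumption rather than something confirmed in this thread: if the parameters object is immutable and its `getParameters()` method returns a copy of the internal map, then calling `put` on that copy changes nothing in the object that is passed to the job. The sketch below reproduces that effect with a hypothetical `Params` class; it is not Spring Batch's `JobParameters`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DefensiveCopyDemo {

    /** Hypothetical immutable parameter holder (NOT Spring Batch's JobParameters). */
    static final class Params {
        private final Map<String, String> values;

        Params(Map<String, String> values) {
            this.values = new LinkedHashMap<>(values);
        }

        /** Returns a copy, so callers cannot modify the internal state. */
        Map<String, String> getParameters() {
            return new LinkedHashMap<>(values);
        }
    }

    /** Builds a new holder containing the extra entry instead of mutating the old one. */
    static Params with(Params base, String key, String value) {
        Map<String, String> copy = base.getParameters();
        copy.put(key, value);
        return new Params(copy);
    }

    public static void main(String[] args) {
        Map<String, String> initial = new LinkedHashMap<>();
        initial.put("param", "Gino-45");
        Params original = new Params(initial);

        // Mutating the returned copy is silently lost.
        original.getParameters().put("file.name", "file.csv.001");
        System.out.println(original.getParameters().containsKey("file.name"));

        // Creating a new holder keeps the extra entry.
        Params extended = with(original, "file.name", "file.csv.001");
        System.out.println(extended.getParameters().containsKey("file.name"));
    }
}
```

    If that is what happens here, building a new parameters object for each file would be the way to get the file name through to the job.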

    from my log: Opening file [input] for reading.


    And finally, the error:

    org.springframework.batch.item.ItemStreamException : Failed to initialize the reader


    Am I missing something? Obviously, I don't see it... but probably tomorrow's going to be a better day.


    Regards,

    Gino


    PS I'm using 2.0.0 M4

  6. #6
    Join Date
    Feb 2009
    Location
    Montreal, Qc, Canada
    Posts
    23

    Well, things are progressing:

    I've successfully passed my "file name" parameter:

    [SimpleJob: [name=myJob]] launched with the following parameters: [{timeInMilliseconds=1235070782983, file.name=file1.csv}]

    By using the JobParametersBuilder:

    Code:
    for (Resource resource : resources) {
        builder.addLong("timeInMilliseconds", System.currentTimeMillis());
        jobParameters = builder.addString("file.name", resource.getFilename()).toJobParameters();
        logger.debug("Processing file: [" + resource.getFilename() + "]");
        taskExecutor.execute(new MyRunnable(job, jobParameters, jobExecution));
        numberOfFilesProcessed++;
    }

    BUT... I still have the same error:

    org.springframework.batch.item.ItemStreamException : Failed to initialize the reader

    And the same message in my log:

    Opening file [input] for reading.

    So it looks like the late binding does not work:

    Code:
    <property name="resource"
    value="file:${data.root}/${batch.name}/input/#{jobParameters[file.name]}" />

    To be continued...

  7. #7
    Join Date
    May 2009
    Posts
    13

    Any updates on this? I'm trying something similar, where output needs to go to two files

    Garette, I tried the CompositeItemWriter as shown below. How do we specify that, on an exception or some other condition, a record should go to the bad file?

    Thanks.

    <job id="CSVFileLoadJobSkipErrors" restartable="true">
        <step id="processCSVFileLoadJobSkipErrors">
            <tasklet>
                <chunk reader="itemReaderForTest" processor="itemProcessor" writer="compositeItemWriter" commit-interval="300" skip-limit="5">
                    <streams>
                        <stream ref="itemWriter"/>
                        <stream ref="badItemWriter"/>
                    </streams>
                    <skippable-exception-classes>org.springframework.batch.item.file.FlatFileParseException</skippable-exception-classes>
                </chunk>
            </tasklet>
        </step>
    </job>

    <beans:bean id="compositeItemWriter" class="org.springframework.batch.item.support.CompositeItemWriter">
        <beans:property name="delegates">
            <beans:list>
                <beans:ref bean="itemWriter"/>
                <beans:ref bean="badItemWriter"/>
            </beans:list>
        </beans:property>
    </beans:bean>

    <beans:bean id="badItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
        <beans:property name="saveState" value="true"/>
        <beans:property name="resource" ref="badOutputResource" />
        <beans:property name="lineAggregator">
            <beans:bean class="org.springframework.batch.item.file.transform.PassThroughLineAggregator" />
        </beans:property>
    </beans:bean>

    <beans:bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
        <!--
        <beans:property name="shouldDeleteIfExists" value="true"/>
        -->
        <beans:property name="saveState" value="true"/>
        <beans:property name="resource" ref="outputResource" />
        <beans:property name="lineAggregator">
            <beans:bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
                <beans:property name="delimiter" value="||"/>
                <beans:property name="fieldExtractor">
                    <beans:bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
                        <beans:property name="names" value="accountId,dataSourceTypeId,originalAccountNumber,accountNumber,primarySSN,updatedBy,updatedDate"/>
                    </beans:bean>
                </beans:property>
            </beans:bean>
        </beans:property>
    </beans:bean>

    <beans:bean id="outputResource" class="org.springframework.core.io.FileSystemResource">
        <beans:constructor-arg value="data/output/csv/pipedDelimitedOutput.txt" />
    </beans:bean>

    <beans:bean id="badOutputResource" class="org.springframework.core.io.FileSystemResource">
        <beans:constructor-arg value="data/output/csv/pipedDelimitedOutput_bad.txt" />
    </beans:bean>

    Also, since there is currently no condition, it writes the result twice, but to the same file instead of two separate files.

  8. #8
    Join Date
    Feb 2008
    Posts
    488

    You'll need to write a custom composite ItemWriter. Something like:

    Code:
    import java.util.ArrayList;
    import java.util.List;

    import org.springframework.batch.item.ItemWriter;

    public class CompositeItemWriter<T> implements ItemWriter<T> {

        private ItemWriter<T> itemWriter; // setter omitted
        private ItemWriter<T> badItemWriter; // setter omitted

        public void write(List<? extends T> items) throws Exception {
            // Plain List<T> so that items can be added; a wildcard list cannot be instantiated
            List<T> goodItems = new ArrayList<T>();
            List<T> badItems = new ArrayList<T>();

            for (T item : items) {
                // recordIsBad(item) is not defined here; supply your own check
                if (recordIsBad(item)) {
                    badItems.add(item);
                }
                else {
                    goodItems.add(item);
                }
            }

            // Each delegate receives only its own share of the chunk
            itemWriter.write(goodItems);
            badItemWriter.write(badItems);
        }
    }

  9. #9
    Join Date
    Jun 2009
    Posts
    8

    Cerrog:

    I am doing something similar. Any chance you could share the progress you have made? I'm specifically looking for the job XML, the job launcher, and any other relevant config files you may be using.

  10. #10
    Join Date
    Jun 2010
    Posts
    23

    I am also looking for an example of the same case.
    I'm hoping someone can post clear source code that explains this in detail; otherwise I'm afraid I won't be able to follow it.

    Cheers
    Sandeep
