Thread: Using FlatFileItemReader with task-executor and Step scope

  1. #1

    Hi

    I have a file which contains millions of records, and I need to process each record and store it in a database. It's working, but it's slow: I can store approx. 250 records a minute in the database on my development machine. I was using FlatFileItemReader and a custom writer to an Oracle database.

    I read JIRA issue BATCH-1301, which suggests that FlatFileItemReader can be used with a task-executor, but I was wondering whether it is OK to use it with a task-executor and in step scope. I tried it, and it seemed to work: I can now store approx. 1000 records a minute in the database, with only one job running at the time. However, I'm seeing the following warnings in the log file:

    Code:
    2012-02-15 11:24:38,739 WARN  [ChunkMonitor        ][taskExecutor-2      No ItemReader set (must be concurrent step), so ignoring offset data.
    2012-02-15 11:24:38,970 WARN  [ChunkMonitor        ][taskExecutor-7      ItemStream was opened in a different thread.  Restart data could be compromised.
    2012-02-15 11:24:39,110 INFO  [StandardQueryCache  ][taskExecutor-7      starting query cache at regionquery.findByDetachedCriteria.uk.co.and.domain.Customer
    2012-02-15 11:24:39,215 WARN  [ChunkMonitor        ][taskExecutor-6      ItemStream was opened in a different thread.  Restart data could be compromised.
    2012-02-15 11:24:39,227 WARN  [ChunkMonitor        ][taskExecutor-3      ItemStream was opened in a different thread.  Restart data could be compromised.
    2012-02-15 11:24:39,274 WARN  [ChunkMonitor        ][taskExecutor-5      ItemStream was opened in a different thread.  Restart data could be compromised.
    2012-02-15 11:24:42,249 WARN  [ChunkMonitor        ][taskExecutor-1      ItemStream was opened in a different thread.  Restart data could be compromised.
    2012-02-15 11:24:42,257 WARN  [ChunkMonitor        ][taskExecutor-9      ItemStream was opened in a different thread.  Restart data could be compromised.
    2012-02-15 11:24:42,257 WARN  [ChunkMonitor        ][taskExecutor-8      ItemStream was opened in a different thread.  Restart data could be compromised.
    2012-02-15 11:24:42,264 WARN  [ChunkMonitor        ][taskExecutor-10     ItemStream was opened in a different thread.  Restart data could be compromised.
    2012-02-15 11:24:45,219 WARN  [ChunkMonitor        ][taskExecutor-4      ItemStream was opened in a different thread.  Restart data could be compromised.

    I set saveState to false, and the job is not restartable. I inject a task executor into the tasklet.

    I would be grateful if someone could advise on the best approach.

    Code:
    <bean id="myItemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
        <property name="comments" value="#" />
        <property name="saveState" value="false" />
        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <property name="lineTokenizer">
                    <bean class="....MyCustomLineTokenizer" />
                </property>
                <property name="fieldSetMapper">
                    <bean class="...MyCustomFieldSetMapper" />
                </property>
            </bean>
        </property>
        <property name="resource" value="#{stepExecutionContext['input.file.url']}" />
    </bean>

    <bean id="myItemProcessor" class="....MyCustomItemProcessor" scope="step" />

    <bean id="myItemWriter" class="....MyCustomItemWriter" scope="step">
        <property name="fileId" value="#{jobExecutionContext['fileId']}" />
    </bean>

    <!-- bad record in input file, or a database error: then write to error log file for emailing later -->
    <bean id="myErrorItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step">
        <property name="resource" value="#{stepExecutionContext['rejected.file.url']}" />
        <property name="transactional" value="false" />
        <property name="lineAggregator">
            <bean class="..CustomLineAggreator" />
        </property>
        <property name="shouldDeleteIfExists" value="false" />
    </bean>

    <bean id="myStepListener" class="....CustomStepExecutionListener" scope="step">
        <property name="writer" ref="myErrorItemWriter" />
    </bean>

    <!-- Bring the job together: read input list, create db records, email lists to business -->
    <batch:job id="myJob" restartable="false">
        <batch:step id="setupStep" next="uploadDecider">
            <batch:tasklet ref="mySetupTasklet" />
        </batch:step>
        <batch:decision id="uploadDecider" decider="myUploadDecider">
            <batch:end on="STOPPED"/>
            <batch:next on="*" to="uploadRecords"/>
        </batch:decision>
        <batch:step id="uploadRecords">
            <batch:tasklet task-executor="taskExecutor">
                <batch:chunk reader="myItemReader" processor="myItemProcessor" writer="myItemWriter" commit-interval="10" skip-limit="100">
                    <batch:skippable-exception-classes>
                        <batch:include class="java.lang.Exception"/>
                        <batch:exclude class="java.io.FileNotFoundException"/>
                    </batch:skippable-exception-classes>
                    <batch:streams>
                        <!-- essential for skip listener: write bad records to error file -->
                        <batch:stream ref="myErrorItemWriter"/>
                    </batch:streams>
                </batch:chunk>
                <batch:listeners>
                    <!-- setup the step and perform actions after the step completes -->
                    <batch:listener ref="myStepListener" />
                </batch:listeners>
            </batch:tasklet>
        </batch:step>
    </batch:job>

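    Code:
    import org.springframework.batch.item.ItemProcessor;
    import org.springframework.batch.item.file.FlatFileParseException;
    // EmailValidator is presumably Apache Commons Validator's; its import is not shown in the original.

    public class MyItemProcessor implements ItemProcessor<MyJobVO, MyJobVO> {

        /**
         * Takes in a MyJobVO, checks the email address, and then passes the item on to the writer.
         */
        @Override
        public MyJobVO process(MyJobVO myJobVO) throws Exception {
            // An invalid address raises an exception, which the step's skip configuration handles.
            if (!EmailValidator.getInstance().isValid(myJobVO.getEmail())) {
                throw new FlatFileParseException("Not an email address", myJobVO.getEmail());
            }
            return myJobVO;
        }
    }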


    Code:
    public class MyItemWriter implements ItemWriter<MyJobVO> {

        private Long fileId;

        @Override
        public void write(List<? extends MyJobVO> myJobVOs) {
            for (MyJobVO myJobVO : myJobVOs) {
                // ... write to database ...
            }
        }

    ...

    Code:
    public class MyStepExecutionListener implements StepExecutionListener, SkipListener<MyJobVO, MyJobVO> {

        private ResourceAwareItemWriterItemStream<String> writer;

        private boolean hasSkips = false;

        @Override
        public void beforeStep(StepExecution stepExecution) {
            ...
        }

        @Override
        public ExitStatus afterStep(StepExecution stepExecution) {
            // essential to close the rejected file, or nothing will get emailed!
            writer.close();
            // email only if the status is completed and has skips
            if (stepExecution.getExitStatus().equals(ExitStatus.COMPLETED)) {
                if (this.hasSkips) {
                    // then email rejected file
                }
            }
            return stepExecution.getExitStatus();
        }

        // ------------ START SKIP LISTENER IMPLEMENTATION ----------------------

        @Override
        public void onSkipInProcess(MyJobVO item, Throwable t) {
            logger.debug("onSkipInProcess {} {}", item, t.getMessage());

            final List<String> items = new ArrayList<String>(1);
            items.add(item.getEmail());
            try {
                writer.write(items);
                this.hasSkips = true;
            } catch (Exception e) {
                logger.error("Failed to write skip record: {}, {}", t.getMessage(), e.getMessage());
            }
        }

        @Override
        public void onSkipInRead(Throwable t) {
            logger.error("onSkipInRead {}", t.getMessage());
        }

        @Override
        public void onSkipInWrite(MyJobVO item, Throwable t) {
            logger.debug("onSkipInWrite {} {}", item, t.getMessage());
            final List<String> items = new ArrayList<String>(1);
            items.add(item.getEmail());
            try {
                writer.write(items);
                this.hasSkips = true;
            } catch (Exception e) {
                logger.error("Failed to write skip record: {}, {}", t.getMessage(), e.getMessage());
            }
        }

        // ------------ END SKIP LISTENER IMPLEMENTATION ----------------------

        /**
         * @param writer the writer to set
         */
        @Required
        public void setWriter(ResourceAwareItemWriterItemStream<String> writer) {
            this.writer = writer;
        }
    }


    Last edited by tony_murphy; Feb 15th, 2012 at 06:04 AM.

  2. #2

    I haven't solved such a problem myself, but I read about it in the book "Spring Batch in Action". They solved it with a thread-safe ItemReader whose read method is synchronized.
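
    A minimal sketch of that idea, not the book's code: a wrapper whose read method is synchronized, so only one thread at a time can touch the underlying reader. To keep it runnable without Spring Batch on the classpath, SimpleItemReader is a local stand-in for org.springframework.batch.item.ItemReader (whose read() additionally declares "throws Exception"), and SynchronizedReader and ListReader are hypothetical names used only for illustration.

```java
import java.util.Iterator;
import java.util.List;

// Local stand-in for Spring Batch's ItemReader so the sketch compiles on its own.
// Contract assumed here: read() returns null once the input is exhausted.
interface SimpleItemReader<T> {
    T read();
}

// Hypothetical wrapper: serializes calls to a delegate that is not thread-safe.
final class SynchronizedReader<T> implements SimpleItemReader<T> {
    private final SimpleItemReader<T> delegate;

    SynchronizedReader(SimpleItemReader<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized T read() {
        // Only one worker thread at a time may advance the delegate.
        return delegate.read();
    }
}

// Hypothetical non-thread-safe reader over an in-memory list, for demonstration only.
final class ListReader<T> implements SimpleItemReader<T> {
    private final Iterator<T> iterator;

    ListReader(List<T> items) {
        this.iterator = items.iterator();
    }

    @Override
    public T read() {
        return iterator.hasNext() ? iterator.next() : null;
    }
}
```

    In a real step the wrapper would delegate to the FlatFileItemReader and would also need to forward the ItemStream callbacks (open, update, close). Later Spring Batch releases include a SynchronizedItemStreamReader for this purpose. Synchronizing the read does not make restart data reliable in a multi-threaded step, so keeping saveState set to false still seems sensible.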

  3. #3

    Thanks ChristianO. I got myself a copy of that book and I'm working my way through it. I haven't got to the performance chapter yet (I'm on chapter 10), but it is a book that I would recommend to others.

  4. #4

    Tony, what does your custom writer look like? If you are not batching inserts/updates, I would look at doing that first. It can make a huge performance difference. Take a look at JdbcBatchItemWriter.
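
    To illustrate what batching means at the JDBC level, here is a rough sketch that sends a whole chunk in one executeBatch call instead of one statement per record. It uses plain java.sql rather than JdbcBatchItemWriter itself. The table and column names, the BatchInsertSketch class, and the writeChunk method are all assumptions made up for the example, since the original writer's SQL is not shown.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

final class BatchInsertSketch {

    // Hypothetical table and columns; the real schema is not shown in the thread.
    static final String INSERT_SQL =
            "INSERT INTO customer_email (file_id, email) VALUES (?, ?)";

    // Queues one insert per item, then sends them all to the database together.
    // Returns the number of statements in the batch. Commit or rollback is left
    // to the caller (in a chunk-oriented step, the framework's transaction).
    static int writeChunk(Connection connection, long fileId, List<String> emails) {
        try (PreparedStatement ps = connection.prepareStatement(INSERT_SQL)) {
            for (String email : emails) {
                ps.setLong(1, fileId);
                ps.setString(2, email);
                ps.addBatch(); // queue the row instead of executing it immediately
            }
            return ps.executeBatch().length; // one call for the whole chunk
        } catch (SQLException e) {
            // Wrapped unchecked here only to keep the sketch short.
            throw new IllegalStateException("Batch insert failed", e);
        }
    }
}
```

    With commit-interval="10", each batch would hold at most 10 rows, so raising the commit interval is probably worth trying alongside batching.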
