
Thread: Restart/Continue where left off

  1. #1
    Join Date
    Aug 2006
    Posts
    25

    Restart/Continue where left off

    Hi,

    Really new to Spring Batch so it's gonna be easy I guess.
    I can't figure out how "Restart/Resume failed job" works exactly.
    Here is my sample app:
    - 1 simple job with 1 step.
    - Step has an ItemReader and an ItemWriter.
    - I just read ints from a list of 100 items in ItemReader
    - ItemWriter just outputs the number it gets as param in "write()".
    - I also put a counter in the ItemWriter so it throws an Exception when the number of items it "write()"s reaches 10.

    Now, I run the job with the CommandLineJobRunner.
    I run it once. It stops (throws an Exception) as expected and displays items from 1 to 10 (so far so good).
    I run it again (same job params). It displays numbers from 1 to 10 again.
    Why???
    I run it again without changing anything and now it displays numbers from 11 to 20.
    Why???

    I keep doing that, and it looks like the "second" attempt always seems to "try to restart" the previous failed job execution (not sure I'm using the right term here...), while the following run seems to "continue" where the previous job execution left off.
    What I'd like to do is always resume a failed job in my case...and avoid restarting the same job from 0.

    I read the User Guide a couple times but I'm still lost.

    Can anyone shed some light on this?

    NB: I'm using Spring Batch 1.1.3-RELEASE-A.

  2. #2

    Do you use framework provided readers and writers? How are they configured? How have you configured your JobRepository?

  3. #3
    Join Date
    Jan 2008
    Location
    San Diego
    Posts
    780

    What is your chunk size (commit interval)? On a restart, the step begins at the end of the last completed chunk. If your chunk size is 50 and you throw an exception after 10 items, it will keep starting at 1. If your chunk size is 1 and you throw an exception on the 10th item, it should start on the 10th when you restart.
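
    To make that arithmetic concrete, here is a minimal sketch in plain Java (not Spring Batch code). It assumes that state is saved only at chunk boundaries and that a failed chunk is rolled back entirely; `ChunkRestartSketch` and `restartIndex` are hypothetical names used only for illustration.

```java
// Simplified model of chunk-oriented restart; this is NOT Spring Batch code.
final class ChunkRestartSketch {

    /**
     * Returns the zero-based index of the first item that is read again on restart,
     * assuming the failure happened while handling the item at failedIndex and that
     * progress is only persisted when a full chunk of commitInterval items commits.
     */
    static int restartIndex(int failedIndex, int commitInterval) {
        if (failedIndex < 0) {
            throw new IllegalArgumentException("failedIndex must not be negative");
        }
        if (commitInterval <= 0) {
            throw new IllegalArgumentException("commitInterval must be positive");
        }
        // Integer division drops the partially processed chunk.
        return (failedIndex / commitInterval) * commitInterval;
    }

    public static void main(String[] args) {
        // The 10th item has index 9.
        System.out.println(restartIndex(9, 50)); // prints 0
        System.out.println(restartIndex(9, 1));  // prints 9
    }
}
```

    With a commit interval of 50, a failure on the 10th item resumes at the very first item; with a commit interval of 1 it resumes at the 10th item itself.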

  4. #4
    Join Date
    Aug 2006
    Posts
    25

    @magott:
    My ItemReader extends AbstractBufferedItemReaderItemStream.
    No particular reason, except that I'm interested in StaxEventItemReader for my project (I need to be able to resume a failed job when reading an XML file). AbstractBufferedItemReaderItemStream seemed to implement ItemStream and handle updating the execution context the way I want (I think...).
    My ItemWriter just implements the interface ItemWriter.

    Code:
    package test.springbatch;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import org.springframework.batch.item.support.AbstractBufferedItemReaderItemStream;
    import org.springframework.util.ClassUtils;
    
    import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
    
    public class TestItemReader extends AbstractBufferedItemReaderItemStream {
    	
    	private static final List<Integer> NUMBERS = new ArrayList<Integer>();
    	private AtomicInteger mIncrementer = new AtomicInteger(0);	
    	static {
    		for (int i = 0; i < 100; i++) {
    			NUMBERS.add(i);
    		}		
    	}
    	
    	public TestItemReader() {
    		setName(ClassUtils.getShortName(TestItemReader.class));		
    	}
    	
    	@Override
    	protected void doClose() throws Exception {}
    
    	@Override
    	protected void doOpen() throws Exception {}
    	
    	@Override
    	protected Object doRead() throws Exception {
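    		// Stop once every item has been returned; using ">" here would let index 100 through
    		// and NUMBERS.get() would throw an IndexOutOfBoundsException.
    		if (mIncrementer.get() >= NUMBERS.size()) return null;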
    		return NUMBERS.get(mIncrementer.getAndIncrement());
    	}
    }
    Code:
    package test.springbatch;
    
    import org.springframework.batch.item.ClearFailedException;
    import org.springframework.batch.item.FlushFailedException;
    import org.springframework.batch.item.ItemWriter;
    
    import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
    
    public class TestItemWriter implements ItemWriter {
    	
    	private AtomicInteger mCounter = new AtomicInteger(1);
    
    	public void clear() throws ClearFailedException {}
    
    	public void flush() throws FlushFailedException {}
    
    	public void write(Object pObj) throws Exception {
    		System.out.println("Writing " + pObj);
    		if (mCounter.getAndIncrement() >= 10) throw new Exception("It's alright.");
    	}
    
    }
    @chudak: I didn't specify anything regarding chunk size. The weird thing is that the "read.count" variable is stored properly in the database (i.e. its value is indeed the number of items read so far).


    I have to correct a little bit what I said before. It's less predictable actually.
    The first run is always right (obviously).
    The second run sometimes resumes the previous job, sometimes not.
    Successive runs are also "random" (starts at 27, restart from 0, or starts again at 9).
    I really think it's just me not understanding some concept.

    Am I not supposed to run a failed job with the same job parameters?
    What do I have to do to resume a failed job?

  5. #5
    Join Date
    Jan 2008
    Location
    San Diego
    Posts
    780

    Originally posted by lpezet:
    Am I not supposed to run a failed job with the same job parameters?
    What do I have to do to resume a failed job?
    If you start a job with the SAME parameters as the failed job, it will run the failed job.
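
    Roughly, the rule can be modelled as below. This is a simplified sketch, not the framework's JobRepository: it assumes a job instance is identified by the job name plus its parameters, so launching with identical parameters maps to the existing instance instead of creating a new one. `JobInstanceSketch`, `instanceIdFor`, and the `run.date` parameter are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of job-instance identity; this is NOT Spring Batch's JobRepository.
final class JobInstanceSketch {

    private final Map<String, Integer> instances = new HashMap<>();
    private int nextId = 1;

    /** Returns the existing instance id for this name/parameter pair, or allocates a new one. */
    int instanceIdFor(String jobName, Map<String, String> params) {
        // Sorting the parameters makes the key independent of insertion order.
        String key = jobName + "|" + new TreeMap<>(params);
        return instances.computeIfAbsent(key, k -> nextId++);
    }

    public static void main(String[] args) {
        JobInstanceSketch repository = new JobInstanceSketch();
        Map<String, String> params = new HashMap<>();
        params.put("run.date", "2009-01-01"); // hypothetical parameter

        int first = repository.instanceIdFor("TestSimpleJob", params);
        int second = repository.instanceIdFor("TestSimpleJob", params);
        System.out.println(first == second); // prints true: same parameters, same instance
    }
}
```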

  6. #6
    Join Date
    Aug 2006
    Posts
    25

    Would that resume the job or not? Meaning, would it start from where it left off? (if configured to do so)

  7. #7
    Join Date
    Jan 2008
    Location
    San Diego
    Posts
    780

    Originally posted by lpezet:
    Would that resume the job or not? Meaning, would it start from where it left off? (if configured to do so)
    Yes, that's how I'm doing it.

    Code:
    <!-- Prototype job bean -->
    <bean id="simpleJob" class="org.springframework.batch.core.job.SimpleJob" abstract="true">
    	<property name="jobRepository" ref="jobRepository" />
    	<property name="restartable" value="true" />
    	<property name="jobExecutionListeners" ref="provisioningBatchErrorListener" />
    </bean>
    
    <!-- Prototype step bean -->
    <bean id="simpleStep" class="org.springframework.batch.core.step.item.SkipLimitStepFactoryBean" abstract="true">
    	<property name="transactionManager" ref="transactionManager" />
    	<property name="jobRepository" ref="jobRepository" />
    	<property name="commitInterval" value="50" />
    </bean>

  8. #8
    Join Date
    Aug 2006
    Posts
    25

    Thanks for sharing that chudak.

    I have a similar config:
    Code:
    <bean id="TestSimpleJob" class="org.springframework.batch.core.job.SimpleJob">
    	<property name="jobRepository" ref="test.springbatch.JobRepository" />
    	<property name="restartable" value="true" />
    	<property name="steps">
    		<list>
    			<bean class="org.springframework.batch.core.step.item.SimpleStepFactoryBean">
    				<property name="jobRepository" ref="test.springbatch.JobRepository" />
    				<property name="transactionManager" ref="test.springbatch.TxManager" />
    				<property name="itemReader"><bean class="test.springbatch.TestItemReader" /></property>
    				<property name="itemWriter"><bean class="test.springbatch.TestItemWriter" /></property>
    			</bean>
    		</list>
    	</property>
    </bean>
    I changed my ItemReader and avoided the AbstractBufferedItemReaderItemStream class.
    I now implement ItemStream and ItemReader.
    Code:
    package test.springbatch;
    import (...);
    public class TestItemReader implements ItemStream, ItemReader {
    	
    	private static final List<Integer> NUMBERS = new ArrayList<Integer>();
    	private AtomicInteger mIncrementer = new AtomicInteger(0);
    	private int mCurrentItemCount;	
    	static {
    		for (int i = 0; i < 100; i++) {
    			NUMBERS.add(i);
    		}		
    	}
    	
    	public void close(ExecutionContext pContext) throws ItemStreamException {}
    	
    	public void mark() throws MarkFailedException {}
    	
    	public void open(ExecutionContext pContext) throws ItemStreamException {
    		if (pContext.containsKey("read.count")) {
    			int oItemCount = Long.valueOf(pContext.getLong("read.count")).intValue();
    			try {
    				jumpToItem(oItemCount);
    			}
    			catch (Exception e) {
    				throw new ItemStreamException("Could not move to stored position on restart", e);
    			}
    
    			mCurrentItemCount = oItemCount;
    		} else {
    			System.out.println("Did not find read.count key in execution context.");
    		}
    	}
    	
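    	private void jumpToItem(int pItemCount) {
    		// Position the cursor on the first unread item. Looping on getAndIncrement()
    		// overshoots by one and would skip an item after a restart.
    		mIncrementer.set(pItemCount);
    	}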
    
    	public Object read() throws Exception, UnexpectedInputException, NoWorkFoundException, ParseException {
    		if (mIncrementer.get() >= NUMBERS.size()) return null;
    		mCurrentItemCount++;		
    		return NUMBERS.get(mIncrementer.getAndIncrement());
    	}
    	
    	public void reset() throws ResetFailedException {}
    	
    	public void update(ExecutionContext pContext) throws ItemStreamException {
    		pContext.putLong("read.count", mCurrentItemCount);
    	}
    	
    }
    I'm still having some weird/random behavior.
    It would resume the failed job up to a certain point sometimes (all fine up to the 4th run) and then restart from "scratch" (from 0).
    It's really frustrating...

    chudak you must have some magic touch
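
    The save-and-restore contract that the reader above relies on can be modelled without the framework. The sketch below is only an illustration under stated assumptions: a plain `Map` stands in for the ExecutionContext, and `RestartableCounterSketch` is a hypothetical class, not a Spring Batch type. It shows the intended round trip: `update` stores the number of items read so far, and `open` on a fresh reader resumes from that count.

```java
import java.util.HashMap;
import java.util.Map;

// Framework-free model of a restartable reader; a Map stands in for the ExecutionContext.
final class RestartableCounterSketch {

    static final String KEY = "read.count";

    private final int size;
    private int position;

    RestartableCounterSketch(int size) {
        this.size = size;
    }

    /** Restores the position from the saved state, or starts at 0 when nothing was saved. */
    void open(Map<String, Long> context) {
        Long saved = context.get(KEY);
        position = (saved == null) ? 0 : saved.intValue();
    }

    /** Returns the next number, or null when the input is exhausted. */
    Integer read() {
        return position < size ? Integer.valueOf(position++) : null;
    }

    /** Stores how many items have been read so far. */
    void update(Map<String, Long> context) {
        context.put(KEY, (long) position);
    }

    public static void main(String[] args) {
        Map<String, Long> context = new HashMap<>();

        RestartableCounterSketch firstRun = new RestartableCounterSketch(100);
        firstRun.open(context);
        for (int i = 0; i < 10; i++) {
            firstRun.read(); // items 0..9
        }
        firstRun.update(context);

        // A new reader instance, as in a new JVM, picks up from the saved count.
        RestartableCounterSketch secondRun = new RestartableCounterSketch(100);
        secondRun.open(context);
        System.out.println(secondRun.read()); // prints 10
    }
}
```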

  9. #9
    Join Date
    Dec 2006
    Posts
    1,061

    I'm not sure why the 4th run wouldn't work but every subsequent one would. Seems very fishy, but from looking at your reader, I can't see anything offhand that's wrong.

    @deckingraj
    I don't understand the question really, did you try and run the failed job again, but it wouldn't let you? If so, it's probably because restart saves the state of the reader at the last commit point. So if you say "throw an exception when i is 5", and it starts again at 4, you will always keep failing.
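
    A small simulation of that situation, as a sketch only: it assumes a commit interval of 1 and a failure that depends on the item itself rather than on a per-run counter. `PoisonItemSketch` and `attempt` are hypothetical names, not Spring Batch API.

```java
// Simplified model of restarting into the same failing item; this is NOT Spring Batch code.
final class PoisonItemSketch {

    /**
     * Runs one attempt starting at savedPosition and returns the position saved at the
     * last successful commit. Assumes every item is committed on its own (interval of 1).
     */
    static int attempt(int savedPosition, int totalItems, int poisonItem) {
        int committed = savedPosition;
        for (int i = savedPosition; i < totalItems; i++) {
            if (i == poisonItem) {
                return committed; // failure: nothing past the last commit is kept
            }
            committed = i + 1; // item processed and committed
        }
        return committed;
    }

    public static void main(String[] args) {
        int saved = 0;
        for (int run = 1; run <= 3; run++) {
            saved = attempt(saved, 100, 5);
            System.out.println("run " + run + " stopped with saved position " + saved);
        }
    }
}
```

    Every run in this model stops with the same saved position, because each restart resumes at the item that triggers the failure.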

  10. #10
    Join Date
    Aug 2006
    Posts
    25

    Sorry lucasward, maybe I'm not explaining it clearly.
    The "4th run then random" was really just a one time experience.
    It's always pretty random. Sometimes it works for a couple of runs in a row and then gets weird; other times it keeps restarting from 0 for a couple of runs and then starts at 9, etc. I didn't notice any pattern.

    Every time, I check the db, especially the job_execution and execution_context tables, to see if there is any difference.
    They are all "similar": the "read.count" is always stored properly, all job executions are marked FAILED, they all use the same job instance, etc.

    I'm still clueless...
