
Thread: Manual chunk commit

  1. #1
    Join Date
    Apr 2010
    Location
    Brazil
    Posts
    12

    Manual chunk commit

    Hello,

    I'm using Spring Batch for data extraction tasks, in most cases from XML to a database. I have one situation in which I need to control the commit interval manually.

    Example: the application reads and processes some records from the XML within the commit interval (chunk). The next record (still in the same chunk) depends on information read previously. When this happens, I need to write and commit the chunk before reading the next record, which depends on information already committed to the database. However, if the related records fall in different chunks, there is no problem.

    I can intercept the items with a StepListener and detect whether one depends on a previous record, but I don't know how to commit the chunk manually.

    Can you help me?

    Thanks from Brazil

  2. #2
    Join Date
    Jun 2005
    Posts
    4,231


    The <chunk/> element in the step configuration allows you to inject a chunk-completion-policy that can be used to control the commit. I didn't understand your use case 100%, but I'd be surprised if you can't do something with a completion policy and a listener of some type (or something that implements both). Another common use case is forcing a commit after a timeout or time window.
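
To make the completion-policy idea concrete, here is a minimal plain-Java model of a chunk loop that asks a policy after every item whether the chunk should be closed. It deliberately does not use the Spring Batch API: `ChunkPolicy`, `timeWindow`, and `run` are hypothetical names invented for this sketch, and the clock is injected so the time-window behaviour can be checked deterministically.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.LongSupplier;

/** Plain-Java model of a chunk loop; an illustration, not the Spring Batch API. */
class TimeWindowChunking {

    /** Hypothetical stand-in for a completion policy, consulted after each item. */
    interface ChunkPolicy {
        void start();                          // called when a new chunk begins
        boolean isComplete(int itemsInChunk);  // true means "close the chunk now"
    }

    /** Closes the chunk once windowMillis have elapsed or maxItems were read. */
    static ChunkPolicy timeWindow(long windowMillis, int maxItems, LongSupplier clock) {
        return new ChunkPolicy() {
            private long startedAt;

            public void start() {
                startedAt = clock.getAsLong();
            }

            public boolean isComplete(int itemsInChunk) {
                return itemsInChunk >= maxItems
                        || clock.getAsLong() - startedAt >= windowMillis;
            }
        };
    }

    /** Reads until the policy closes the chunk; each inner list stands for one commit. */
    static <T> List<List<T>> run(Iterator<T> reader, ChunkPolicy policy) {
        List<List<T>> commits = new ArrayList<>();
        while (reader.hasNext()) {
            List<T> chunk = new ArrayList<>();
            policy.start();
            do {
                chunk.add(reader.next());
            } while (reader.hasNext() && !policy.isComplete(chunk.size()));
            commits.add(chunk);
        }
        return commits;
    }

    public static void main(String[] args) {
        // Fake clock that advances 40 ms on every call, so the run is repeatable.
        long[] now = {0};
        LongSupplier clock = () -> now[0] += 40;
        List<Integer> items = List.of(1, 2, 3, 4, 5, 6, 7);
        System.out.println(run(items.iterator(), timeWindow(100, 10, clock)));
    }
}
```

In a real step the same decision would live in a class implementing the framework's CompletionPolicy interface and be referenced from the chunk-completion-policy attribute; the sketch only shows the control flow.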

  3. #3
    Join Date
    Apr 2010
    Posts
    3


    I also need to control the commit interval depending on data in a peeked item.

    Is there any concept of a PeekableItemListener?
    How would it be possible to access the reader that is configured on the chunk element from a CompletionPolicy?
    Last edited by joustava; Sep 6th, 2010 at 01:03 PM.

  4. #4
    Join Date
    Apr 2010
    Location
    Brazil
    Posts
    12


    Hi, thanks for the reply.

    I will try to explain my question. I need to control the chunk commits depending on the input, after the read and before processing. I will study CompletionPolicy to see whether it helps. Do you have an example of using a completion policy to control the commit interval manually?

    Thanks again!

    Fabiano

  5. #5
    Join Date
    Jun 2005
    Posts
    4,231


    There is a PeekableItemReader (and one implementation) in the framework. You even guessed the name right, so we must be doing something right.

    There are no framework samples (except unit tests in core) for chunk completion policy, but you can find a sample here from a user: http://github.com/magott/magott-springbatch-poc.

    In your case I would say you need to write a reader that pulls together a PeekableItemReader and a CompletionPolicy (maybe implements both interfaces, maybe just injects them and changes their state). Because of the state, there may be restrictions on multi-threaded use, unless you take special steps.
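
For readers unfamiliar with peeking, the following is a minimal sketch of one-item lookahead in plain Java. It wraps a `java.util.Iterator` rather than a Spring Batch reader, and `PeekingReader` is a hypothetical name used only here; it is meant to illustrate the kind of state a peekable reader has to hold, which is also why sharing one between threads needs care.

```java
import java.util.Iterator;
import java.util.List;

/** One-item lookahead over any Iterator; a plain-Java model, not the framework class. */
class PeekingReader<T> {

    private final Iterator<T> delegate;
    private T buffered;           // item fetched by peek() but not yet returned by read()
    private boolean hasBuffered;  // distinguishes "nothing buffered" from "buffered null"

    PeekingReader(Iterator<T> delegate) {
        this.delegate = delegate;
    }

    /** Returns the next item without consuming it, or null at end of input. */
    T peek() {
        if (!hasBuffered) {
            buffered = delegate.hasNext() ? delegate.next() : null;
            hasBuffered = true;
        }
        return buffered;
    }

    /** Returns and consumes the next item, or null at end of input. */
    T read() {
        T item = peek();
        buffered = null;
        hasBuffered = false;
        return item;
    }

    public static void main(String[] args) {
        PeekingReader<String> reader = new PeekingReader<>(List.of("a", "b").iterator());
        System.out.println(reader.peek()); // looks at "a" without consuming it
        System.out.println(reader.read()); // consumes "a"
        System.out.println(reader.read()); // consumes "b"
        System.out.println(reader.read()); // end of input
    }
}
```

The buffered item is mutable state shared by `peek()` and `read()`, so a completion policy that calls `peek()` must run on the same thread as the read loop unless access is synchronized.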

  6. #6
    Join Date
    Apr 2010
    Posts
    3


    Thanks for the hints, they made me implement something like this:

    Code:
    import org.springframework.batch.item.support.SingleItemPeekableItemReader;
    import org.springframework.batch.repeat.CompletionPolicy;
    import org.springframework.batch.repeat.RepeatContext;
    import org.springframework.batch.repeat.RepeatStatus;
    import org.springframework.batch.repeat.context.RepeatContextSupport;
    
    /**
     * Custom reader that needs to be configured BOTH as reader AND as
     * chunk-completion-policy to be able to update its internal state.
     * Record is the item type read from the file; it exposes getDate().
     */
    public class RecordReader extends SingleItemPeekableItemReader<Record> implements CompletionPolicy {
    
    	// First item of the chunk being read; null once the input is exhausted.
    	private Record current;
    
    	@Override
    	public boolean isComplete(RepeatContext context) {
    		return ((ReaderRepeatContext) context).isComplete();
    	}
    
    	@Override
    	public boolean isComplete(RepeatContext context, RepeatStatus result) {
    		return ((ReaderRepeatContext) context).isComplete();
    	}
    
    	@Override
    	public RepeatContext start(RepeatContext parent) {
    		/*
    		 * Set first item of the chunk for later comparison.
    		 */
    		this.current = invokePeek();
    		return new ReaderRepeatContext(parent);
    	}
    
    	@Override
    	public void update(RepeatContext context) {
    		/*
    		 * Check if the step should finish.
    		 * In this case when there are no more records to process.
    		 */
    		if (current == null) {
    			context.setCompleteOnly();
    		}
    	}
    
    	private Record invokePeek() {
    		try {
    			return peek();
    		} catch (Exception e) {
    			// Fail the step instead of treating a read error as end of input.
    			throw new IllegalStateException("Could not peek at the next record", e);
    		}
    	}
    
    	// Custom RepeatContext that decides completion by peeking at the next record.
    	protected class ReaderRepeatContext extends RepeatContextSupport {
    
    		public ReaderRepeatContext(RepeatContext parent) {
    			super(parent);
    		}
    
    		public boolean isComplete() {
    			Record next = invokePeek();
    
    			// The chunk ends at end of input or when the next record has a later date.
    			if (next == null || current == null || next.getDate().after(current.getDate())) {
    				current = next;
    				return true;
    			}
    			return false;
    		}
    
    	}
    
    }
    And the step is configured something like this:

    Code:
    <step id="chunkStep">
    	<tasklet transaction-manager="jobRepository-transactionManager">
    		<chunk reader="peekableReader"
    		       processor="scheduleCopyProcessor"
    		       writer="scheduleCopyWriter"
    		       chunk-completion-policy="peekableReader"/>
    	</tasklet>
    </step>
    
    <bean id="peekableReader" scope="singleton" class="RecordReader">
    	<property name="delegate" ref="schedulingReader" />
    </bean>
    This seems to work for my use case: the records in a file have a date field, and I want to commit all records with the same date in one chunk, so a new chunk (and commit) starts each time a new date is encountered.

    Thanks for the hint. I'm always open to suggestions.
    Last edited by joustava; Sep 9th, 2010 at 08:38 AM.

  7. #7
    Join Date
    Apr 2010
    Location
    Brazil
    Posts
    12


    Guys, the proposed solution works fine! I really liked that!

    In my case, I had to implement the fixed commit interval too, but it worked well!

    Thanks!!!
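
The post above does not show how the fixed commit interval was combined with the peek-based boundary, so the following is only one possible reading, sketched in plain Java without the Spring Batch API. The names `BoundedKeyChunker` and `chunk` are hypothetical, and the boundary rule assumed here is "close the chunk when the key changes or when the chunk already holds `maxSize` items".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

/** Splits items into chunks bounded by a key change and by a maximum size (a sketch). */
class BoundedKeyChunker {

    static <T, K> List<List<T>> chunk(List<T> items, Function<T, K> key, int maxSize) {
        if (maxSize < 1) {
            throw new IllegalArgumentException("maxSize must be at least 1");
        }
        List<List<T>> chunks = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : items) {
            // Compare against the first item of the open chunk, as the reader above does.
            boolean keyChanged = !current.isEmpty()
                    && !Objects.equals(key.apply(current.get(0)), key.apply(item));
            if (keyChanged || current.size() >= maxSize) {
                chunks.add(current);          // "commit" the open chunk
                current = new ArrayList<>();
            }
            current.add(item);
        }
        if (!current.isEmpty()) {
            chunks.add(current);              // commit the final partial chunk
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "2010-09-01 a", "2010-09-01 b", "2010-09-01 c",
                "2010-09-02 d", "2010-09-03 e", "2010-09-03 f");
        // The key is the date prefix of each line; at most two items per chunk.
        System.out.println(chunk(lines, line -> line.substring(0, 10), 2));
    }
}
```

With a commit interval of 2, the three records dated 2010-09-01 are split into two commits, while a date change always starts a new chunk regardless of how full the previous one was.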

  8. #8
    Join Date
    Jan 2011
    Posts
    6


    I have a similar issue to this one, but I'm unsure if this meets my requirements. Does a chunk completion policy end the chunk? All I want to do is commit what's in the write buffer manually, then continue processing the chunk. How would I go about that?

  9. #9
    Join Date
    Jun 2005
    Posts
    4,231


    A completion policy signals the end of the chunk (i.e. no more items will be taken from the reader in this transaction). Can you describe your use case in a bit more detail? I'm surprised to hear that you want to manipulate the transaction from inside your business logic.

  10. #10
    Join Date
    Jan 2011
    Posts
    6


    We are parsing a log file from a piece of hardware. It has a line that starts a group within the database, and we then load the corresponding data into related tables as we go. Sometimes we get a power interrupt, which means that a new group starts on the very next line, and we have to mark all previous groups that didn't complete successfully as invalid. The problem we're running into is that sometimes these invalid groups exist only in the chunk write buffer, so we can't update them. We want to flush the write buffer, read any groups that have a certain status from the database, update them to a new status, and then continue from the next line.

    How would you implement something like that?
