There is no listener called after all unsucessfull retries with the associated items (that cause the batch failure) ?
Printable View
There is no listener called after all unsucessfull retries with the associated items (that cause the batch failure) ?
The skip policy is called but doesn't get a reference to the item. Your best workaround for this is to put the item in some shared state in a Item*Listener (e.g. ThreadLocal if your step is multi-threaded, or just an dinstance variable in step scope otherwise) and then pick it up in the skip policy.
thank you :)
finally we have followed your suggestion:
reader and writer keeps track of items. If an exception cannot be skipped, the skip policy (running under the same thread as the reader and writer) is able to know which items are currently processing and writing. These items must be set as "ERROR"Code:/**
* {@link StagingItemsTracking} keeps track of items read and written by every batch process. Each
* of them are executed in one thread. Each registration erases previous registrations made by the current thread.
* <p />
* This tracker is used by the staging item reader and writer in order to know which items are currently
* processed and written. If a crash occurs, the skip policy (executed in the same thread than the batch process)
* is able to know which items have failed.
*
* @author Sebastien GERARD
* @see com.bsb.sf.batch.service.staging.AbstractStagingItemReader
* @see com.bsb.sf.batch.service.staging.support.StagingItemCompositeProcessor
* @see com.bsb.sf.batch.service.staging.support.StagingItemCompositeWriter
*/
public class StagingItemsTracking {
private static final ThreadLocal<ItemsDescriptors> threadLocal =
new ThreadLocal<ItemsDescriptors>(){
@Override
protected ItemsDescriptors initialValue() {
return new ItemsDescriptors();
}
};
/**
* Register the given items and un-register previous items that have been registered by the current thread.
* @param items Items to register.
*/
public void registerItems(final List<? extends StagingItemWrapper> items)
{
threadLocal.set(new ItemsDescriptors(items) );
}
/**
* Get items that have been registered by the current thread.
* @return Items registered by the current thread.
*/
public List<StagingItemWrapper> getRegisteredItems() {
return threadLocal.get().getItems();
}
/**
* Utility class for keeping items tracking.
*/
private static class ItemsDescriptors {
private final List<StagingItemWrapper> items;
private ItemsDescriptors() {
this.items = new ArrayList<StagingItemWrapper>();
}
private ItemsDescriptors(List<? extends StagingItemWrapper> items) {
this.items = new ArrayList<StagingItemWrapper>(items);
}
public List<StagingItemWrapper> getItems() {
return items;
}
}
}