I am using 1.0M5.
The cause of the problem seems to be SourceEndpoint.poll(). It should be sufficient to catch the Exception, call onFailure() and rethrow the Exception. See the complete explanation below.
I looked at the test sources delivered with Spring Integration for some inspiration on how to set up a test case, but something seems to have gone wrong with the jar: it is nearly empty, so that didn't help much. It would probably be enough to set up a test for SourceEndpoint.poll() with mocks for MessageSource, OutputChannel and MessageDeliveryAware. The OutputChannel mock should throw an Exception, and the test should fail if MessageDeliveryAware.onFailure() isn't called.
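A rough sketch of such a test, written without the framework so it runs on its own. Everything here is a simplified stand-in and not the real Spring Integration API: FailureAware, Channel, CountingSource and PollTestSketch are hypothetical names, messages are plain Objects, and currentPoll() only mirrors the poll() logic quoted further down in this post.

```java
// Hypothetical stand-in for MessageDeliveryAware (the real one takes Message<?> and MessagingException).
interface FailureAware {
    void onSend(Object message);
    void onFailure(RuntimeException exception);
}

// Hypothetical stand-in for the output channel.
interface Channel {
    boolean send(Object message);
}

// Hand-written mock that counts the callbacks it receives.
final class CountingSource implements FailureAware {
    int sends;
    int failures;

    public void onSend(Object message) { sends++; }
    public void onFailure(RuntimeException exception) { failures++; }
}

final class PollTestSketch {
    // Mirrors the current poll() logic: there is no try/catch around send().
    static boolean currentPoll(FailureAware source, Channel channel, Object message) {
        boolean sent = channel.send(message);
        if (sent) {
            source.onSend(message);
        } else {
            source.onFailure(new IllegalStateException("failed to send message"));
        }
        return sent;
    }

    // Returns true only if onFailure() was called exactly once when the channel throws.
    static boolean onFailureCalledWhenChannelThrows() {
        CountingSource source = new CountingSource();
        Channel throwingChannel = message -> { throw new IllegalStateException("Not plausible"); };
        try {
            currentPoll(source, throwingChannel, "payload");
        } catch (IllegalStateException expected) {
            // The exception still has to propagate so that a transaction can roll back.
        }
        return source.failures == 1;
    }
}
```

With the current logic, onFailureCalledWhenChannelThrows() returns false, which is the failure a real test against SourceEndpoint.poll() should report.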
Here are some incomplete code snippets to show the cause of the problem:
I implemented a FileSource that overrides onSend and onFailure:
public class MyFileSource extends org.springframework.integration.adapter.file.FileSource {
    @Override
    public void onSend(Message<?> message) {
        super.onSend(message);
        moveFile(message, successDirectory);
    }
    @Override
    public void onFailure(MessagingException exception) {
        super.onFailure(exception);
        moveFile(exception.getFailedMessage(), failureDirectory);
    }
}
I configure this FileSource programmatically, including a TransactionInterceptor:
MyFileSource fileSource = new MyFileSource(path, messageCreator);
SourceEndpoint fileSourceEndpoint = new SourceEndpoint(fileSource);
fileSourceEndpoint.setName(fileSourceId);
fileSourceEndpoint.setOutputChannelName("fileChannel");
Schedule schedule = new PollingSchedule(15000);
fileSourceEndpoint.setSchedule(schedule);
TransactionInterceptor transactionInterceptor = new TransactionInterceptor(transactionManager);
transactionInterceptor.afterPropertiesSet();
fileSourceEndpoint.addInterceptor(transactionInterceptor);
messageBus.registerEndpoint(fileSourceEndpoint);
I have a router that throws an exception when a plausibility check fails:
@MessageEndpoint(input = "fileChannel")
public class MyMessageRouter {
    @Router
    public String route(Message<?> message) {
        if (isNotPlausible(message))
        {
            throw new MessagingException(message, "Not plausible");
        }
        return getChannelName(message);
    }
}
I had hoped that MyFileSource.onFailure() would be called when the above exception is thrown. But onFailure() is only called when getOutputChannel().send(message, this.sendTimeout) returns false in org.springframework.integration.endpoint.SourceEndpoint.poll():
In org.springframework.integration.endpoint.SourceEndpoint:
public boolean poll() {
    ...
    boolean sent = this.getOutputChannel().send(message, this.sendTimeout);
    if (this.source instanceof MessageDeliveryAware) {
        if (sent) {
            ((MessageDeliveryAware) this.source).onSend(message);
        }
        else {
            ((MessageDeliveryAware) this.source).onFailure(new MessageDeliveryException(message, "failed to send message"));
        }
    }
    return sent;
}
But send() never returns false here, because org.springframework.integration.endpoint.HandlerEndpoint has the following send() implementation:
In org.springframework.integration.endpoint.HandlerEndpoint:
public boolean send(Message<?> message) {
    Message<?> replyMessage = this.handler.handle(message);
    if (replyMessage != null) {
        if (replyMessage.getHeader().getCorrelationId() == null) {
            replyMessage.getHeader().setCorrelationId(message.getId());
        }
        this.replyHandler.handle(replyMessage, message.getHeader());
    }
    return true;
}
send() either returns true or throws an exception, so onFailure() is never called.
Of course it is right to propagate the exception up the call stack; otherwise the TransactionInterceptor wouldn't roll back the transaction!
So I suggest catching the Exception in SourceEndpoint.poll(), calling onFailure() and then rethrowing the Exception.
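A minimal sketch of what I mean, again with simplified stand-in types instead of the real classes. DeliveryAware, OutputChannel, RecordingSource and PollingSketch are hypothetical names, and messages are plain Strings. The real onFailure() takes a MessagingException, so an exception of another type would presumably have to be wrapped first.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for MessageDeliveryAware.
interface DeliveryAware {
    void onSend(String message);
    void onFailure(RuntimeException exception);
}

// Hypothetical stand-in for the output channel.
interface OutputChannel {
    boolean send(String message);
}

// Records the callbacks so the behaviour can be inspected afterwards.
final class RecordingSource implements DeliveryAware {
    final List<String> calls = new ArrayList<>();

    public void onSend(String message) { calls.add("send:" + message); }
    public void onFailure(RuntimeException exception) { calls.add("failure:" + exception.getMessage()); }
}

final class PollingSketch {
    private final DeliveryAware source;
    private final OutputChannel channel;

    PollingSketch(DeliveryAware source, OutputChannel channel) {
        this.source = source;
        this.channel = channel;
    }

    boolean poll(String message) {
        boolean sent;
        try {
            sent = channel.send(message);
        } catch (RuntimeException e) {
            // Notify the source first, then rethrow so a surrounding
            // transaction interceptor still sees the exception and rolls back.
            source.onFailure(e);
            throw e;
        }
        if (sent) {
            source.onSend(message);
        } else {
            source.onFailure(new IllegalStateException("failed to send message"));
        }
        return sent;
    }
}
```

The notification happens inside the catch block and the original exception is rethrown unchanged, so callers further up the stack see exactly what they see today.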
Regards,
Maarten