Results 1 to 5 of 5

Thread: AbstractDirectorySource.onFailure() never called?

  1. #1
    Join Date
    Jul 2008
    Posts
    15

    Default AbstractDirectorySource.onFailure() never called?

    Hi,

    I am trying to use AbstractDirectorySource.onSend() and onFailure() to move handled files to a success or a failure-directory. This works fine with onSend(). However if something goes wrong, onFailure() isn't called and therefore the file remains where it is and is not moved to my failure directory.

    I tried to throw some Exception (e.g. MessageException) from somewhere in the messaging process, in this case in a router. This doesn't result in a call to AbstractDirectorySource.onFailure(). Is this behaviour intentioned or is it a bug? Should I implement the functionality somewhere else?

    I see that AbstractFileMessageCreator is intended to delete the files, but I can't know if the processing will be successfull while creating the message.

    Best regards,
    Maarten Donders
    Last edited by mdond; Jul 23rd, 2008 at 10:07 AM.

  2. #2
    Join Date
    May 2007
    Location
    Netherlands
    Posts
    614

    Default

    I remember (vaguely) I logged a bug for this where the wrong exception type was thrown. Which version are you using?

    Can you post some code (preferably a TestCase) that shows the problem? I can't seem to find the reference I was looking for.
    Last edited by iwein; Jul 23rd, 2008 at 03:38 PM. Reason: additional question

  3. #3
    Join Date
    Jul 2008
    Posts
    15

    Default

    I am using 1.0M5.

    The cause of the problem seems to be SourceEndpoint.poll(). It should be sufficient to catch the Exception, call onFailure() and throw the Exception again. See complete explanation below.

    I took a look into the test source code delivered with spring integration for some inspiration how to set up a TestCase. Well, it seems something went wrong with the jar. It is nearly empty, so that didn't help much. Probably it would be enough to set up a test for SourceEndpoint.poll() with Mocks for MessageSource, OutputChannel and MessageDeliveryAware. The OutputChannel-Mock should throw an Exception and the test should fail, if MessageDeliveryAware.onFailure() isn't called.

    Here are some incomplete code-snippets, to show the cause of the problem:

    I implemented a FileSource with overriding onSend and onFailure:

    public class MyFileSource extends org.springframework.integration.adapter.file.FileS ource {
    @Override
    public void onSend(Message<?> message) {
    super.onSend(message);
    moveFile(message, successDirectory);
    }

    @Override
    public void onFailure(MessagingException exception) {
    super.onFailure(exception);
    moveFile(exception.getFailedMessage(), failureDirectory);
    }
    }


    I configure this FileSource programmatically, including a TransactionInterceptor:

    MyFileSource fileSource = new MyFileSource(path, messageCreator);
    SourceEndpoint fileSourceEndpoint = new SourceEndpoint(fileSource);
    fileSourceEndpoint.setName(fileSourceId);
    fileSourceEndpoint.setOutputChannelName("fileChann el");
    Schedule schedule = new PollingSchedule(15000);
    fileSourceEndpoint.setSchedule(schedule);
    TransactionInterceptor transactionInterceptor = new TransactionInterceptor(transactionManager);
    transactionInterceptor.afterPropertiesSet();
    fileSourceEndpoint.addInterceptor(transactionInter ceptor);
    messageBus.registerEndpoint(fileSourceEndpoint);

    I have a router that throws an error, when a plausibility check fails:

    @MessageEndpoint(input = "fileChannel")
    public class MyMessageRouter {
    @Router
    public String route(Message<?> message) {
    if (isNotPlausible(message))
    {
    throw new MessagingException(message, "Not plausible");
    }
    return getChannelName(message);
    }
    }

    I hoped, that in case of above exception, MyFileSource.onFailure() would have been called. But onFailure() only is called when in org.springframework.integration.endpoint.SourceEnd point.poll() getOutputChannel().send(message, this.sendTimeout) delivers false:

    In org.springframework.integration.endpoint.SourceEnd point:

    public boolean poll() {
    ...
    boolean sent = this.getOutputChannel().send(message, this.sendTimeout);
    if (this.source instanceof MessageDeliveryAware) {
    if (sent) {
    ((MessageDeliveryAware) this.source).onSend(message);
    }
    else {
    ((MessageDeliveryAware) this.source).onFailure(new MessageDeliveryException(message, "failed to send message"));
    }
    }
    return sent;
    }

    But this is not the case, as org.springframework.integration.endpoint.HandlerEn dpoint has following send()-Implementation:

    in org.springframework.integration.endpoint.HandlerEn dpoint:

    public boolean send(Message<?> message) {
    Message<?> replyMessage = this.handler.handle(message);
    if (replyMessage != null) {
    if (replyMessage.getHeader().getCorrelationId() == null) {
    replyMessage.getHeader().setCorrelationId(message. getId());
    }
    this.replyHandler.handle(replyMessage, message.getHeader());
    }
    return true;
    }

    This is always true or an exception is thrown. Therefore onFailure() is never called.
    Of course it is right to propagate the exeception through the call stack, otherwise the TransactionInterceptor wouldn't rollback the transaction!

    So I suggest to catch the Exception in SourceEndpoint.poll(), call onFailure() and throw the Exception again.

    Regards,
    Maarten
    Last edited by mdond; Jul 24th, 2008 at 04:10 AM.

  4. #4
    Join Date
    Jul 2008
    Posts
    15

    Default

    The following change in org.springframework.integration.endpoint.HandlerEn dpoint works for me:

    Code:
    	public boolean poll() {
    		Message<?> message = (this.source instanceof BlockingSource && this.receiveTimeout >= 0) ? ((BlockingSource<?>) this.source)
    				.receive(this.receiveTimeout)
    				: this.source.receive();
    		if (message == null) {
    			return false;
    		}
    		boolean sent = false;
    		try {
    			sent = this.getOutputChannel().send(message, this.sendTimeout);
    		} catch (RuntimeException error) {
    			if (this.source instanceof MessageDeliveryAware) {
    				((MessageDeliveryAware) this.source)
    						.onFailure(new MessageDeliveryException(message, error
    								.getMessage()));
    			}
    			throw error;
    		}
    		if (this.source instanceof MessageDeliveryAware) {
    			if (sent) {
    				((MessageDeliveryAware) this.source).onSend(message);
    			} else {
    				((MessageDeliveryAware) this.source)
    						.onFailure(new MessageDeliveryException(message,
    								"failed to send message"));
    			}
    		}
    		return sent;
    	}
    Regards,
    Maarten

  5. #5
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,853

    Default

    Maarten,

    Could you please add an issue in JIRA for this?

    Thanks,
    Mark

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •