
Thread: Preserve header values when exception thrown down error-channel?

  1. #1
    Join Date
    Apr 2011
    Posts
    27

    Default Preserve header values when exception thrown down error-channel?

    Is there any way to preserve header values when I throw an exception (down the error channel)?

    The reason is that my flow starts with a poller (reading from a database) followed by a chain of transformers etc., any of which can throw an exception. (For context, the database I am polling is a custom persistent queue that was populated by an inbound message earlier.)

    When the message is initially polled I mark the row in the database as "P" for processing. The row in the database holds a unique id, which is then put into the message header. If the message successfully passes down the chain, the last thing I do is mark the row in the database (with the unique id) as "S" for successful. If the message fails (because an exception is thrown), I want the (chain-specific) error-channel to mark the row in the database (with the unique id) as "F" for fail.

    So my issue is that I am designing an error-channel, and the exception is being wrapped up in a message and delivered to me. For the successful message the unique id is in the header (as a key/value pair) and has been passed along the chain until the very end, where I pass the message into a success(Message) method, pick the unique id out of the header and update the database. The exception message, however, is a new message, and the header values are not passed into it. So I would like the header values to be passed to the new exception message.

    I'm not sure what the semantics of the header values are. But I would like the header to contain the Spring context information and the payload to hold the business data, and I'm trying to keep the database polling queue in the Spring layer and away from the "business" layer.

    So the basic question is: is there a way to put the header values into the exception message (from the XML), OR is there a best practice for passing data from headers to exceptions?

    For the moment I'll stick the unique id from the message header in the exception, but I'd rather keep the exception as the underlying error (e.g. transformation error).

  2. #2
    Join Date
    Apr 2011
    Posts
    27

    Default is this right

    I'm picking the failedMessage from the error-channel message. So I can get at the original header from here.



    <int:outbound-channel-adapter channel="failMessageChannel" ref="foGatewayAdapter" method="fail"/>

    public void fail(final Message<MessagingException> message) throws Exception
    {
    updateStatus( message.getPayload().getFailedMessage(), MessageQueueStatus.FAIL );
    }



    So my only question is: when I send a failed message via the error-channel back to the original gateway, does this act as a reply message, or do I need to forward my error message to the reply-channel?

  3. #3
    Join Date
    Jan 2008
    Location
    Mohnton, PA USA (that's near Philadelphia)
    Posts
    2,148

    Default

    If you simply return the same or a new ErrorMessage from the subscriber of the error-channel, it is as if you didn't define an error-channel in the first place. It will be converted to an Exception. This, however, is still a useful technique for intercepting the exception for logging/auditing purposes.
    However, as I stated before, a lot of the time the use case is to intercept the exception and return a successful Message which might contain some error code (similar to a SOAP Fault). In this case the caller will never see the exception but would have to look at the reply (payload) to determine whether there was an error downstream.
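
The second approach (turning the failure into a normal reply that carries an error code) can be sketched without any framework code. This is only an illustration under stated assumptions: `ErrorToReply`, the `status`/`reason` keys and the `FAILED` value are hypothetical names, and the returned map merely stands in for the payload of the reply Message that an error-channel subscriber would return.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ErrorToReply {
    // Hypothetical reply format: a map with a status code and a reason,
    // standing in for the payload of a normal (non-error) reply message.
    static Map<String, String> toReply(Throwable error) {
        // Walk down to the innermost cause so the caller sees the underlying error.
        Throwable root = error;
        while (root.getCause() != null) {
            root = root.getCause();
        }
        Map<String, String> reply = new LinkedHashMap<String, String>();
        reply.put("status", "FAILED");
        reply.put("reason", String.valueOf(root.getMessage()));
        return reply;
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException("wrapper",
                new IllegalStateException("transformation error"));
        System.out.println(toReply(wrapped));
    }
}
```

With a reply like this the caller never sees an exception and has to inspect the `status` entry instead.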

  4. #4
    Join Date
    Jan 2009
    Location
    Ukraine, Kharkov
    Posts
    645

    Default

    Hello.
    Maybe this will help you: Spring-Integration errorHandling
    http://git.springsource.org/spring-i.../errorhandling

    Artem

  5. #5

    Default

    Hello,
    I am trying to implement retry mechanism which goes like this:
    - If an exception is thrown in my service activator (which, by the way, is invoked asynchronously through a dispatcher channel following a gateway), I'd like to get the ErrorMessage in my errorChannel, extract the payload, increment the retryCount in the header, and put it back on the dispatcher channel after some delay so that it is eventually passed to my service activator again.

    I think I have the same problem as the author of this thread: when the ErrorMessage is created after the exception in my service activator, the whole SI message is put in the ErrorMessage as its payload, but unfortunately the header where I keep the retryCount variable is not preserved.

    Every time an ErrorMessage is created, its payload contains my business entity but without the custom header. The only workaround I have found, which I really don't like, is to pass retryCount in my exception constructor and then retrieve it in my handler that consumes errorChannel.

    Is this expected behaviour?
    What I'd expect is that when SI creates the ErrorMessage and puts the whole failed SI message in it as the payload, it also preserves the headers, not just the entity itself.

    Could somebody look at this please and advise?
    Thanks & best,
    Milan

  6. #6

    Default

    Could someone give me a hand, please?

    I see ErrorMessage in the source code:

    public ErrorMessage(Throwable payload) {
    super(payload);
    }

    public ErrorMessage(Throwable payload, Map<String, Object> headers) {
    super(payload, headers);
    }

    But it looks like, at the moment the ErrorMessage is put into errorChannel, my subscriber (Transformer) is not able to retrieve the original headers of the message that failed, which I guess means the 1st constructor is called somewhere in the code instead of the 2nd one.

    Retrieving headers from the failed message is really a big deal for me, since it looks like the simplest way to implement retry (keeping retryCount as a @Header).

    Another approach suggested on this forum is to keep retryCount as part of the @Payload. While I see that it would certainly work, that's not an option for me, since the payload is a Roo entity, in which I can't store a field related to the internal messaging system.

    Btw I am using 2.0.4.RELEASE
    Thanks for your help,
    Milan

  7. #7
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,844

    Default

    The Exception that is the payload of your ErrorMessage is most likely assignable to MessagingException. If so, that class defines a getFailedMessage() method, and there you will have the original Message, including its payload AND headers.

    Hope that helps.
    -Mark

  8. #8

    Default

    Thanks Mark,

    Yes, it is a MessagingException that I am getting.
    However, headers set in the ServiceActivator that eventually throws the exception are not preserved.

    Here is my ErrorHandlingTransformer which has errorChannel as input:

    Code:
    public class ErrorHandlingTransformer {
    
    	final Logger log = LoggerFactory.getLogger(ErrorHandlingTransformer.class);
    	
    	/**
    	 * Unwraps the ErrorMessage, and gets Transaction from Payload and current retryCount from Exception.
    	 */
    	@Transformer
    	public Message<Transaction> execute(ErrorMessage message) {
    		log.debug("inside transformer");
    		Message<?> siMessage = ((MessagingException) message.getPayload()).getFailedMessage();
    		// retry count does not get preserved in header. The line below always returns 0.
    		//Integer retryCountInHeader = message.getHeaders().get("retryCount")==null?0:(Integer)message.getHeaders().get("retryCount");
                   // instead as a dirty hack, retrieve retryCount from Exception itself.
    		String exceptionMessage = ((MessagingException) message.getPayload()).getCause().getMessage();
    		log.debug("exceptionMessage: " + exceptionMessage);
    		int retryCount = Integer.parseInt(exceptionMessage);
    		Transaction transaction = (Transaction)siMessage.getPayload();
    		log.debug("failed transaction " + transaction.getId() + ", after " + retryCount + " retries.");
    		retryCount++;
    		return MessageBuilder.withPayload(transaction).setHeader("retryCount", retryCount).build();
    	}
    }
    Any idea why that could be the case?
    Thanks,
    Milan
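
One thing that may be worth checking in the transformer above: the commented-out line reads `retryCount` from `message.getHeaders()`, i.e. from the headers of the ErrorMessage, whereas the original message returned by `getFailedMessage()` is held in `siMessage`. The following is a minimal, framework-free sketch of that distinction. `SimpleMessage` and `FailedMessageException` are hypothetical stand-ins for Spring Integration's `Message` and `MessagingException` (they are not the real classes), and the sketch assumes that the failed message is the one that carried the header.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a Spring Integration Message: a payload plus headers.
class SimpleMessage {
    private final Object payload;
    private final Map<String, Object> headers;

    SimpleMessage(Object payload, Map<String, Object> headers) {
        this.payload = payload;
        this.headers = Collections.unmodifiableMap(new HashMap<String, Object>(headers));
    }

    Object getPayload() { return payload; }
    Map<String, Object> getHeaders() { return headers; }
}

// Hypothetical stand-in for MessagingException: it carries the message that failed.
class FailedMessageException extends RuntimeException {
    private final SimpleMessage failedMessage;

    FailedMessageException(SimpleMessage failedMessage, Throwable cause) {
        super(cause);
        this.failedMessage = failedMessage;
    }

    SimpleMessage getFailedMessage() { return failedMessage; }
}

class RetryHeaderReader {
    // Reads retryCount from the FAILED message's headers, not from the
    // error message's own headers. Defaults to 0 when the header is absent.
    static int retryCountOf(SimpleMessage errorMessage) {
        FailedMessageException ex = (FailedMessageException) errorMessage.getPayload();
        Object value = ex.getFailedMessage().getHeaders().get("retryCount");
        return value == null ? 0 : (Integer) value;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<String, Object>();
        headers.put("retryCount", 2);
        SimpleMessage original = new SimpleMessage("transaction-42", headers);

        // The error message gets its own headers; the original ones stay on the failed message.
        SimpleMessage error = new SimpleMessage(
                new FailedMessageException(original, new Exception("boom")),
                new HashMap<String, Object>());

        System.out.println("from error message headers: " + error.getHeaders().get("retryCount"));
        System.out.println("from failed message headers: " + retryCountOf(error));
    }
}
```

In terms of the transformer above, the equivalent check would be to read the header from `siMessage.getHeaders()` rather than `message.getHeaders()`.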

  9. #9

    Default

    Sorry for bothering you again, but I'd really like to know if I am doing something wrong.

    My Service Activator class looks like this:
    Code:
    public class ConversationService {
    	final Logger log = LoggerFactory.getLogger(ConversationService.class);
    
    	/**
    	 * simulation of unpredictable error
    	 */
    	@Value("${error-probability}")
    	private Double errorProbability;
    	
    	/**
    	 * merge needs to be called only if the service is called in a separate thread than the input adapter poller
    	 */
    	@Value("${should-merge}")
    	private Boolean shouldMerge = false;
    	
    	/**
    	 * max number of retries
    	 */	
    	@Value("${num-of-retries}")
    	private Integer numOfRetries = 3;
    	
    	/**
    	 * Creates conversation and update transaction to processed in case of success. In case it fails, new exception is thrown with the message = current retry count.
    	 */
    	@ServiceActivator
    	public void execute(Message<Transaction> message) throws Exception {
    		Transaction transaction = (Transaction) message.getPayload();
            // I am getting here retry count correctly set from transformer
    		Integer retryCount = message.getHeaders().get("retryCount")==null?0:(Integer)message.getHeaders().get("retryCount");
    		log.debug("retryCount: " + retryCount);
    		
    		try {
    			double calculated = Math.random();
    			if(errorProbability > calculated){
    				log.debug("Calculated: " + calculated + " , so transaction " + transaction.getId() + " will retry.");
    				throw new Exception("something bad bad happened");
    			}
    			else {
    				// my business logic
    			}
    		} catch(Exception e){
    			if(retryCount.intValue() > numOfRetries){
    				// do not throw exception, mark transaction as pending
    				log.debug("change status to pending");
    				transaction.setStatus(TransactionStatus.pending.name());
    				if(shouldMerge){
    				    transaction.merge();
    				}
    			}
    			else {
    				// throw exception
    				// we need to pass current retry count as a part of the exception, since headers are not preserved when message
    				// is sent to error channel.
    				throw new Exception(String.valueOf(retryCount));
    			}
    		}
    	}
    }
    And my integration application context looks like this:

    Code:
    <int:channel id="input" datatype="java.util.List"/>
    	<int:channel id="output"/>		
    	<int:channel id="transaction" datatype="com.ma.siroo.model.Transaction">
    	<int:dispatcher task-executor="pool"/>
    	
    	</int:channel>
        <int:channel id="errorChannel">
        <int:queue capacity="${queue-capacity}"/>
        </int:channel>	
        
        <task:executor id="pool"
                    pool-size="${pool-size}"
                    queue-capacity="${queue-capacity}" 
                    keep-alive="120" rejection-policy="CALLER_RUNS"/>
    	
    	    <task:executor id="poller"
                    pool-size="1" />
       
       <int:inbound-channel-adapter channel="input" method="receive" ref="transactionInputAdapter">
    		<int:poller fixed-rate="${fixed-rate}" receive-timeout="${receive-timeout}" max-messages-per-poll="${max-messages-per-poll}" task-executor="poller">
       	</int:poller>
    	</int:inbound-channel-adapter>
    	<bean id="transactionInputAdapter" class="com.ma.siroo.integration.adapter.TransactionInputAdapter">
    	</bean>
    	
    	
    	<int:splitter id="transactionSplitter" input-channel="input" output-channel="transaction"/>	
    	
    	<int:gateway id="transactionGateway"
    		service-interface="com.ma.siroo.integration.gateway.TransactionGateway" default-request-channel="transaction" error-channel="errorChannel"/>
    	
    	 <int:service-activator input-channel="transaction">
           <bean class="com.ma.siroo.integration.service.ConversationService"/>
        </int:service-activator>  
        
    
        <int:service-activator input-channel="errorChannel" output-channel="transaction">
           <bean class="com.ma.siroo.integration.transformer.ErrorHandlingTransformer"/>
               <int:poller fixed-rate="${fixed-rate}" task-executor="poller">
           </int:poller>
        </int:service-activator>

    Thanks again,
    Milan
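
A side note on the retry bound in the service activator above: `retryCount` starts at 0 and the message is re-sent unless `retryCount > numOfRetries`, so a message that keeps failing is processed `numOfRetries + 2` times before it is marked pending. If the intent is at most `numOfRetries` retries, comparing with `>=` is one option. The sketch below just counts attempts under both comparisons; `attemptsUntilPending` is a hypothetical helper, and it assumes every attempt fails (in the post the failure is random).

```java
class RetryBound {
    // Simulates a message that fails on every attempt and counts how many
    // times it is processed before it would be marked pending.
    // strict == true  uses  retryCount >  maxRetries (as in the post above)
    // strict == false uses  retryCount >= maxRetries
    static int attemptsUntilPending(int maxRetries, boolean strict) {
        int retryCount = 0;
        int attempts = 0;
        while (true) {
            attempts++;
            boolean giveUp = strict ? retryCount > maxRetries : retryCount >= maxRetries;
            if (giveUp) {
                return attempts;
            }
            retryCount++; // the error handler increments the header and re-sends
        }
    }

    public static void main(String[] args) {
        System.out.println("attempts with > : " + attemptsUntilPending(3, true));
        System.out.println("attempts with >=: " + attemptsUntilPending(3, false));
    }
}
```

With `maxRetries` set to 3 this counts 5 attempts for `>` and 4 attempts (one initial try plus three retries) for `>=`.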

  10. #10

    Default

    I still hope that somebody from SI developers would have a solution for my problem.

    Thanks,
    Milan
