Thread: RPC style exception handling with Spring Integration & RabbitMQ

  1. #1

    RPC style exception handling with Spring Integration & RabbitMQ

    I have a simple producer / consumer setup doing an RPC style call over RabbitMQ using Spring Integration...

    What is the best pattern for handling exceptions thrown by the consumer? Ideally, the exception would be thrown directly out of the int:gateway on the producer so that the client can handle it.

    This is my producer config:

    Code:
        <rabbit:connection-factory id="connectionFactory"/>
    
        <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>
    
        <rabbit:admin connection-factory="connectionFactory"/>
    
        <rabbit:queue name="Q.aggregator.Proceed"/>
    
        <rabbit:direct-exchange name="E.aggregator.direct.Proceed">
            <rabbit:bindings>
                <rabbit:binding queue="Q.aggregator.Proceed" key="aggregator.Proceed"/>
            </rabbit:bindings>
        </rabbit:direct-exchange>
    
        <rabbit:direct-exchange name="E.thomas.direct.Proceed"/>
    
        <int:channel id="aggregator.Proceed"/>
    
        <int:gateway id="uwEngine" service-interface="some.Interface" default-request-channel="aggregator.Proceed" />
    
        <int-amqp:outbound-gateway request-channel="aggregator.Proceed" amqp-template="amqpTemplate" exchange-name="E.aggregator.direct.Proceed" routing-key="aggregator.Proceed"/>
    This is the consumer:

    Code:
        <int:channel id="aggregator.Proceed"/>
    
        <int-amqp:inbound-gateway request-channel="aggregator.Proceed" queue-names="Q.aggregator.Proceed" connection-factory="connectionFactory" concurrent-consumers="1"/>
    
        <bean id="beany" class="some.Interface"/>
    
        <int:service-activator input-channel="aggregator.Proceed" ref="beany" method="proceed"/>
    I currently get the following error:

    Code:
    INFO: Execution of Rabbit message listener failed, and no ErrorHandler has been set: class org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception
    Nov 20, 2012 2:16:00 PM org.springframework.integration.gateway.MessagingGatewaySupport doSendAndReceive
    WARNING: failure occurred in gateway sendAndReceive
    From reading other threads and documentation I can see a couple of options, but none would seem to get the exception back to the client...

    • Define the error-channel property on the int-amqp:inbound-gateway on the consumer, and then define an int-amqp:outbound-gateway to send the error back to the client. I am not sure how I would send this back to the temporary queue that was created and on which the client is blocking.
    • Define an ErrorHandler. I am not sure what I should be doing inside it, or where this is documented.
    • Catch all exceptions at the consumer level and wrap them in my response payload.
    • Time out the response on the client, which is useless because I won't know why the call failed.


    Thanks in advance!

  2. #2
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,017

    1. Add an error-channel to the inbound-gateway, then add a service-activator that consumes from that channel and invokes a custom service. The service examines the payload (a MessagingException with failedMessage and cause properties) and creates some form of error message to return to the client; simply return that value from the service and the gateway will return it to the client.

    2. On the client, add a reply-channel to the outbound-gateway, then add a consumer that examines the reply. If the reply is the error message, the consumer throws an exception, which is propagated to the gateway client. If the reply is not an error, this response analyzer service simply returns it and it is returned to the gateway normally.

    In effect, you are adding a post-processing service after the outbound gateway.
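
    The two steps above could be wired roughly as follows. This is only a sketch: the channel names aggregator.Errors and aggregator.Replies, the beans errorTranslator and replyAnalyzer, and their classes and method names are hypothetical and do not come from the original configuration. It also assumes the error reply object is something the configured message converter can send over AMQP (for example, a Serializable class), and that the int, int-amqp and beans namespaces are declared as in the existing files.

    ```xml
    <!-- CONSUMER context: failures in the flow are sent to an error channel -->
    <int-amqp:inbound-gateway request-channel="aggregator.Proceed" queue-names="Q.aggregator.Proceed"
        connection-factory="connectionFactory" concurrent-consumers="1"
        error-channel="aggregator.Errors"/>

    <int:channel id="aggregator.Errors"/>

    <!-- Hypothetical bean: receives the MessagingException payload and returns an error reply object -->
    <bean id="errorTranslator" class="some.ErrorTranslator"/>

    <!-- No output-channel: the returned value goes back through the inbound gateway to the client -->
    <int:service-activator input-channel="aggregator.Errors" ref="errorTranslator" method="toErrorReply"/>


    <!-- PRODUCER context: replies pass through an analyzer before reaching the int:gateway caller -->
    <int-amqp:outbound-gateway request-channel="aggregator.Proceed" reply-channel="aggregator.Replies"
        amqp-template="amqpTemplate" exchange-name="E.aggregator.direct.Proceed" routing-key="aggregator.Proceed"/>

    <int:channel id="aggregator.Replies"/>

    <!-- Hypothetical bean: throws if the payload is an error reply, otherwise returns it unchanged -->
    <bean id="replyAnalyzer" class="some.ReplyAnalyzer"/>

    <!-- No output-channel: a normal return value is delivered to the gateway caller -->
    <int:service-activator input-channel="aggregator.Replies" ref="replyAnalyzer" method="analyze"/>
    ```

    Neither service-activator declares an output-channel, so whatever each service returns is sent back as the reply for the message it received.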

    HTH
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  3. #3

    Thanks Gary, that makes sense.

    Where I get confused is how the reply-channel that is created gets wired back to the int:gateway to receive the response. In my naive mind this channel seems to have no endpoints...

    If I just wanted to handle all exceptions and return these as error codes within my payload, would the service-activator be the place to do this?

  4. #4

    When you call the <int:gateway/>, the gateway creates a temporary channel and puts it in the message's replyChannel header. This channel is ultimately where any reply will be sent. If there is any asynchronous processing, the thread calling the gateway will block in receive() on this channel. If there is no asynchronous handoff, the thread will call receive() to immediately retrieve the result when the flow ends. It's not "wired into the gateway"; it's wired into the message.

    If you don't want to propagate the error by throwing an exception, you don't need any post-processing on the producer; just assemble the response you want in the service wired into the error-channel on the consumer's inbound gateway.
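
    For a very simple case, the consumer-only variant might not even need a custom bean. The sketch below assumes a hypothetical error channel named aggregator.Errors and uses a SpEL transformer to turn the failure into a plain String reply. The String reply and the "ERROR: " prefix are illustrative only; a real flow would build whatever response type the client's service interface returns. It also assumes the failure has a non-null cause.

    ```xml
    <!-- CONSUMER context only; the producer configuration stays unchanged -->
    <int-amqp:inbound-gateway request-channel="aggregator.Proceed" queue-names="Q.aggregator.Proceed"
        connection-factory="connectionFactory" concurrent-consumers="1"
        error-channel="aggregator.Errors"/>

    <int:channel id="aggregator.Errors"/>

    <!-- payload is the MessagingException; its cause is the exception thrown by the service -->
    <int:transformer input-channel="aggregator.Errors"
        expression="'ERROR: ' + payload.cause.message"/>
    ```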
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware
