Thread: Durable RPC with Spring Integration and RabbitMQ

  1. #1

    Durable RPC with Spring Integration and RabbitMQ

    I have created a simple message sender and consumer and linked them together using Spring Integration and RabbitMQ.

    I have used the outbound-gateway and inbound-gateway to create RPC (request/response) style messaging. I have not specified a reply-channel, as I am leaving this up to Spring to create (I assume it will create an anonymous queue for the reply).

    This works just fine when the consumer is started before the sender, but I am not receiving a response if the consumer is started after the sender. I can see that the consumer receives the message but no response is returned.

    On further inspection, I can see that a temporary anonymous queue is created in RabbitMQ when the sender sends the message, and the message carries this queue name in its reply-to header. This queue, however, disappears shortly after it is created, before I start up the consumer. I'm guessing that because the queue no longer exists, the consumer cannot publish a response to it.

    I can see from the RabbitMQ admin tool that the anonymous queue is exclusive and has auto-delete set to true. I have no control over these properties, though, because the queue is created by Spring Integration.

    Does anyone have any ideas on how to solve this? My config is as follows:

    Sender:

    Code:
    <import resource="classpath:rabbit.xml" />
    
    <int:channel id="output" />
    
    <int:gateway id="senderGateway" service-interface="gordon.outbound.SenderGateway" default-request-channel="output"/>
    
    <int-amqp:outbound-gateway request-channel="output"
                               amqp-template="amqpTemplate" exchange-name="silly-wabbit-exchange"
                               routing-key="silly-wabbit-key"/>

    Consumer:

    Code:
    <import resource="classpath:rabbit.xml" />
    
    <int:channel id="input"/>
    
    <int-amqp:inbound-gateway request-channel="input" queue-names="silly-wabbit-queue" connection-factory="connectionFactory"/>
    
    <bean id="listenerService" class="gordon.inbound.ListenerService"/>
    
    <int:service-activator input-channel="input" ref="listenerService" method="receiveMessage"/>

  2. #2
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,040

    The default replyTimeout is 5 seconds. If the consumer doesn't reply within 5 seconds, the outbound gateway's consumer is canceled, which removes the temporary queue.

    You can increase the timeout by configuring a reply-timeout on the <rabbit:template/> (milliseconds).
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware
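
    For illustration, the reply-timeout suggestion above might look like this on the template. This is only a sketch: it assumes the amqpTemplate bean referenced by the sender is declared with <rabbit:template/> in rabbit.xml (that file is not shown in the thread), and the 30000 ms value is an arbitrary example.

    ```xml
    <!-- Assumed contents of rabbit.xml; only reply-timeout is the point here. -->
    <!-- reply-timeout is in milliseconds; 30000 is an illustrative value.     -->
    <rabbit:template id="amqpTemplate"
                     connection-factory="connectionFactory"
                     reply-timeout="30000"/>
    ```

    The value would need to cover the longest expected gap between the sender publishing and the consumer replying, since the temporary reply queue only exists while the outbound gateway is still waiting.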

  3. #3

    Thanks Gary... Would I expect to see an exception or something similar upon timeout so that I can react to it in my code?

    Currently it just seems to block indefinitely.

    Also, is there a suggested approach to what the consumer should then do with a message that the producer is no longer waiting for?

  4. #4

    It shouldn't block. With DEBUG logging enabled, you should see the output of

    logger.debug("handler '" + this + "' produced no reply for request Message: " + message);

    In a future release, we could expose an (existing) property "requiresReply", which would cause the gateway to throw an exception rather than just emitting a DEBUG log.

    However, if you believe it's blocking, can you submit a thread dump? (use jstack, or VisualVM)
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  5. #5

    I see you are using an <int:gateway/> to send to the AMQP gateway; it WILL block (because its default timeout is infinite). You can set a default-reply-timeout on the <int:gateway/> and you'll get an exception.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware
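
    Applied to the sender's gateway from post #1, the default-reply-timeout suggestion might look like the following. This is a sketch: the 35000 ms value is an arbitrary example, picked here only so that it is a little longer than whatever reply-timeout is set on the template; neither number comes from the thread.

    ```xml
    <!-- Same gateway as in post #1, with a bounded wait for the reply. -->
    <!-- default-reply-timeout is in milliseconds; 35000 is illustrative. -->
    <int:gateway id="senderGateway"
                 service-interface="gordon.outbound.SenderGateway"
                 default-request-channel="output"
                 default-reply-timeout="35000"/>
    ```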
