Thread: Timeouts waiting for responses

  1. #1

    Timeouts waiting for responses

    Hello, I have a problem getting responses back from an async gateway.

    The gateway looks like this:

    Code:
    public interface JobServiceGateway {
        @Gateway(requestChannel="xmlTestProcessingRequestChannel")
        Future<String> sendMessageAndGetReply(String message) throws Exception;
    }
    The xml configuration is pretty basic, the essential parts being:

    (The server)

    Code:
    <int:channel id="xmlProcessingRequestChannel" />
    
    <int-amqp:inbound-gateway id="amqpInboundGateway"
        request-channel="xmlProcessingRequestChannel"
        connection-factory="connectionFactory"
        queue-names="myproduct.xmlProcessingRequestQueue"
        concurrent-consumers="3"
    />
    (The client)

    Code:
    <int:gateway id="jobServiceGateway" service-interface="com.myproduct.JobServiceGateway"/>
    
    <int:channel id="xmlTestProcessingRequestChannel"/>
    <int:channel id="pdfTestDeliveryChannel"/>
    
    <int-amqp:outbound-gateway
        request-channel="xmlTestProcessingRequestChannel"
        exchange-name="xml-request-delivery"
        routing-key-expression="'xml-request.number'"
        amqp-template="amqpTemplate"
    />
    
    ...Code for the xml-request-delivery exchange omitted...
    The problem is that if I begin pumping in tasks like this:

    Code:
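    // Note: FutureTask here is the poster's own wrapper (label + Future), not java.util.concurrent.FutureTask
    List<FutureTask> futures = new ArrayList<FutureTask>();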
    for (int i=0; i<50; i++) {
        System.out.println(new Date()+ ": Sending message " + i);
        Future<String> response = jobServiceGateway.sendMessageAndGetReply("Hello Message #" + i);
        futures.add(new FutureTask(String.valueOf(i),response));
    }
    And then wait for the tasks to complete:

    Code:
    for (FutureTask future: futures) {
        try {
            System.out.println(new Date()+ "Getting response for " + future);
            System.out.println(new Date()+ "    Response = " + future.getFuture().get(3000, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            System.err.println(new Date()+ "    TIMEOUT while waiting for " + future);
        }
    }
    What happens is the following:

    - The server receives all the tasks, 3 at a time (concurrent-consumers=3). I used a synthetic delay of 2 seconds processing each task, so each bunch of 3 tasks takes 2 seconds to complete. Then it gets 3 new tasks, processes them - etc.

    - The client receives responses to *some* tasks, while others time out. This happens even if I insert a Thread.sleep(total time to process 50 tasks, 2 seconds per task, 3 at a time) between sending the messages and starting my process-responses loop.

    What is happening to the responses that are generated, but somehow never received by the client?

    I use RabbitMQ as the message middleware, and in the web interface the queue becomes empty as soon as the last task has been processed, and before the "receive-loop" starts processing task responses. Where are the responses going when they disappear out of RabbitMQ?
    Last edited by mberg; May 21st, 2012 at 06:28 AM.

  2. #2
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,844

    Can you please try increasing the timeout on the "client" side adapter? The default is only 5 seconds.

    Thanks,
    Mark

  3. #3
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,844

    Also, this recent thread might be helpful: http://forum.springsource.org/showth...tion-over-amqp

    It's quite possible that you are experiencing the same thing. Please read through the posts there to see if you agree.

    Hope that helps.
    -Mark

  4. #4

    Hello Mark, thanks for the reference. You were right: after adding a "reply-timeout" attribute to my rabbit:template tag and setting it to the worst-case processing time, all my requests were processed without problems. I hadn't tried that because, for some reason, Eclipse does not show reply-timeout as a valid attribute on the rabbit:template tag. Oh well.

    So what is happening is that my service is generating a reply, but my client times out before it can read it.
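
The "worst case" for the workload described in post #1 (50 tasks, 3 concurrent consumers, 2 seconds per task) can be estimated with some simple arithmetic. The following is only a rough sketch under simplifying assumptions: all requests are published at roughly the same moment, the server drains them in fixed batches, and broker and network latency are ignored. The class and method names are hypothetical and are not part of Spring.

```java
// Rough estimate of when replies become available for a burst of requests.
// Assumptions (not from any library): all requests are sent at t=0, the server
// processes them in batches of `consumers`, and transport latency is zero.
final class ReplyTimeoutEstimate {

    /** Time in ms at which the reply for the task at a zero-based index is ready. */
    static long replyReadyAtMillis(int taskIndex, int consumers, long perTaskMillis) {
        long batch = taskIndex / consumers + 1; // 1-based batch number
        return batch * perTaskMillis;
    }

    /** Time in ms at which the last reply of the burst is ready. */
    static long worstCaseMillis(int tasks, int consumers, long perTaskMillis) {
        return replyReadyAtMillis(tasks - 1, consumers, perTaskMillis);
    }

    /** Number of tasks whose reply is ready within the given timeout. */
    static int repliesWithin(long timeoutMillis, int tasks, int consumers, long perTaskMillis) {
        int count = 0;
        for (int i = 0; i < tasks; i++) {
            if (replyReadyAtMillis(i, consumers, perTaskMillis) <= timeoutMillis) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println("worst case ms: " + worstCaseMillis(50, 3, 2000));
        System.out.println("replies within 5000 ms: " + repliesWithin(5000, 50, 3, 2000));
    }
}
```

Under these assumptions the last reply is ready after 17 batches, about 34 seconds, while only the first two batches (6 tasks) finish inside the 5-second default mentioned in post #2. Real numbers will differ, so a reply-timeout chosen this way should include some margin.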

    I'm wondering about the temporary queues that are created in RabbitMQ. How long do they live? If the default time-to-live for the temporary AMQP reply queues is less than my rabbit reply-timeout value, what happens then?

  5. #5
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,844

    Each temporary queue should live until a reply Message is received or as long as the timeout, whichever is longer. Then, it is deleted.

    Are you actually witnessing something different?

  6. #6

    No, but I was worried that perhaps I needed to set another timeout value elsewhere :-)

    Would it be better to create a permanent reply channel and specify that on the gateway method? It sounds costly to create a new queue for each message.

  7. #7
    Join Date
    Oct 2012
    Posts
    2

    Temporary Reply Queue is not getting deleted

    Quote, originally posted by Mark Fisher:
    Each temporary queue should live until a reply Message is received or as long as the timeout, whichever is longer. Then, it is deleted.

    Are you actually witnessing something different?
    Hi Mark,
    The temporary reply queue is not getting deleted after it receives the message. I'm using the Spring AMQP template to send and receive the message.

    Below is the code snippet:

    context = new AnnotationConfigApplicationContext(RabbitConfiguration.class);

    Queue replyQueue = (Queue)context.getBean("springAmqpQ");
    System.out.println("Reply Queue Name : " + replyQueue.getName());
    RabbitTemplate template = (RabbitTemplate)context.getBean("amqpTemplate");
    template.setReplyTimeout(120000);
    template.setReplyQueue(replyQueue);
    //template.convertAndSend(exchange, routingKey, createMessage(reportDbPing.getBytes()));
    template.send(exchange, routingKey, createMessage(reportDbPing.getBytes()));
    System.out.println("Message sent and went for a sleep.....");
    Thread.sleep(120000);
    System.out.println("Wake up from a sleep ready to consume Message");
    Message message = template.receive(replyQueue.getName());

    One more question: how do I send an ack or nack to the RabbitMQ broker using AmqpTemplate? Or does AmqpTemplate.receive() implicitly auto-ack?

    Thanks in advance.

  8. #8

    When you say "client side adapter", are you referring to my "int-amqp:outbound-gateway"?

    How do I set the timeout on the outbound gateway?

    The strange thing is that when I call future.get(3000, TimeUnit.MILLISECONDS), it actually waits the full 3 seconds before telling me the request timed out. According to the server, the message had already been processed. So where did the response go?
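
One way to picture where such a reply can go, consistent with the resolution reported in post #4, is a requester that listens on a private reply queue for a bounded time and then stops listening. The sketch below is a plain-JDK analogy only, not Spring or RabbitMQ code; the class name `LateReplyDemo`, the method `requestReply`, and the timings are all hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Plain-JDK analogy (an assumption, not how Spring AMQP is implemented):
// the requester waits on its own reply queue for a limited time; a reply
// that would arrive after that window has no reader.
final class LateReplyDemo {

    /**
     * Simulates one request/reply exchange.
     * Returns the reply, or null if the requester gave up before it arrived.
     */
    static String requestReply(long serverDelayMillis, long replyTimeoutMillis) {
        BlockingQueue<String> replyQueue = new LinkedBlockingQueue<>();
        ScheduledExecutorService server = Executors.newSingleThreadScheduledExecutor();
        try {
            // The simulated server produces its reply only after a processing delay.
            server.schedule(() -> { replyQueue.add("reply"); },
                    serverDelayMillis, TimeUnit.MILLISECONDS);
            // The simulated client waits at most replyTimeoutMillis, then stops listening.
            return replyQueue.poll(replyTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        } finally {
            // Stop the simulated server; any reply after this point has no reader.
            server.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("fast server: " + requestReply(20, 500));
        System.out.println("slow server: " + requestReply(1000, 100));
    }
}
```

In the second call the server would eventually answer, but the requester has already given up, so from the caller's side the reply never appears. An outer `Future.get(...)` with its own timeout cannot recover a reply that the inner wait has already abandoned.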

    PS... On a related note, I read somewhere else that there is a known issue with the inbound-gateway not exposing the replyTimeout property, so I'm doing this on the server:

    Code:
    AmqpInboundGateway amqpInboundGateway = spring.getBean("amqpInboundGateway",AmqpInboundGateway.class);
    amqpInboundGateway.setReplyTimeout(4000);
    For a single, non-async (i.e. non-"futurized") gateway, I had to do this if my message took longer than 1000 ms to process.
    Last edited by mberg; May 21st, 2012 at 07:05 AM.

  9. #9
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,038

    How do I set the timeout on the outbound gateway?
    Code:
    <int-amqp:outbound-gateway
        request-channel="xmlTestProcessingRequestChannel"
        exchange-name="xml-request-delivery"
        routing-key-expression="'xml-request.number'"
        amqp-template="amqpTemplate"
    />
    See post #7 in the thread Mark referred you to: it's an attribute on the amqpTemplate that's referenced by the outbound gateway.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware
