Timeouts waiting for responses
mberg
May 21st, 2012, 06:13 AM
Hello, I have a problem getting responses back from an async gateway.
The gateway looks like this:
public interface JobServiceGateway {
@Gateway(requestChannel="xmlTestProcessingRequestChannel")
Future<String> sendMessageAndGetReply(String message) throws Exception;
}
The xml configuration is pretty basic, the essential parts being:
(The server)
<int:channel id="xmlProcessingRequestChannel" />
<int-amqp:inbound-gateway id="amqpInboundGateway"
request-channel="xmlProcessingRequestChannel"
connection-factory="connectionFactory"
queue-names="myproduct.xmlProcessingRequestQueue"
concurrent-consumers="3"
/>
(The client)
<int:gateway id="jobServiceGateway" service-interface="com.myproduct.JobServiceGateway"/>
<int:channel id="xmlTestProcessingRequestChannel"/>
<int:channel id="pdfTestDeliveryChannel"/>
<int-amqp:outbound-gateway
request-channel="xmlTestProcessingRequestChannel"
exchange-name="xml-request-delivery"
routing-key-expression="'xml-request.number'"
amqp-template="amqpTemplate"
/>
...Code for the xml-request-delivery exchange omitted...
The problem is that if I begin pumping in tasks like this:
List<FutureTask> futures = new ArrayList<FutureTask>();
for (int i=0; i<50; i++) {
System.out.println(new Date()+ ": Sending message " + i);
Future<String> response = jobServiceGateway.sendMessageAndGetReply("Hello Message #" + i);
futures.add(new FutureTask(String.valueOf(i),response));
}
And then wait for the tasks to complete:
for (FutureTask future: futures) {
try {
System.out.println(new Date() + " Getting response for " + future);
System.out.println(new Date()+ " Response = " + future.getFuture().get(3000, TimeUnit.MILLISECONDS));
} catch (TimeoutException e) {
System.err.println(new Date()+ " TIMEOUT while waiting for " + future);
}
}
What happens is the following:
- The server receives all the tasks, 3 at a time (concurrent-consumers=3). I used a synthetic delay of 2 seconds processing each task, so each bunch of 3 tasks takes 2 seconds to complete. Then it gets 3 new tasks, processes them - etc.
- The client receives responses to *some* tasks, while others time out. This happens even if I insert a Thread.sleep(total time to process 50 tasks, 2 seconds per task, 3 at a time) between sending the messages and starting my process-responses loop.
What is happening to the responses that are generated, but somehow never received by the client?
I use RabbitMQ as the message middleware, and in the web interface the queue becomes empty as soon as the last task has been processed, and before the "receive-loop" starts processing task responses. Where are the responses going when they disappear out of RabbitMQ?
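For reference, the receive loop above can be sketched with plain java.util.concurrent types. This is only an illustration: ReplyCollector and its collect method are hypothetical names, not part of Spring Integration, and a never-completed CompletableFuture stands in for a reply that is never delivered. It shares one overall deadline across all futures and also handles the ExecutionException and InterruptedException that Future.get can throw.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper for illustration; not part of Spring Integration.
final class ReplyCollector {

    /** Waits for every future, sharing one overall deadline instead of a fresh timeout per call. */
    static List<String> collect(List<Future<String>> futures, long totalTimeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(totalTimeoutMillis);
        List<String> results = new ArrayList<>();
        for (Future<String> future : futures) {
            // Time left until the shared deadline, never negative.
            long remaining = Math.max(0L, deadline - System.nanoTime());
            try {
                results.add(future.get(remaining, TimeUnit.NANOSECONDS));
            } catch (TimeoutException e) {
                results.add("TIMEOUT");
            } catch (ExecutionException e) {
                results.add("FAILED: " + e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
                results.add("INTERRUPTED");
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<Future<String>> futures = new ArrayList<>();
        futures.add(CompletableFuture.completedFuture("reply 0"));
        futures.add(new CompletableFuture<>()); // never completed: stands in for a lost reply
        System.out.println(collect(futures, 100));
    }
}
```

A loop like this only reports which replies are missing; it does not explain why they went missing.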
Mark Fisher
May 21st, 2012, 06:50 AM
Can you please try increasing the timeout on the "client" side adapter? The default is only 5 seconds.
Thanks,
Mark
Mark Fisher
May 21st, 2012, 06:52 AM
Also, this recent thread might be helpful: http://forum.springsource.org/showthread.php?126578-Random-Failures-when-running-spring-integration-over-amqp
It's quite possible that you are experiencing the same thing. Please read through the posts there to see if you agree.
Hope that helps.
-Mark
mberg
May 21st, 2012, 07:00 AM
When you say "client side adapter", are you referring to my "int-amqp:outbound-gateway"?
How do I set the timeout on the outbound gateway?
The strange thing is that when I do a future.get(3000, TimeUnit.MILLISECONDS) it actually waits for 3 seconds before it tells me the request timed out. According to the server, the message was already processed. So where did the response go?
PS... On a related note, I read somewhere else that there is a known issue with the inbound-gateway not exposing the replyTimeout property, so I'm doing this on the server:
AmqpInboundGateway amqpInboundGateway = spring.getBean("amqpInboundGateway",AmqpInboundGateway.class);
amqpInboundGateway.setReplyTimeout(4000);
For a single, non-async (i.e. non-"futurized") gateway, I had to do this if my message took longer than 1000 ms to process.
Gary Russell
May 21st, 2012, 07:12 AM
How do I set the timeout on the outbound gateway?
<int-amqp:outbound-gateway
request-channel="xmlTestProcessingRequestChannel"
exchange-name="xml-request-delivery"
routing-key-expression="'xml-request.number'"
amqp-template="amqpTemplate"
/>
See post #7 in the thread Mark referred you to - it's an attribute on the amqpTemplate that's referenced by the outbound gateway.
mberg
May 21st, 2012, 07:15 AM
Hello Mark, thanks for the reference. You were right: after adding a "reply-timeout" to my rabbit:template tag and setting it to the worst-case scenario, all my requests were processed without problems. I hadn't tried that because, for some reason, Eclipse does not show reply-timeout as a valid attribute on the rabbit:template tag. Oh well.
So what is happening is that my service is generating a reply, but my client times out before it can read it.
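As a rough check of what "worst case" means here, the numbers from the first post can be worked through in a few lines. This is a back-of-the-envelope model, not a measurement: it assumes all requests are published at about the same time (which the async gateway makes plausible), that the server handles them in batches of concurrent-consumers, and that broker and network overhead are negligible. The class and method names are hypothetical, and the 5000 ms figure is the default Mark quoted earlier in the thread.

```java
// Back-of-the-envelope model; class and method names are hypothetical.
final class ReplyLatencyEstimate {

    /** Time at which the request with the given zero-based index finishes, if all requests are queued at t = 0. */
    static long completionMillis(int index, int consumers, long processingMillis) {
        long batch = index / consumers + 1; // 1-based batch number
        return batch * processingMillis;
    }

    /** Number of requests whose reply would arrive within replyTimeoutMillis. */
    static int repliesInTime(int requests, int consumers, long processingMillis, long replyTimeoutMillis) {
        int inTime = 0;
        for (int i = 0; i < requests; i++) {
            if (completionMillis(i, consumers, processingMillis) <= replyTimeoutMillis) {
                inTime++;
            }
        }
        return inTime;
    }

    public static void main(String[] args) {
        // 50 requests, 3 consumers, 2 s each: the last batch finishes after 17 * 2 s.
        System.out.println(completionMillis(49, 3, 2000));     // prints 34000
        // With a 5 s reply timeout only the first two batches make it.
        System.out.println(repliesInTime(50, 3, 2000, 5000));  // prints 6
        System.out.println(repliesInTime(50, 3, 2000, 34000)); // prints 50
    }
}
```

Under these assumptions the reply-timeout for this test would need to be at least 34 seconds, plus some margin for real-world overhead.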
I'm wondering about the temporary queues that are created in RabbitMQ. How long do they live? If the default time-to-live for the temporary AMQP reply queues is less than my rabbit reply-timeout value, what will happen then?
Mark Fisher
May 21st, 2012, 07:20 AM
Each temporary queue should live until a reply Message is received or as long as the timeout, whichever is longer. Then, it is deleted.
Are you actually witnessing something different?
mberg
May 21st, 2012, 07:28 AM
No, but I was worried that perhaps I needed to set another timeout value elsewhere :-)
Would it be better to create a permanent reply channel and specify that on the gateway method? It sounds costly to create a new queue for each message.
Gary Russell
May 21st, 2012, 07:31 AM
That's a new feature in spring-amqp 1.1.0 - I mentioned that in post#4 in the same thread.
mberg
May 21st, 2012, 07:42 AM
Thanks for your reply Gary. I didn't notice the reply-queue attribute.
I'm a bit confused now. How does this reply-queue work together with the replyChannel annotation value on my gateway method?
....
@Gateway(requestChannel="xmlTestProcessingRequestChannel", replyChannel="pdfTestDeliveryChannel")
Future<String> sendMessageAndGetReply(String message) throws Exception;
....
Gary Russell
May 21st, 2012, 07:49 AM
You don't need a replyChannel on your gateway; because you don't have a reply-channel on your outbound gateway, the framework knows it needs to send the reply to the gateway.
The reply queue on the rabbit template is unrelated to the framework reply channel. The reply-queue is an AMQP (rabbit) Queue on which all replies are sent. This is to avoid the overhead of creating a new queue for each reply.
It is a new feature in the 1.1.0 release that came out last week.
mberg
May 21st, 2012, 08:03 AM
Thanks Gary.
There is a replyTo message header, a reply-channel on the outbound gateway and a reply queue on the template.
I know there's probably not a much better way to do this, but it can be confusing to a newbie like myself.. :-)
Which headers are relevant with respect to message routing? I know replyTo stores the queue to which the reply should be sent. But if that queue has a fixed name, how does the system know which client should pull the reply off the queue? I mean, if multiple clients are talking to my server, and the reply queue has a fixed name, how are messages routed out to each client?
mberg
May 21st, 2012, 08:16 AM
After adding the reply-queue attribute to my rabbit:template tag, I'm getting this error:
ERROR org.springframework.amqp.rabbit.core.RabbitTemplate [SimpleAsyncTaskExecutor-1] RabbitTemplate - No correlation header in reply
No hits on google for this error ... perhaps you can help?
Gary Russell
May 21st, 2012, 08:23 AM
Oops; sorry; for this to work out of the box, you need Spring Integration 2.2.0.M1.
However, you can make it work with earlier versions by mapping the headers on the server side. If you look at debug logs on the server, you will see that some headers are being dropped; you need to add mappings for
spring_reply_correlation
spring_reply_to
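To illustrate why a dropped correlation header matters when replies share one fixed queue, here is a conceptual sketch in plain Java. It is not Spring AMQP's implementation; CorrelationSketch, register and onReply are hypothetical names. The idea is only that the requester remembers each outstanding request under an identifier, and a reply can be handed to the right waiter only if it carries that identifier back.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Conceptual illustration only; this is not Spring AMQP's implementation.
final class CorrelationSketch {

    // Requests still waiting for a reply, keyed by correlation id.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a waiting request and returns the correlation id that must travel with it. */
    String register(CompletableFuture<String> replyFuture) {
        String correlationId = UUID.randomUUID().toString();
        pending.put(correlationId, replyFuture);
        return correlationId;
    }

    /** Called for each message on the shared reply queue; returns false if it cannot be matched. */
    boolean onReply(String correlationId, String body) {
        if (correlationId == null) {
            return false; // the header was dropped on the way: nothing to match against
        }
        CompletableFuture<String> waiter = pending.remove(correlationId);
        if (waiter == null) {
            return false; // unknown id, or the reply was already delivered
        }
        waiter.complete(body);
        return true;
    }

    public static void main(String[] args) {
        CorrelationSketch sketch = new CorrelationSketch();
        CompletableFuture<String> first = new CompletableFuture<>();
        String id = sketch.register(first);
        System.out.println(sketch.onReply(null, "reply without header"));
        System.out.println(sketch.onReply(id, "reply with header"));
        System.out.println(first.getNow("not delivered"));
    }
}
```

In this model a reply that arrives without its identifier is simply unmatched, which is the situation the "No correlation header in reply" error describes.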
mberg
May 21st, 2012, 08:44 AM
Adding the mapping solved the problem. I guess everything is working as it should now, thanks for your help Gary and Mark :-)
ygowda
Oct 23rd, 2012, 06:26 PM
Each temporary queue should live until a reply Message is received or as long as the timeout, whichever is longer. Then, it is deleted.
Are you actually witnessing something different?
Hi Mark,
The temporary reply queue is not deleted after it receives the message. I'm using the Spring AMQP template to send and receive messages.
Below is the code snippet:
context = new AnnotationConfigApplicationContext(RabbitConfiguration.class);
Queue replyQueue = (Queue)context.getBean("springAmqpQ");
System.out.println("Reply Queue Name : " + replyQueue.getName());
RabbitTemplate template = (RabbitTemplate)context.getBean("amqpTemplate");
template.setReplyTimeout(120000);
template.setReplyQueue(replyQueue);
//template.convertAndSend(exchange, routingKey, createMessage(reportDbPing.getBytes()));
template.send(exchange, routingKey, createMessage(reportDbPing.getBytes()));
System.out.println("Message sent and went for a sleep.....");
Thread.sleep(120000);
System.out.println("Wake up from a sleep ready to consume Message");
Message message = template.receive(replyQueue.getName());
One more question: how do I send an ack or nack to the RabbitMQ broker using AmqpTemplate? Or does AmqpTemplate.receive() implicitly auto-ack?
Thanks in advance.
Gary Russell
Oct 24th, 2012, 09:02 AM
The template doesn't use temporary queues when you specify a replyQueue. See the java docs...
/**
* A queue for replies; if not provided, a temporary exclusive, auto-delete queue will
* be used for each reply.
*
* @param replyQueue the replyQueue to set
*/
However, the replyQueue (or temporary reply queue) is only used by the template when you use the ...sendAndReceive(...) methods. Also, an explicit replyQueue requires a listenerContainer to handle the replies. Ack behavior is configured on the listener container. Besides transactions, there is no way to control the ack behavior in receive().
ygowda
Oct 24th, 2012, 12:22 PM
Thanks Gary,
I see the behavior you described. Since the temporary queue requires an ack, I assume autoAck is set to true on the channel.