Hello, I have a problem getting responses back from an async gateway.
The gateway looks like this:

Code:
    public interface JobServiceGateway {

        @Gateway(requestChannel="xmlTestProcessingRequestChannel")
        Future<String> sendMessageAndGetReply(String message) throws Exception;
    }

The xml configuration is pretty basic, the essential parts being:

(The server)

Code:
    <int:channel id="xmlProcessingRequestChannel" />

    <int-amqp:inbound-gateway id="amqpInboundGateway"
        request-channel="xmlProcessingRequestChannel"
        connection-factory="connectionFactory"
        queue-names="myproduct.xmlProcessingRequestQueue"
        concurrent-consumers="3" />

(The client)

Code:
    <int:gateway id="jobServiceGateway"
        service-interface="com.myproduct.JobServiceGateway"/>

    <int:channel id="xmlTestProcessingRequestChannel"/>
    <int:channel id="pdfTestDeliveryChannel"/>

    <int-amqp:outbound-gateway
        request-channel="xmlTestProcessingRequestChannel"
        exchange-name="xml-request-delivery"
        routing-key-expression="'xml-request.number'"
        amqp-template="amqpTemplate" />

    ...Code for the xml-request-delivery exchange omitted...

The problem is that if I begin pumping in tasks like this:

Code:
    List<FutureTask> futures = new ArrayList<FutureTask>();
    for (int i=0; i<50; i++) {
        System.out.println(new Date()+ ": Sending message " + i);
        Future<String> response = jobServiceGateway.sendMessageAndGetReply("Hello Message #" + i);
        futures.add(new FutureTask(String.valueOf(i),response));
    }

And then wait for the tasks to complete:

Code:
    for (FutureTask future: futures) {
        try {
            System.out.println(new Date()+ "Getting response for " + future);
            System.out.println(new Date()+ " Response = " + future.getFuture().get(3000, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            System.err.println(new Date()+ " TIMEOUT while waiting for " + future);
        }
    }

What happens is the following:
- The server receives all the tasks, 3 at a time (concurrent-consumers=3). I added a synthetic 2-second delay to the processing of each task, so each batch of 3 tasks takes 2 seconds to complete. It then picks up 3 new tasks, processes them, and so on.
- The client receives responses to *some* tasks, while others time out. This happens even if I insert a Thread.sleep(total time to process 50 tasks, 2 seconds per task, 3 at a time) between sending the messages and starting my process-responses loop.
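
For reference, the sleep duration mentioned above can be worked out roughly as follows. This is only a sketch of the arithmetic: the class and method names are made up for illustration, and it assumes the server really works in lock-step batches of 3 with exactly 2 seconds per task and no messaging overhead.

```java
public class DrainTimeEstimate {

    // Number of batches needed when `consumers` tasks are processed in parallel.
    // Uses integer ceiling division. (Hypothetical helper, not part of the real client.)
    static int batches(int tasks, int consumers) {
        return (tasks + consumers - 1) / consumers;
    }

    // Estimated time until the last task has been processed,
    // assuming every batch takes exactly perTaskMillis.
    static long totalMillis(int tasks, int consumers, long perTaskMillis) {
        return batches(tasks, consumers) * perTaskMillis;
    }

    // Estimated time at which the task with the given zero-based index is done,
    // assuming tasks are picked up strictly in order, `consumers` at a time.
    static long finishedAtMillis(int taskIndex, int consumers, long perTaskMillis) {
        return (taskIndex / consumers + 1) * perTaskMillis;
    }

    public static void main(String[] args) {
        int tasks = 50;            // messages sent by the client
        int consumers = 3;         // concurrent-consumers on the inbound gateway
        long perTaskMillis = 2000; // synthetic delay per task

        System.out.println("Batches: " + batches(tasks, consumers));
        System.out.println("Estimated total: "
                + totalMillis(tasks, consumers, perTaskMillis) + " ms");
    }
}
```

Under those assumptions, 50 tasks need 17 batches, so the last response should be produced about 34 seconds after the first message is picked up, and the sleep has to be at least that long.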
What is happening to the responses that are generated, but somehow never received by the client?
I use RabbitMQ as the message middleware, and in the web interface the queue becomes empty as soon as the last task has been processed, and before the "receive-loop" starts processing task responses. Where are the responses going when they disappear out of RabbitMQ?


