Hi - I am working on an application that uses priority channels, e.g.
During load testing, I have noticed several occurrences of the following exception:Code:<si:channel id="inbound_soi_lookup"> <si:priority-queue capacity="100"/> </si:channel>
This is happening when the priority channel's internal queue is already at maximum capacity, and another message is sent to the channel. The call to send() returns false because acquirePermitIfNecessary in PriorityChannel.doSend() returns false:Code:16:01:52.486 [subscriptionService-3] ERROR o.s.i.handler.LoggingHandler - org.springframework.integration.message.MessageDeliveryException: Router failed to send to channel: inbound_soi_lookup at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:118) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:59) at org.springframework.integration.endpoint.PollingConsumer.doPoll(PollingConsumer.java:59) at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.innerPoll(AbstractPollingEndpoint.java:232) at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.poll(AbstractPollingEndpoint.java:216) at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.access$0(AbstractPollingEndpoint.java:213) at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:204) at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:49) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:619)
I also tried changing the PriorityChannels to regular QueueChannels to verify that the issue was related to priority channels in particular. When I did this, the exception did not occur. Presumably this is because when using a QueueChannel, the call to send() will block until there is space on the queue. This behaviour is noted in the reference manual:Code:@Override protected boolean doSend(Message<?> message, long timeout) { if (!acquirePermitIfNecessary(timeout)) { return false; } return super.doSend(message, 0); }
The behaviour described above is what I was expecting would happen with the PriorityChannels as well (eventually leading to a backlog of messages on the 'external' MQ queue). Is there a particular reason why PriorityChannel cannot be implemented so that it also blocks on send() when the queue is full? Can you suggest a workaround for this issue, other than increasing the capacity on the queue?A channel that has not reached its capacity limit will store messages in its internal queue, and the send() method will return immediately even if no receiver is ready to handle the message. If the queue has reached capacity, then the sender will block until room is available.
Thanks
-Matt


Reply With Quote
