Page 1 of 2 12 LastLast
Results 1 to 10 of 11

Thread: Send on a PriorityChannel does not block

  1. #1
    Join Date
    May 2010
    Posts
    24

    Default Send on a PriorityChannel does not block

    Hi - I am working on an application that uses priority channels, e.g.

    Code:
    	<si:channel id="inbound_soi_lookup">
    		<si:priority-queue capacity="100"/>
    	</si:channel>
    During load testing, I have noticed several occurrences of the following exception:

    Code:
    16:01:52.486 [subscriptionService-3] ERROR o.s.i.handler.LoggingHandler - org.springframework.integration.message.MessageDeliveryException: Router failed to send to channel: inbound_soi_lookup
    	at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:118)
    	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:59)
    	at org.springframework.integration.endpoint.PollingConsumer.doPoll(PollingConsumer.java:59)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.innerPoll(AbstractPollingEndpoint.java:232)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.poll(AbstractPollingEndpoint.java:216)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.access$0(AbstractPollingEndpoint.java:213)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:204)
    	at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:49)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    	at java.lang.Thread.run(Thread.java:619)
    This is happening when the priority channel's internal queue is already at maximum capacity, and another message is sent to the channel. The call to send() returns false because acquirePermitIfNecessary in PriorityChannel.doSend() returns false:

    Code:
    	@Override
    	protected boolean doSend(Message<?> message, long timeout) {
    		if (!acquirePermitIfNecessary(timeout)) {
    			return false;
    		}
    		return super.doSend(message, 0);
    	}
    I also tried changing the PriorityChannels to regular QueueChannels to verify that the issue was related to priority channels in particular. When I did this, the exception did not occur. Presumably this is because when using a QueueChannel, the call to send() will block until there is space on the queue. This behaviour is noted in the reference manual:

    A channel that has not reached its capacity limit will store messages in its internal queue, and the send() method will return immediately even if no receiver is ready to handle the message. If the queue has reached capacity, then the sender will block until room is available.
    The behaviour described above is what I was expecting would happen with the PriorityChannels as well (eventually leading to a backlog of messages on the 'external' MQ queue). Is there a particular reason why PriorityChannel cannot be implemented so that it also blocks on send() when the queue is full? Can you suggest a workaround for this issue, other than increasing the capacity on the queue?

    Thanks
    -Matt

  2. #2
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,844

    Default

    The call to acquirePermitIfNecessary(timeout) should be blocking. What are you passing as the timeout value?

  3. #3
    Join Date
    May 2010
    Posts
    24

    Default

    Hi Mark - I have not defined any specific timeout for the channel, so the timeout parameter for acquirePermitIfNecessary has a value of -1.

    Rgds
    -Matt

  4. #4
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,844

    Default

    Matt, what version are you using?

  5. #5
    Join Date
    May 2010
    Posts
    24

    Default

    Version 1.0.4

    Thanks
    Matt

  6. #6
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,844

    Default

    Matt,

    Can you open an issue for this? The problem is that we normally interpret -1 as an "indefinite" timeout value (as on QueueChannel), but in this case we are passing that value directly to tryAcquire on the Semaphore. The javadoc from that method explains why this is not working as you expect:
    Code:
      * <p>If the specified waiting time elapses then the value {@code false}
      * is returned.  If the time is less than or equal to zero, the method
      * will not wait at all.
    Thanks,
    Mark

  7. #7
    Join Date
    May 2010
    Posts
    24

    Default

    Hi Mark - this is now raised as issue 1275: http://jira.springframework.org/browse/INT-1275. Can you suggest a workaround that we could use in the meantime?

    Thanks
    -Matt

  8. #8
    Join Date
    Jun 2005
    Posts
    4,232

    Default

    From your stack trace it looks like you can set the timeout in the router. Does that work (if you set it to something long that is)?
    Last edited by Dave Syer; Jul 27th, 2010 at 05:36 AM. Reason: clarification

  9. #9
    Join Date
    May 2010
    Posts
    24

    Default

    Hi Dave -- yes, I could try to avoid it by setting a timeout or increasing the queue capacity, but there's still a risk of the condition occurring and therefore one or more messages failing to process. Ideally, I want the same blocking behaviour as a standard QueueChannel. So, as a workaround, I've created a new class PriorityBlockingChannel that calls Semaphore.acquire() instead of Semaphore.tryAcquire(timeout) when timeout is <= 0 (see below)

    This seems to do the trick, but let me know if you think there will be any problems with this approach.

    Thanks
    -Matt

    Code:
    ...
    
    public class PriorityBlockingChannel extends QueueChannel {
    	private final Semaphore semaphore;
    
    ...
    
    	@Override
    	protected boolean doSend(Message<?> message, long timeout) {
    		if (!acquirePermitIfNecessary(timeout)) {
    			return false;
    		}
    		return super.doSend(message, 0);
    	}
    
    ...
    
    	private boolean acquirePermitIfNecessary(long timeoutInMilliseconds) {
    		if (this.semaphore != null) {
    			try {
    				if (timeoutInMilliseconds > 0) {
    					return this.semaphore.tryAcquire(timeoutInMilliseconds, TimeUnit.MILLISECONDS);
    				}
    				else {
    					this.semaphore.acquire();
    					return true;
    				}
    			}
    			catch (InterruptedException e) {
    				Thread.currentThread().interrupt();
    				return false;
    			}
    		}
    		return true;
    	}
    
    ...
    
    }

  10. #10
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,844

    Default

    Thanks again for raising the issue. I've fixed it on trunk as a well as the 1.0.x branch. The 2.0 M6 release will be out very soon (probably today), and 1.0.5 will likely be out within days or a week.

Tags for this Thread

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •