Thread: PriorityChannel Gotcha! Messages with same priority are not processed in FIFO order

  1. #1
    Join Date
    May 2010
    Posts
    24

    We use PriorityChannels in our application, and during testing we discovered that PriorityChannel doesn't quite behave in the way we expected. As PriorityChannel is derived from QueueChannel, I assumed that the order in which messages are processed would be determined by the relative priority of the messages and their relative position on the queue. In other words, if two messages have the same priority, then the one that was placed on the queue first would be read off the queue first.

    But this is not the case - the messages are compared based on priority only, so messages with the same priority will be processed in an arbitrary order. This came to light during our testing, when we saw some messages taking 15 minutes to process, while messages received immediately before and after them took less than a second!

    Now, this may be by design - I note that PriorityChannel is backed by PriorityBlockingQueue, and in the JavaDoc for that class, it explicitly states that the class "makes no guarantees about the ordering of elements with equal priority" (and it even provides a code excerpt that can be used to provide priority+FIFO (first-in, first-out) ordering).

    However, I don't think this behaviour is particularly intuitive (or practical). At the very least, I think the documentation (JavaDoc and reference manual) should be explicit about how messages will be ordered when using PriorityChannel. Better still, I think that the default behaviour should be to order messages by both priority and their position on the queue.

    Regards
    -Matt

  2. #2
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,037

    Please refer to the reference manual, wherein it states...

    ...However, for custom priority determination logic, a comparator of type Comparator<Message<?>> can be provided to the PriorityChannel's constructor. ...

    And, further, ...

    Code:
    <channel id="priorityChannel" datatype="example.Widget">
        <priority-queue comparator="widgetComparator" capacity="10"/>
    </channel>

  3. #3
    Join Date
    May 2010
    Posts
    24

    Gary, Thanks for your reply. I still think it would be helpful to state explicitly that messages with the same priority will be received in an arbitrary order. To me, anything that is based on a queue implies FIFO behaviour (all other things being equal), so the behaviour in this case seems counter-intuitive to me.

    Regards
    -Matt

  4. #4
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,843

    Matt, you might need to argue that case to Doug Lea and Josh Bloch.

    We are simply delegating to java.util.PriorityQueue under the covers. Here's a relevant excerpt from its JavaDoc:
    Code:
     * <p>The <em>head</em> of this queue is the <em>least</em> element
     * with respect to the specified ordering.  If multiple elements are
     * tied for least value, the head is one of those elements -- ties are
     * broken arbitrarily.
    -Mark

  5. #5
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,843

    Sorry, I should have read the entire post before commenting. I realize that you did mention the underlying JavaDoc, and I also realize that I accidentally quoted PriorityQueue rather than PriorityBlockingQueue. So I presume this is the relevant JavaDoc excerpt that you were referring to:
    Code:
     * <p>Operations on this class make no guarantees about the ordering
     * of elements with equal priority. If you need to enforce an
     * ordering, you can define custom classes or comparators that use a
     * secondary key to break ties in primary priority values.  For
     * example, here is a class that applies first-in-first-out
     * tie-breaking to comparable elements. To use it, you would insert a
     * <tt>new FIFOEntry(anEntry)</tt> instead of a plain entry object.
    What Gary is suggesting is that you could implement that Comparator and provide it. I guess we could consider providing such a Comparator implementation within Spring Integration so that others may use it as well. I don't think we would want to assume that every provided Comparator should be wrapped though. As long as we provide the class that someone can configure as a bean (with a targetComparator property or something for the "primary" ordering), then that can always be defined as a bean itself. Thoughts?
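    For readers who want to see the idea from that JavaDoc excerpt in action, here is a minimal sketch of FIFO tie-breaking against a plain PriorityBlockingQueue, with no Spring Integration types involved. FifoEntry follows the general shape of the JavaDoc's FIFOEntry; Job and FifoDemo are hypothetical names made up for this illustration, and a lower priority number is assumed to mean "more urgent".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Wraps a comparable element and records the order in which it was wrapped.
final class FifoEntry<E extends Comparable<? super E>> implements Comparable<FifoEntry<E>> {
    private static final AtomicLong SEQ = new AtomicLong();
    private final long seqNum = SEQ.getAndIncrement();
    private final E entry;

    FifoEntry(E entry) {
        this.entry = entry;
    }

    E getEntry() {
        return entry;
    }

    @Override
    public int compareTo(FifoEntry<E> other) {
        // Primary key: the wrapped element's own ordering.
        int res = entry.compareTo(other.entry);
        // Secondary key: the sequence number, so ties come out first-in, first-out.
        if (res == 0 && other != this) {
            res = Long.compare(seqNum, other.seqNum);
        }
        return res;
    }
}

// Hypothetical element type: ordered by priority only (lower number first).
final class Job implements Comparable<Job> {
    final int priority;
    final String name;

    Job(int priority, String name) {
        this.priority = priority;
        this.name = name;
    }

    @Override
    public int compareTo(Job other) {
        return Integer.compare(priority, other.priority);
    }
}

final class FifoDemo {
    // Queues the jobs in list order, then drains the queue and returns the names as polled.
    static List<String> drainInOrder(List<Job> jobs) {
        PriorityBlockingQueue<FifoEntry<Job>> queue = new PriorityBlockingQueue<>();
        for (Job job : jobs) {
            queue.add(new FifoEntry<>(job));
        }
        List<String> names = new ArrayList<>();
        FifoEntry<Job> next;
        while ((next = queue.poll()) != null) {
            names.add(next.getEntry().name);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(drainInOrder(Arrays.asList(
                new Job(1, "a"), new Job(0, "urgent"), new Job(1, "b"), new Job(1, "c"))));
    }
}
```

    With the wrapper in place, the urgent job is polled first and the equal-priority jobs follow in the order they were added. Queueing the bare Job objects instead would leave the relative order of "a", "b" and "c" unspecified.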

  6. #6
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,037

    One question is what to use as the secondary key in the comparator.

    In all but the very simplest case, $timestamp won't work, because messages may arrive at the channel in a different order from the one in which they were created. Reading Matt's original question, he wants the arrival sequence to be preserved (within priority).

    Wouldn't this need some header-enrichment immediately upstream of the channel; either to add a simple incrementing sequence header, or a new timestamp?
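    A rough sketch of that combination, written against plain JDK types rather than the Spring Integration API: an upstream step stamps each message with an incrementing sequence header, and a comparator wraps the "primary" ordering and falls back to that header on ties. Everything here is an assumption made for illustration: SimpleMessage stands in for a real message, "arrivalSequence" and "priority" are made-up header names, and a lower priority number is taken to mean "more urgent".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a framework message: a payload plus a header map.
final class SimpleMessage {
    final String payload;
    final Map<String, Object> headers;

    SimpleMessage(String payload, Map<String, Object> headers) {
        this.payload = payload;
        this.headers = headers;
    }

    static SimpleMessage of(String payload, int priority) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("priority", priority);
        return new SimpleMessage(payload, headers);
    }
}

// Plays the role of the upstream header enricher: stamps each message as it arrives.
final class ArrivalSequenceStamper {
    static final String HEADER = "arrivalSequence"; // assumed header name
    private final AtomicLong counter = new AtomicLong();

    SimpleMessage stamp(SimpleMessage in) {
        Map<String, Object> headers = new HashMap<>(in.headers);
        headers.put(HEADER, counter.getAndIncrement());
        return new SimpleMessage(in.payload, headers);
    }
}

// Applies the target ("primary") ordering first, then arrival order on ties.
final class FifoTieBreakComparator implements Comparator<SimpleMessage> {
    private final Comparator<SimpleMessage> targetComparator;

    FifoTieBreakComparator(Comparator<SimpleMessage> targetComparator) {
        this.targetComparator = targetComparator;
    }

    @Override
    public int compare(SimpleMessage a, SimpleMessage b) {
        int res = targetComparator.compare(a, b);
        return res != 0 ? res : Long.compare(sequenceOf(a), sequenceOf(b));
    }

    // Messages that were never stamped sort last among equal-priority messages.
    private static long sequenceOf(SimpleMessage message) {
        Object seq = message.headers.get(ArrivalSequenceStamper.HEADER);
        return seq instanceof Long ? (Long) seq : Long.MAX_VALUE;
    }
}

final class SequenceHeaderDemo {
    // Stamps and queues the messages in arrival order, then returns payloads as polled.
    static List<String> process(List<SimpleMessage> arrivals) {
        ArrivalSequenceStamper stamper = new ArrivalSequenceStamper();
        Comparator<SimpleMessage> byPriority = (a, b) -> Integer.compare(
                (Integer) a.headers.get("priority"), (Integer) b.headers.get("priority"));
        PriorityBlockingQueue<SimpleMessage> queue =
                new PriorityBlockingQueue<>(11, new FifoTieBreakComparator(byPriority));
        for (SimpleMessage message : arrivals) {
            queue.add(stamper.stamp(message));
        }
        List<String> payloads = new ArrayList<>();
        SimpleMessage next;
        while ((next = queue.poll()) != null) {
            payloads.add(next.payload);
        }
        return payloads;
    }

    public static void main(String[] args) {
        System.out.println(process(Arrays.asList(
                SimpleMessage.of("a", 5), SimpleMessage.of("urgent", 1), SimpleMessage.of("b", 5))));
    }
}
```

    Because the stamp is applied at the point of arrival rather than at message creation, this ordering reflects the sequence in which messages reached the queue, which is what a creation timestamp cannot guarantee.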

  7. #7
    Join Date
    May 2010
    Posts
    24

    Hi - this might not be suited to a framework solution, but here's what I did for this. I created a FIFOMessage class (see extract 1 below) that maintains a global sequence number. Then I have created my own subclass of QueueChannel (see extract 2 below) that wraps my message inside a FIFOMessage on a send, and unwraps it on a receive.

    You may ask why I have created my own class derived from QueueChannel - the reason is that I had already created this class as a workaround for a different issue (which is fixed in 1.0.5, but this is not released yet). See http://forum.springsource.org/showthread.php?t=92699 for details.

    Regards
    -Matt

    Code extract 1:
    Code:
    /**
     * A subclass of Message that maintains a sequence number as well as priority.
     * This is to enable messages to be processed in order of priority, but if
     * two messages have the same priority, then they will be ordered based on
     * their position in the queue (as determined by the sequence number).
     * In other words, messages with the same priority will be processed in
     * first-in, first-out (FIFO) order.
     *
     * @author carlmat
     *
     * @param <T> the type of the message payload
     */
    @SuppressWarnings("serial")
    public class FIFOMessage<T> extends GenericMessage<T> implements Comparable<FIFOMessage<T>> {
    	private static final AtomicLong seq = new AtomicLong();
    	private final long seqNum;
    	
    	public FIFOMessage(T payload, MessagePriority priority) {
    		super(payload, getHeadersMap(priority));
    		seqNum = seq.getAndIncrement();		
    	}
    	
    	private static Map<String,Object> getHeadersMap(MessagePriority priority) {
    		Map<String, Object> headers = new HashMap<String,Object>();
    		headers.put(MessageHeaders.PRIORITY, priority);
    		return headers;
    	}
    
    	@Override
    	public int compareTo(FIFOMessage<T> other) {
    		// compare the priority of the messages first
    		MessagePriority priority1 = this.getHeaders().getPriority();
    		MessagePriority priority2 = other.getHeaders().getPriority();
    		priority1 = priority1 != null ? priority1 : MessagePriority.NORMAL;
    		priority2 = priority2 != null ? priority2 : MessagePriority.NORMAL;
    		int res = priority1.compareTo(priority2);
    
    		// for two messages of the same priority, compare their sequence number
    		if (res == 0 && other != this)
    			res = (seqNum < other.seqNum ? -1 : 1);
    		return res;
    	}
    }
    Code extract 2:
    Code:
    public class PriorityBlockingChannel extends QueueChannel {
    ...
    	@Override
    	protected boolean doSend(Message<?> message, long timeout) {
    		if (!acquirePermitIfNecessary(timeout)) {
    			return false;
    		}
    		
    		log.debug("Channel {} - sending message with timeout {}", this.getName(), timeout);
    		
    		FIFOMessage<Message<?>> fifoMessage = new FIFOMessage<Message<?>>(message, message.getHeaders().getPriority());
    		boolean ret = super.doSend(fifoMessage, 0);
    		
    		return ret;
    	}
    
    	@Override
    	protected Message<?> doReceive(long timeout) {
    		FIFOMessage<Message<?>> fifoMessage = (FIFOMessage<Message<?>>) super.doReceive(timeout);
    		if ( fifoMessage != null ) {
    			Message<?> message = fifoMessage.getPayload();
    			this.releasePermitIfNecessary();
    			return message;
    		}
    		
    		return null;
    	}
    ...
    }
