Hi, this may not be suited to a framework-level solution, but here is what I did. I created a FIFOMessage class (see extract 1 below) that stamps each message with a sequence number taken from a global counter. I then created my own subclass of QueueChannel (see extract 2 below) that wraps each message in a FIFOMessage on send and unwraps it on receive.
You may ask why I created my own class derived from QueueChannel. The reason is that I had already written this class as a workaround for a different issue (fixed in 1.0.5, which is not released yet). See http://forum.springsource.org/showthread.php?t=92699 for details.
Regards
-Matt
Code extract 1:
Code:
// JDK imports; the Spring Integration imports (GenericMessage, MessageHeaders,
// MessagePriority) are omitted from this extract.
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
 * A subclass of GenericMessage that carries a sequence number as well as a priority.
 * This lets messages be processed in order of priority; if two messages
 * have the same priority, they are ordered by their position in the queue
 * (as determined by the sequence number).
 * In other words, messages with the same priority are processed in
 * first-in, first-out (FIFO) order.
 *
 * @author carlmat
 *
 * @param <T> the payload type
 */
@SuppressWarnings("serial")
public class FIFOMessage<T> extends GenericMessage<T> implements Comparable<FIFOMessage<T>> {

    // global counter shared by all FIFOMessage instances
    private static final AtomicLong seq = new AtomicLong();

    // position of this message in the overall send order
    final long seqNum;

    public FIFOMessage(T payload, MessagePriority priority) {
        super(payload, getHeadersMap(priority));
        seqNum = seq.getAndIncrement();
    }

    private static Map<String, Object> getHeadersMap(MessagePriority priority) {
        Map<String, Object> headers = new HashMap<String, Object>();
        headers.put(MessageHeaders.PRIORITY, priority);
        return headers;
    }

    @Override
    public int compareTo(FIFOMessage<T> other) {
        // compare the priority of the messages first; a missing priority counts as NORMAL
        MessagePriority priority1 = this.getHeaders().getPriority();
        MessagePriority priority2 = other.getHeaders().getPriority();
        priority1 = priority1 != null ? priority1 : MessagePriority.NORMAL;
        priority2 = priority2 != null ? priority2 : MessagePriority.NORMAL;
        int res = priority1.compareTo(priority2);
        // for two messages of the same priority, the lower sequence number (sent earlier) wins
        if (res == 0 && other != this) {
            res = (seqNum < other.seqNum ? -1 : 1);
        }
        return res;
    }
}
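The ordering rule in extract 1 can be tried out without Spring Integration. The following is only a minimal sketch using JDK classes: FifoPriorityDemo, Entry, Priority and drainInOrder are hypothetical names made up for the illustration, and the Priority enum merely stands in for MessagePriority on the assumption that more urgent values are declared first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class FifoPriorityDemo {

    // Hypothetical stand-in for MessagePriority; most urgent value declared first.
    enum Priority { HIGHEST, HIGH, NORMAL, LOW, LOWEST }

    // Hypothetical stand-in for FIFOMessage: a payload, a priority and a sequence number.
    static final class Entry implements Comparable<Entry> {
        private static final AtomicLong SEQ = new AtomicLong();

        final String payload;
        final Priority priority;
        final long seqNum = SEQ.getAndIncrement();

        Entry(String payload, Priority priority) {
            this.payload = payload;
            this.priority = priority;
        }

        @Override
        public int compareTo(Entry other) {
            // priority first, then arrival order as the tie-breaker
            int res = priority.compareTo(other.priority);
            return res != 0 ? res : Long.compare(seqNum, other.seqNum);
        }
    }

    // Polls the queue until it is empty and returns the payloads in dequeue order.
    static List<String> drainInOrder(PriorityBlockingQueue<Entry> queue) {
        List<String> out = new ArrayList<>();
        Entry e;
        while ((e = queue.poll()) != null) {
            out.add(e.payload);
        }
        return out;
    }

    public static void main(String[] args) {
        PriorityBlockingQueue<Entry> queue = new PriorityBlockingQueue<>();
        queue.add(new Entry("a", Priority.NORMAL));
        queue.add(new Entry("b", Priority.HIGH));
        queue.add(new Entry("c", Priority.NORMAL));
        queue.add(new Entry("d", Priority.HIGH));
        System.out.println(drainInOrder(queue)); // prints [b, d, a, c]
    }
}
```

Without the sequence number, a PriorityBlockingQueue gives no guarantee about the relative order of "a" and "c" (or "b" and "d"), which is the gap the tie-breaker closes.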
Code extract 2:
Code:
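public class PriorityBlockingChannel extends QueueChannel {
    ...
    @Override
    protected boolean doSend(Message<?> message, long timeout) {
        // wait up to the timeout for a free slot before queueing
        if (!acquirePermitIfNecessary(timeout)) {
            return false;
        }
        log.debug("Channel {} - sending message with timeout {}", this.getName(), timeout);
        // wrap the original message so the queue orders it by priority, then sequence number
        FIFOMessage<Message<?>> fifoMessage =
                new FIFOMessage<Message<?>>(message, message.getHeaders().getPriority());
        // any waiting was done while acquiring the permit, so hand off without blocking
        boolean sent = super.doSend(fifoMessage, 0);
        if (!sent) {
            // give the permit back so a failed send does not use up a slot
            this.releasePermitIfNecessary();
        }
        return sent;
    }

    @Override
    @SuppressWarnings("unchecked")
    protected Message<?> doReceive(long timeout) {
        FIFOMessage<Message<?>> fifoMessage = (FIFOMessage<Message<?>>) super.doReceive(timeout);
        if (fifoMessage != null) {
            // unwrap: the payload of the FIFOMessage is the original message
            Message<?> message = fifoMessage.getPayload();
            this.releasePermitIfNecessary();
            return message;
        }
        return null;
    }
    ...
}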
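The wrap-on-send, unwrap-on-receive pattern in extract 2 can also be sketched with JDK classes only. The elided parts of PriorityBlockingChannel are not shown in the post, so everything here is an assumption: BoundedPriorityChannelSketch, Envelope, send and receive are hypothetical names, an int priority (lower is more urgent) replaces MessagePriority, and a Semaphore is only a guess at what acquirePermitIfNecessary and releasePermitIfNecessary do, namely bound an otherwise unbounded priority queue.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class BoundedPriorityChannelSketch {

    // Hypothetical wrapper playing the role of FIFOMessage.
    private static final class Envelope implements Comparable<Envelope> {
        private static final AtomicLong SEQ = new AtomicLong();

        final String payload;
        final int priority; // assumption: lower value = more urgent
        final long seqNum = SEQ.getAndIncrement();

        Envelope(String payload, int priority) {
            this.payload = payload;
            this.priority = priority;
        }

        @Override
        public int compareTo(Envelope other) {
            int res = Integer.compare(priority, other.priority);
            return res != 0 ? res : Long.compare(seqNum, other.seqNum);
        }
    }

    private final PriorityBlockingQueue<Envelope> queue = new PriorityBlockingQueue<>();
    private final Semaphore permits; // assumed capacity limiter, one permit per queued item

    public BoundedPriorityChannelSketch(int capacity) {
        this.permits = new Semaphore(capacity);
    }

    // Wraps the payload and queues it; returns false if no slot frees up within the timeout.
    public boolean send(String payload, int priority, long timeoutMillis) throws InterruptedException {
        if (!permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return false;
        }
        boolean accepted = false;
        try {
            accepted = queue.offer(new Envelope(payload, priority));
            return accepted;
        } finally {
            if (!accepted) {
                permits.release(); // do not leak the slot if the hand-off failed
            }
        }
    }

    // Takes the most urgent envelope, frees its slot and returns the unwrapped payload.
    public String receive(long timeoutMillis) throws InterruptedException {
        Envelope envelope = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        if (envelope == null) {
            return null;
        }
        permits.release();
        return envelope.payload;
    }
}
```

With a capacity of 2, a third send with a zero timeout returns false until a receive frees a slot; callers only ever see plain payloads, never the wrapper.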