I don't know if it's of general interest, but here's a draft of a RandomAccessChannel, that wraps a Random Access Queue.
I have 3 doubts about this, anyhow:
1) What should I do with the timeouts (see TODO's) ?
2) Should I enforce Map<String, Message<?>> instead of Map<Object, Message<?>> ?
3) What is the Java version dependency of Spring Integration? I was thinking about using a ConcurrentMap instead of a Map, but that implies Java 1.5.
Here's the code so far:
Code:
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.channel.DispatcherPolicy;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.selector.MessageSelector;
import org.springframework.util.Assert;
public class RandomAccessChannel extends AbstractMessageChannel{
private final Map<Object, Message<?>> queue;
public RandomAccessChannel() {
this(null, null);
}
public RandomAccessChannel(Map<Object, Message<?>> queue) {
this(null, queue);
}
public RandomAccessChannel(DispatcherPolicy dispatcherPolicy) {
this(dispatcherPolicy, null);
}
public RandomAccessChannel(DispatcherPolicy dispatcherPolicy, Map<Object, Message<?>> queue) {
super((dispatcherPolicy != null) ? dispatcherPolicy : new DispatcherPolicy());
this.queue = (queue != null) ? queue : new Hashtable<Object, Message<?>>();
}
@Override
protected boolean doSend(Message<?> message, long timeout) {
Assert.notNull(message, "'message' must not be null");
//what TODO with the timeout?
return doSend(message);
}
@Override
protected Message<?> doReceive(long timeout) {
//what TODO with the timeout?
String key = queue.keySet().toArray()[0].toString();
return doReceive(key);
}
public List<Message<?>> clear() {
List<Message<?>> list = new Vector<Message<?>>(this.queue.values());
queue.clear();
return list;
}
public List<Message<?>> purge(MessageSelector selector) {
List<Message<?>> list = new Vector<Message<?>>();
for(Message<?> message: queue.values()){
if(!selector.accept(message)){
list.add(message);
queue.remove(message.getHeader().getCorrelationId().toString());
}
}
return list;
}
public boolean doSend(Message<?> message) {
Assert.notNull(message, "'message' must not be null");
this.queue.put(message.getHeader().getCorrelationId().toString(), message);
return true;
}
public Message<?> doReceive(Object id) {
return this.queue.remove(id);
}
public Message<?> peek(Object id) {
return this.queue.get(id);
}
public boolean containsMessageForId(Object key){
return this.queue.containsKey(key);
}
public Set<Object> iterator() {
return queue.keySet();
}
}