|
#1
I'm in the process of extending AbstractMessageChannel to obtain a random-access queue. Is something similar already under way? Any suggestions? Should I post the code here?
It's nothing complicated, just a simple wrapper around a Map. |
|
#2
Can you elaborate a bit on the motivation/use-case?
I'm curious if it's something that could be handled with a ChannelInterceptor or ChannelPurger. -Mark
__________________
Mark Fisher Senior Software Engineer, SpringSource Spring Integration Lead http://www.springsource.com http://www.springsource.org/spring-integration http://blog.springsource.com/main/author/markf |
|
#3
Well, as I said in another post, I'm building a REST-based "service layer" for our in-house framework. In it I can have (at least) 3 types of invocations:
1) Fire & Forget
2) Request-and-wait-for-Response
3) Request-and-get-Id
4) Get the response by id from 3)
So for 3) and 4) I can have something like:
A) GET /slayer/reports/reportABC -> 8e596fd2-216e-4015-8609-463f3b83a253
B) GET /slayer/reports -> list of reports on the "reports" queue not yet consumed
C) GET /slayer/reports/8e596fd2-216e-4015-8609-463f3b83a253 -> the report from A)
So I have to access the queue in a random way. What do you think? |
|
#4
BTW, how do I define what type of channel to use? Is it through the datatype attribute?
<channel id="responseChannel" datatype="XXXXXXXX"/>
On a completely different topic: since I updated Spring Integration from trunk today, I always get
BeanDefinitionParsingException: Configuration problem: Unable to locate Spring NamespaceHandler for XML schema namespace [http://www.springframework.org/schema/integration]
Have I missed anything? Thanks. |
|
#5
The "datatype" attribute is for the accepted types of Message payload (see: http://static.springframework.org/spring-integration/reference/htmlsingle/spring-integration-reference.html#namespace-channel).
When using <channel/>, it currently always creates a SimpleChannel (with a different internal queue depending on the capacity you provide). If you want to use a PriorityChannel, you would need to define a generic <bean/> at this point (the namespace support for that is on the M3 roadmap).
As for the XSD resolution, can you post your xmlns declarations from the top-level element? (or try this... it was temporarily expecting "spring-integration-adapters-1.0.xsd" but now should be just "spring-integration-1.0.xsd"). Mark |
|
#7
I don't know if it's of general interest, but here's a draft of a RandomAccessChannel that wraps a random-access queue.
I have 3 questions about it, though:
1) What should I do with the timeouts (see the TODOs)?
2) Should I enforce Map<String, Message<?>> instead of Map<Object, Message<?>>?
3) What Java version does Spring Integration depend on? I was thinking about using a ConcurrentMap instead of a Map, but that implies Java 1.5.
Here's the code so far:
Code:
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Vector;

import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.channel.DispatcherPolicy;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.selector.MessageSelector;
import org.springframework.util.Assert;

public class RandomAccessChannel extends AbstractMessageChannel {

    // Messages keyed by their correlation id (stored as a String by doSend).
    // Compound operations lock on this map, which matches Hashtable's own
    // locking; a caller-supplied Map needs to be thread-safe by itself.
    private final Map<Object, Message<?>> queue;

    public RandomAccessChannel() {
        this(null, null);
    }

    public RandomAccessChannel(Map<Object, Message<?>> queue) {
        this(null, queue);
    }

    public RandomAccessChannel(DispatcherPolicy dispatcherPolicy) {
        this(dispatcherPolicy, null);
    }

    public RandomAccessChannel(DispatcherPolicy dispatcherPolicy, Map<Object, Message<?>> queue) {
        super((dispatcherPolicy != null) ? dispatcherPolicy : new DispatcherPolicy());
        this.queue = (queue != null) ? queue : new Hashtable<Object, Message<?>>();
    }

    @Override
    protected boolean doSend(Message<?> message, long timeout) {
        Assert.notNull(message, "'message' must not be null");
        // TODO: what to do with the timeout?
        return doSend(message);
    }

    @Override
    protected Message<?> doReceive(long timeout) {
        // TODO: what to do with the timeout?
        synchronized (this.queue) {
            // Return null when the queue is empty instead of failing on index 0.
            Iterator<Object> keys = this.queue.keySet().iterator();
            return keys.hasNext() ? doReceive(keys.next()) : null;
        }
    }

    public List<Message<?>> clear() {
        synchronized (this.queue) {
            List<Message<?>> list = new Vector<Message<?>>(this.queue.values());
            this.queue.clear();
            return list;
        }
    }

    public List<Message<?>> purge(MessageSelector selector) {
        List<Message<?>> list = new Vector<Message<?>>();
        synchronized (this.queue) {
            // Remove through the iterator: calling queue.remove() inside a
            // for-each loop over values() can throw ConcurrentModificationException.
            Iterator<Message<?>> messages = this.queue.values().iterator();
            while (messages.hasNext()) {
                Message<?> message = messages.next();
                if (!selector.accept(message)) {
                    list.add(message);
                    messages.remove();
                }
            }
        }
        return list;
    }

    public boolean doSend(Message<?> message) {
        Assert.notNull(message, "'message' must not be null");
        this.queue.put(message.getHeader().getCorrelationId().toString(), message);
        return true;
    }

    // Removes and returns the message stored under the given id, or null.
    public Message<?> doReceive(Object id) {
        return this.queue.remove(id);
    }

    // Returns the message stored under the given id without removing it.
    public Message<?> peek(Object id) {
        return this.queue.get(id);
    }

    public boolean containsMessageForId(Object key) {
        return this.queue.containsKey(key);
    }

    // Returns the ids of all messages currently held.
    public Set<Object> iterator() {
        return this.queue.keySet();
    }
}
Last edited by amsmota; Mar 4th, 2008 at 09:57 AM. |
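On question 1) above, one possible way to honor a timeout on a keyed receive is a guarded wait on the store's monitor. The sketch below is plain Java, not Spring Integration API: the class KeyedStore and its methods put and take are hypothetical names chosen for illustration. On question 3), the draft already uses generics, which require Java 5, and java.util.concurrent arrived in that same release, so a ConcurrentMap would not raise the Java requirement of this class.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical keyed store (not a Spring Integration class).
 * take(key, timeout) waits until a value for that key arrives
 * or the timeout elapses.
 */
class KeyedStore<K, V> {

    private final Map<K, V> entries = new HashMap<K, V>();

    /** Stores the value and wakes up every thread waiting in take(). */
    public synchronized void put(K key, V value) {
        entries.put(key, value);
        notifyAll();
    }

    /**
     * Removes and returns the value for the key, waiting at most
     * timeoutMillis for it to appear. Returns null on timeout or if the
     * waiting thread is interrupted. A timeout of 0 means "do not wait".
     */
    public synchronized V take(K key, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!entries.containsKey(key)) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return null;
            }
            try {
                // The surrounding loop re-checks the condition, which
                // guards against spurious wakeups and unrelated puts.
                wait(remaining);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
        return entries.remove(key);
    }
}
```

A receive by id with a timeout could then delegate to take(id, timeout), while a send would call put with the correlation id as the key. What the send-side timeout should mean for an unbounded map is left open here, as in the draft.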
|
#8
I'm not sure, but it smells to me like you're trying to give the channel a responsibility that is not related to channeling.
If you create a registry that stores the state of the reports, you can update that registry from a ChannelInterceptor. That would decouple your business logic (keeping track of the state of the message) from the channel. In essence you would just move the code that is your business from the RAQ to your registry and glue it to the channel with a ChannelInterceptor. Et voilà, your business code is independent from Spring again.
I'd have to check, but maybe there is already a MethodInvokingChannelInterceptor? |
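The registry idea could look roughly like the following. This is a sketch in plain Java under stated assumptions: ReportRegistry and its method names are hypothetical, and register is assumed to be called from whatever hook a ChannelInterceptor offers when a message passes through the response channel. No Spring Integration API is shown or implied.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Hypothetical registry that holds finished reports until a client
 * asks for them by id. It knows nothing about channels or messages.
 */
class ReportRegistry {

    private final ConcurrentMap<String, Object> reports =
            new ConcurrentHashMap<String, Object>();

    /** Assumed to be called from an interceptor hook on the response channel. */
    public void register(String correlationId, Object report) {
        reports.put(correlationId, report);
    }

    /** Ids of reports that have not been consumed yet. */
    public List<String> pendingIds() {
        return new ArrayList<String>(reports.keySet());
    }

    /** Returns the report without consuming it, or null if it is not there. */
    public Object peek(String correlationId) {
        return reports.get(correlationId);
    }

    /** Consumes and returns the report, or null if it is not there. */
    public Object take(String correlationId) {
        return reports.remove(correlationId);
    }
}
```

With this split, the channel only transports messages, and the query-by-id behavior lives in a class that can be tested without any messaging infrastructure.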
|
#9
I don't think I understand your reply. It seems to me that I don't have any business logic in my use case. Let me try to explain myself a little better.
I have two queues, a "request" queue and a "response" queue. The user-agent puts a request in the request queue and gets back the messageId of the message generated on that queue. The user can then query the response queue using that messageId whenever he wants. Of course, a message with that id may or may not exist on the response queue at a given time. Alternatively, he can obtain a list of all the messages in the response queue and see whether a message with the corresponding id is already there.
Now, the message in the request queue is eventually consumed by a service, which does whatever it needs, taking the time it needs (in my case that can apparently be a long period). When it finishes, it puts its results in the response queue, in a message whose correlationId equals the consumed messageId. Once it's there, it can be consumed by the user-agent through the original messageId.
It's for this last "response queue" that I need the RAQ channel. So I don't have any "status" of messages, just a response queue with messages that can be consumed through a "key" at any time after they are put there.
Now, I don't know if this is a clear explanation, so bear with me... |
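The flow described above can be sketched with plain java.util.concurrent types. Everything here is an assumption made for illustration: the class RequestResponseFlow, its methods submit, processNext and fetch, and the use of a random UUID as the message id are hypothetical and stand in for the real channels and messages.

```java
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-in for the request queue plus keyed response queue. */
class RequestResponseFlow {

    /** A request carries its own id, which later serves as the correlation id. */
    static final class Request {
        final String id = UUID.randomUUID().toString();
        final String reportName;

        Request(String reportName) {
            this.reportName = reportName;
        }
    }

    private final BlockingQueue<Request> requests =
            new LinkedBlockingQueue<Request>();
    private final ConcurrentMap<String, String> responses =
            new ConcurrentHashMap<String, String>();

    /** User-agent side: enqueue the request and return its id immediately. */
    String submit(String reportName) {
        Request request = new Request(reportName);
        requests.add(request);
        return request.id;
    }

    /**
     * Service side: consume one pending request, if any, and store the
     * result under the request's id. Returns false if nothing was pending.
     */
    boolean processNext() {
        Request request = requests.poll();
        if (request == null) {
            return false;
        }
        responses.put(request.id, "result of " + request.reportName);
        return true;
    }

    /** User-agent side: consume the response for the id, or null if not ready. */
    String fetch(String id) {
        return responses.remove(id);
    }
}
```

A client would call submit, keep the returned id, and call fetch with it until the result is non-null. In a real deployment processNext would run on the slow service's own thread.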
|
#10
Yes, the term 'business logic' was out of order. But I do think I understand.
The RAQ is responsible for two things:
1. transporting messages with the result of the slow service
2. holding them for a user agent to query by id
I'm looking for a better place for the second responsibility. |
|
|