
Thread: Random Access Queue

  1. #1
    Join Date
    Feb 2008
    Location
    Dublin - Ireland
    Posts
    102

    Default Random Access Queue

    I'm in the process of extending AbstractMessageChannel to obtain a Random Access Queue, is there something similar under way? Any suggestions on this? Should I publish the code in here?

    It's nothing complicated, though, just a simple wrapper around a Map.

  2. #2
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,840

    Default

    Can you elaborate a bit on the motivation/use-case?

    I'm curious if it's something that could be handled with a ChannelInterceptor or ChannelPurger.

    -Mark

  3. #3
    Join Date
    Feb 2008
    Location
    Dublin - Ireland
    Posts
    102

    Default

    Well, as I said in another post, I'm building a REST-based "service layer" on top of our in-house framework. In it I can have (at least) 3 types of invocations, the last of which is split into steps 3) and 4):

    1) Fire & Forget
    2) Request-and-wait-for-Response
    3) Request-and-get-Id
    4) get the response by id from 3)

    So for 3) and 4) I can have something like

    A) GET /slayer/reports/reportABC -> 8e596fd2-216e-4015-8609-463f3b83a253

    B) GET /slayer/reports -> list of reports on the "reports" queue not yet consumed

    C) GET /slayer/reports/8e596fd2-216e-4015-8609-463f3b83a253 -> the report from A)

    So I have to access the queue in a random way.
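
    For illustration only, the three calls A), B) and C) could be backed by something like the sketch below. The class and method names (ReportDesk, request, complete, listReady, collect) are made up for this example and are not part of Spring Integration; the hand-off to the slow service is left out.

    ```java
    import java.util.Map;
    import java.util.Set;
    import java.util.TreeSet;
    import java.util.UUID;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical helper, not a Spring Integration class.
    class ReportDesk {

        // Finished reports waiting to be collected, keyed by the id handed out in A).
        private final Map<String, String> ready = new ConcurrentHashMap<String, String>();

        /** A) accept a request and hand back an id for later retrieval. */
        String request(String reportName) {
            String id = UUID.randomUUID().toString();
            // A real implementation would enqueue (id, reportName) for the service here.
            return id;
        }

        /** Called by the service once the report for this id is finished. */
        void complete(String id, String report) {
            ready.put(id, report);
        }

        /** B) ids of reports that are finished but not yet consumed. */
        Set<String> listReady() {
            return new TreeSet<String>(ready.keySet());
        }

        /** C) consume the report for this id; null if it is not there (yet). */
        String collect(String id) {
            return ready.remove(id);
        }
    }
    ```

    The point is only that C) removes by key rather than from the head of a queue, which is the "random access" part.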

    What do you think?

  4. #4
    Join Date
    Feb 2008
    Location
    Dublin - Ireland
    Posts
    102

    Default

    BTW, how do I define what type of channel to use? Is it done with the datatype attribute?

    <channel id="responseChannel" datatype="XXXXXXXX"/>


    And on a completely different matter: since I updated Spring Integration from trunk today, I always get

    BeanDefinitionParsingException: Configuration problem: Unable to locate Spring NamespaceHandler for XML schema namespace [http://www.springframework.org/schema/integration]

    Have I missed anything?

    Thanks.

  5. #5
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,840

    Default

    The "datatype" attribute is for the accepted types of Message payload (see: http://static.springframework.org/spring-integration/reference/htmlsingle/spring-integration-reference.html#namespace-channel)

    When using <channel/>, it currently always creates a SimpleChannel (with a different internal queue depending on the capacity you provide). If you want to use a PriorityChannel, you would need to define a generic <bean/> at this point (the namespace support for that is on the M3 roadmap).

    As far as the XSD resolution, can you post your xmlns declarations from the top-level element? (or try this... it was temporarily expecting "spring-integration-adapters-1.0.xsd" but now should be just "spring-integration-1.0.xsd").

    Mark

  6. #6
    Join Date
    Feb 2008
    Location
    Dublin - Ireland
    Posts
    102

    Default

    Ok, no problem with that. But I have another question. (moved to here)
    Last edited by amsmota; Mar 3rd, 2008 at 09:41 AM.

  7. #7
    Join Date
    Feb 2008
    Location
    Dublin - Ireland
    Posts
    102

    Default

    I don't know if it's of general interest, but here's a draft of a RandomAccessChannel that wraps a Random Access Queue.

    I have 3 open questions about this, though:

    1) What should I do with the timeouts (see the TODOs)?
    2) Should I enforce Map<String, Message<?>> instead of Map<Object, Message<?>>?
    3) What is the Java version dependency of Spring Integration? I was thinking about using a ConcurrentMap instead of a Map, but that implies Java 1.5.

    Here's the code so far:

    Code:
    import java.util.Hashtable;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.Vector;
    
    import org.springframework.integration.channel.AbstractMessageChannel;
    import org.springframework.integration.channel.DispatcherPolicy;
    import org.springframework.integration.message.Message;
    import org.springframework.integration.message.selector.MessageSelector;
    import org.springframework.util.Assert;
    
    public class RandomAccessChannel extends AbstractMessageChannel{
    	
    	private final Map<Object, Message<?>> queue; 
    	
    	public RandomAccessChannel() {
    		this(null, null);
    	}
    	
    	public RandomAccessChannel(Map<Object, Message<?>> queue) {
    		this(null, queue);
    	}
    
    	public RandomAccessChannel(DispatcherPolicy dispatcherPolicy) {
    		this(dispatcherPolicy, null);
    	}
    	
    	public RandomAccessChannel(DispatcherPolicy dispatcherPolicy, Map<Object, Message<?>> queue) {
    		super((dispatcherPolicy != null) ? dispatcherPolicy : new DispatcherPolicy());
    		this.queue = (queue != null) ? queue : new Hashtable<Object, Message<?>>();
    
    	}
    	
    	@Override
    	protected boolean doSend(Message<?> message, long timeout) {
    		Assert.notNull(message, "'message' must not be null");
    		//what TODO with the timeout?
    		return doSend(message);
    
    	}
    	
    	@Override
    	protected Message<?> doReceive(long timeout) {
    		//what TODO with the timeout?
    		// Return null for an empty queue; toArray()[0] would otherwise throw
    		Object[] keys = queue.keySet().toArray();
    		return (keys.length > 0) ? doReceive(keys[0]) : null;
    	}
    
    	public List<Message<?>> clear() {
    		List<Message<?>> list = new Vector<Message<?>>(this.queue.values());
    		queue.clear();
    		return list;
    	}
    
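    	public List<Message<?>> purge(MessageSelector selector) {
    		List<Message<?>> list = new Vector<Message<?>>();
    		// Collect the rejected messages first: removing from the map while
    		// iterating over values() throws ConcurrentModificationException
    		for(Message<?> message: queue.values()){
    			if(!selector.accept(message)){
    				list.add(message);
    			}
    		}
    		for(Message<?> message: list){
    			queue.remove(message.getHeader().getCorrelationId().toString());
    		}
    		return list;
    	}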
    
    	
    	public boolean doSend(Message<?> message) {
    		Assert.notNull(message, "'message' must not be null");
    		this.queue.put(message.getHeader().getCorrelationId().toString(), message);
    		return true;
    	}
    	
    	public Message<?> doReceive(Object id) {
    		return this.queue.remove(id);
    	}
    	
    	public Message<?> peek(Object id) {
    		return this.queue.get(id);
    	}
    	
    	public boolean containsMessageForId(Object key){
    		return this.queue.containsKey(key);
    	}
    
    	public Set<Object> iterator() {
    		return queue.keySet();
    	}
    	
    }
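
    On questions 1) and 3), one possible way to honour a timeout on a keyed receive is to wait on a monitor until the wanted key shows up or the time runs out. The sketch below is standalone plain Java with a made-up class name (TimedKeyedStore); it is not wired into AbstractMessageChannel, and it uses a synchronized HashMap rather than a ConcurrentMap because the wait/notify logic needs the lock anyway.

    ```java
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical helper: a keyed store whose take() honours a timeout.
    class TimedKeyedStore<V> {

        private final Map<Object, V> entries = new HashMap<Object, V>();

        /** Stores a value and wakes up any thread waiting in take(). */
        public synchronized void put(Object key, V value) {
            entries.put(key, value);
            notifyAll();
        }

        /**
         * Removes and returns the value for the key, waiting up to
         * timeoutMillis for it to arrive. Returns null on timeout or
         * if the waiting thread is interrupted.
         */
        public synchronized V take(Object key, long timeoutMillis) {
            long deadline = System.currentTimeMillis() + timeoutMillis;
            while (!entries.containsKey(key)) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return null; // timed out
                }
                try {
                    wait(remaining);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    return null;
                }
            }
            return entries.remove(key);
        }
    }
    ```

    A timeout of 0 here means "don't wait at all"; whether that matches what the channel contract expects is something I have not checked.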
    Last edited by amsmota; Mar 4th, 2008 at 08:57 AM.

  8. #8
    Join Date
    May 2007
    Location
    Netherlands
    Posts
    614

    Default

    I'm not sure, but it smells to me like you're trying to give the channel a responsibility that is not related to channeling.

    If you create a registry that stores the state of the reports, you can update that registry from a ChannelInterceptor. That would decouple your business logic (keeping track of the state of the message) from the Channel.

    In essence you would just copy the code that is your business from the RAQ to your registry and glue it to the channel with a ChannelInterceptor. Et voila, your business code is independent of Spring again.

    I'd have to check, but maybe there is already a MethodInvokingChannelInterceptor?
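
    Roughly what I have in mind, as a sketch only. AfterSendHook is a made-up stand-in for whatever callback the interceptor ends up exposing (I'm not reproducing the real ChannelInterceptor signature here), and ResultRegistry and RegisteringHook are hypothetical names too.

    ```java
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    /** Plain registry with no Spring dependency; holds results until claimed by id. */
    class ResultRegistry {
        private final Map<String, Object> results = new ConcurrentHashMap<String, Object>();

        void store(String correlationId, Object payload) {
            results.put(correlationId, payload);
        }

        boolean isReady(String correlationId) {
            return results.containsKey(correlationId);
        }

        /** Removes and returns the result, or null if none is stored for the id. */
        Object claim(String correlationId) {
            return results.remove(correlationId);
        }
    }

    /** Hypothetical callback; NOT the Spring Integration interceptor interface. */
    interface AfterSendHook {
        void afterSend(String correlationId, Object payload);
    }

    /** Glue code: copies every message that passes the channel into the registry. */
    class RegisteringHook implements AfterSendHook {
        private final ResultRegistry registry;

        RegisteringHook(ResultRegistry registry) {
            this.registry = registry;
        }

        public void afterSend(String correlationId, Object payload) {
            registry.store(correlationId, payload);
        }
    }
    ```

    Only RegisteringHook would need to know about the channel; the registry itself can be tested and reused without it.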

  9. #9
    Join Date
    Feb 2008
    Location
    Dublin - Ireland
    Posts
    102

    Default

    I don't think I understand your reply. It seems to me that I don't have any business logic in my use case. Let me try to explain myself a little better.

    I have two queues, a "request" queue and a "response" queue. The user-agent puts a request in the request queue and gets back the messageId of the message generated on that queue. The user can then query the response queue using that messageId whenever he wants. Of course, a message with that Id may or may not exist on the response queue at a given time. Alternatively, he can obtain a list of all the messages that are in the response queue and see if a message with the corresponding Id already exists in there.

    Now the message in the request queue is eventually consumed by a service, which does whatever it needs to, taking as long as it needs (in my case that can apparently be quite a long time). When it finishes, it puts its results in a message on the response queue, with correlationId equal to the consumed messageId. Once it's there, it can be consumed by the user-agent through the original messageId.

    It's for this last "response queue" that I need the RAQ channel.

    So I don't have any "status" of messages, just a response queue with messages to be consumed at any time after they are put there, that can be consumed through a "key".

    Now I don't know if this is a clear explanation, so bear with me...

  10. #10
    Join Date
    May 2007
    Location
    Netherlands
    Posts
    614

    Default

    Yes, the term 'business logic' was out of place. But I do think I understand.

    The RAQ is responsible for two things:
    1. transporting messages with the result of the slow service
    2. holding them for a user agent to query by id

    I'm looking for a better place for the second responsibility.
