  #1  
Feb 28th, 2008, 08:23 AM
amsmota
Senior Member

Join Date: Feb 2008
Location: Dublin - Ireland
Posts: 101
Random Access Queue

I'm in the process of extending AbstractMessageChannel to obtain a Random Access Queue. Is there something similar under way? Any suggestions on this? Should I publish the code here?

It's nothing complicated, though, just a simple wrapper around a Map.
  #2  
Feb 28th, 2008, 08:52 AM
Mark Fisher
Senior Member
Spring Team

Join Date: Oct 2005
Location: Boston, MA
Posts: 1,355

Can you elaborate a bit on the motivation/use-case?

I'm curious if it's something that could be handled with a ChannelInterceptor or ChannelPurger.

-Mark
__________________
Mark Fisher
Senior Software Engineer, SpringSource
Spring Integration Lead
http://www.springsource.com
http://www.springsource.org/spring-integration
http://blog.springsource.com/main/author/markf
  #3  
Feb 28th, 2008, 09:34 AM
amsmota

Well, as I said in another post, I'm building a REST-based "service layer" for our in-house framework. In it I can have (at least) 3 types of invocations, the third of which comes with a follow-up call:

1) Fire & Forget
2) Request-and-wait-for-Response
3) Request-and-get-Id
4) get the response by id from 3)

So for 3) and 4) I can have something like

A) GET /slayer/reports/reportABC -> 8e596fd2-216e-4015-8609-463f3b83a253

B) GET /slayer/reports -> list of reports on the "reports" queue not yet consumed

C) GET /slayer/reports/8e596fd2-216e-4015-8609-463f3b83a253 -> the report from A)

So I have to access the queue in a random way.

What do you think?
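
For illustration, the calls A), B) and C) map onto a store keyed by id. The sketch below is a minimal, framework-free version of that idea; `KeyedResponseStore` and its method names are made up for this example and are not part of Spring Integration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical helper, not a Spring Integration class: holds responses by id.
class KeyedResponseStore {

    private final ConcurrentMap<String, String> responses =
            new ConcurrentHashMap<String, String>();

    // (A) Hand back the id the caller will later ask for.
    String newRequestId() {
        return UUID.randomUUID().toString();
    }

    // Called by the service once the result for a request id is ready.
    void publish(String requestId, String payload) {
        responses.put(requestId, payload);
    }

    // (B) Ids of all responses that have not been consumed yet.
    List<String> pendingIds() {
        return new ArrayList<String>(responses.keySet());
    }

    // (C) Consume the response for one id; null if it is not there (yet).
    String take(String requestId) {
        return responses.remove(requestId);
    }
}
```

Here `take` removes the entry, so a response can be consumed only once; a non-destructive read would use `get` instead.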
  #4  
Feb 28th, 2008, 01:25 PM
amsmota

BTW, how do I define what type of channel to use? Is it through the datatype attribute?

<channel id="responseChannel" datatype="XXXXXXXX"/>


On a completely different matter: since I updated Spring Integration from trunk today I always get

BeanDefinitionParsingException: Configuration problem: Unable to locate Spring NamespaceHandler for XML schema namespace [http://www.springframework.org/schema/integration]

Have I missed anything?

Thanks.
  #5  
Feb 28th, 2008, 01:37 PM
Mark Fisher

The "datatype" attribute is for the accepted types of Message payload (see: http://static.springframework.org/spring-integration/reference/htmlsingle/spring-integration-reference.html#namespace-channel)

Currently, <channel/> always creates a SimpleChannel (with a different internal queue depending on the capacity you provide). If you want to use a PriorityChannel, you would need to define a generic <bean/> at this point (the namespace support for that is on the M3 roadmap).

As far as the XSD resolution, can you post your xmlns declarations from the top-level element? (or try this... it was temporarily expecting "spring-integration-adapters-1.0.xsd" but now should be just "spring-integration-1.0.xsd").

Mark
  #6  
Feb 28th, 2008, 03:46 PM
amsmota

Ok, no problem with that. But I have another question. (moved to here)

Last edited by amsmota; Mar 3rd, 2008 at 10:41 AM.
  #7  
Mar 4th, 2008, 09:33 AM
amsmota

I don't know if it's of general interest, but here's a draft of a RandomAccessChannel, that wraps a Random Access Queue.

I have 3 questions about this, though:

1) What should I do with the timeouts (see the TODOs)?
2) Should I enforce Map<String, Message<?>> instead of Map<Object, Message<?>> ?
3) What is the Java version dependency of Spring Integration? I was thinking about using a ConcurrentMap instead of a Map, but that implies Java 1.5.

Here's the code so far:

Code:
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Vector;

import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.channel.DispatcherPolicy;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.selector.MessageSelector;
import org.springframework.util.Assert;

public class RandomAccessChannel extends AbstractMessageChannel{
	
	private final Map<Object, Message<?>> queue; 
	
	public RandomAccessChannel() {
		this(null, null);
	}
	
	public RandomAccessChannel(Map<Object, Message<?>> queue) {
		this(null, queue);
	}

	public RandomAccessChannel(DispatcherPolicy dispatcherPolicy) {
		this(dispatcherPolicy, null);
	}
	
	public RandomAccessChannel(DispatcherPolicy dispatcherPolicy, Map<Object, Message<?>> queue) {
		super((dispatcherPolicy != null) ? dispatcherPolicy : new DispatcherPolicy());
		this.queue = (queue != null) ? queue : new Hashtable<Object, Message<?>>();

	}
	
	@Override
	protected boolean doSend(Message<?> message, long timeout) {
		Assert.notNull(message, "'message' must not be null");
		//what TODO with the timeout?
		return doSend(message);

	}
	
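	@Override
	protected Message<?> doReceive(long timeout) {
		//what TODO with the timeout?
		// Return null instead of throwing ArrayIndexOutOfBoundsException
		// when the queue is empty.
		Object[] keys = queue.keySet().toArray();
		if (keys.length == 0) {
			return null;
		}
		return doReceive(keys[0]);
	}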

	public List<Message<?>> clear() {
		List<Message<?>> list = new Vector<Message<?>>(this.queue.values());
		queue.clear();
		return list;
	}

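	public List<Message<?>> purge(MessageSelector selector) {
		List<Message<?>> list = new Vector<Message<?>>();
		// Collect the rejected messages first: removing from the map while
		// iterating over queue.values() throws ConcurrentModificationException.
		for(Message<?> message: queue.values()){
			if(!selector.accept(message)){
				list.add(message);
			}
		}
		for(Message<?> message: list){
			queue.remove(message.getHeader().getCorrelationId().toString());
		}
		return list;
	}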

	
	public boolean doSend(Message<?> message) {
		Assert.notNull(message, "'message' must not be null");
		this.queue.put(message.getHeader().getCorrelationId().toString(), message);
		return true;
	}
	
	public Message<?> doReceive(Object id) {
		return this.queue.remove(id);
	}
	
	public Message<?> peek(Object id) {
		return this.queue.get(id);
	}
	
	public boolean containsMessageForId(Object key){
		return this.queue.containsKey(key);
	}

	public Set<Object> iterator() {
		return queue.keySet();
	}
	
}
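
One possible answer to the timeout TODOs, sketched outside the channel hierarchy: block the caller until the requested key shows up or the timeout runs out. `TimedKeyedBuffer` is a hypothetical stand-alone class (it does not extend AbstractMessageChannel), and returning null on timeout is an assumption about the desired behaviour, not something the framework prescribes here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone buffer; all access goes through synchronized methods.
class TimedKeyedBuffer<V> {

    private final Map<String, V> entries = new HashMap<String, V>();

    // Store a value and wake up any thread waiting in receive().
    synchronized void send(String key, V value) {
        entries.put(key, value);
        notifyAll();
    }

    // Remove and return the value for key, waiting up to timeoutMillis for it.
    // Returns null on timeout or if the waiting thread is interrupted.
    synchronized V receive(String key, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!entries.containsKey(key)) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return null;
            }
            try {
                wait(remaining); // releases the monitor while waiting
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
        return entries.remove(key);
    }
}
```

A timeout of 0 makes `receive` a non-blocking check. The loop re-tests the condition after every wake-up, so a `send` for a different key does not release the wrong waiter.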

Last edited by amsmota; Mar 4th, 2008 at 09:57 AM.
  #8  
Mar 18th, 2008, 06:37 AM
iwein
Senior Member

Join Date: May 2007
Location: Netherlands
Posts: 488

I'm not sure, but it smells to me like you're trying to give the channel a responsibility that is not related to channeling.

If you create a registry that stores the state of the reports, you can update that registry from a ChannelInterceptor. That would decouple your business logic (keeping track of the state of the message) from the Channel.

In essence you would just copy the code that is your business from the RAQ to your registry and glue it to the channel with a ChannelInterceptor. Et voila, your business code is independent from Spring again.
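
The registry idea could look roughly like this. Everything in the sketch is a stand-in: `ReportRegistry`, `SendListener`, `RegistrySendListener` and `ListeningChannel` are hypothetical names invented for the example, and `SendListener` only mimics the role a ChannelInterceptor would play; the real interface has a different shape.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Plain registry with no framework dependency.
class ReportRegistry {
    private final Map<String, Object> reports = new ConcurrentHashMap<String, Object>();

    void store(String correlationId, Object report) {
        reports.put(correlationId, report);
    }

    boolean contains(String correlationId) {
        return reports.containsKey(correlationId);
    }

    // Removes and returns the report, or null if there is none for this id.
    Object fetch(String correlationId) {
        return reports.remove(correlationId);
    }
}

// Hypothetical callback standing in for an interceptor hook.
interface SendListener {
    void afterSend(String correlationId, Object payload);
}

// The glue: every message sent on the channel is copied into the registry.
class RegistrySendListener implements SendListener {
    private final ReportRegistry registry;

    RegistrySendListener(ReportRegistry registry) {
        this.registry = registry;
    }

    public void afterSend(String correlationId, Object payload) {
        registry.store(correlationId, payload);
    }
}

// Hypothetical minimal channel that only notifies its listeners.
class ListeningChannel {
    private final List<SendListener> listeners = new ArrayList<SendListener>();

    void addListener(SendListener listener) {
        listeners.add(listener);
    }

    boolean send(String correlationId, Object payload) {
        for (SendListener listener : listeners) {
            listener.afterSend(correlationId, payload);
        }
        return true;
    }
}
```

With this split the channel only transports, and the lookup by id lives in a class that can be tested without any channel at all.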

I'd have to check, but maybe there is already a MethodInvokingChannelInterceptor?
__________________
Iwein Fuld
Check out the first Spring Integration book
  #9  
Mar 18th, 2008, 08:17 AM
amsmota

I don't think I understand your reply. It seems to me that I don't have any business logic in my use case. Let me try to explain myself a little better.

I have two queues, a "request" queue and a "response" queue. The user-agent puts a request in the request queue and gets back the messageId of the message generated on that queue. The user can then query the response queue using that messageId whenever he wants. Of course, a message with that Id may or may not exist on the response queue at a given time. Alternatively, he can obtain a list of all the messages that are in the response queue and see whether a message with the corresponding Id already exists there.

The message in the request queue is eventually consumed by a service, which does whatever it needs, taking the time it needs (in my case that can apparently be a long time), and when it finishes it puts its results on the response queue in a message whose correlationId equals the consumed messageId. Once it's there, it can be consumed by the user-agent through the original messageId.

It's for this last "response queue" that I need the RAQ channel.

So I don't have any "status" of messages, just a response queue with messages to be consumed at any time after they are put there, that can be consumed through a "key".
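
The flow described above can be sketched in a few lines, with the service step reduced to a method call. `Envelope` and `RequestResponseFlow` are hypothetical names used only for this illustration; they are not Spring Integration types, and the "result for ..." payload is a placeholder for whatever the real service produces.

```java
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical message type: just enough fields to show the correlation.
class Envelope {
    final String messageId = UUID.randomUUID().toString();
    final String correlationId; // null for requests
    final String payload;

    Envelope(String correlationId, String payload) {
        this.correlationId = correlationId;
        this.payload = payload;
    }
}

class RequestResponseFlow {
    private final Queue<Envelope> requests = new ConcurrentLinkedQueue<Envelope>();
    private final Map<String, Envelope> responses = new ConcurrentHashMap<String, Envelope>();

    // User agent: enqueue a request and get its messageId back.
    String submit(String payload) {
        Envelope request = new Envelope(null, payload);
        requests.add(request);
        return request.messageId;
    }

    // Service: consume one request (if any) and publish a response whose
    // correlationId equals the consumed request's messageId.
    boolean processNext() {
        Envelope request = requests.poll();
        if (request == null) {
            return false;
        }
        Envelope response = new Envelope(request.messageId, "result for " + request.payload);
        responses.put(response.correlationId, response);
        return true;
    }

    // User agent: consume the response for a request id; null if not ready.
    Envelope fetch(String requestMessageId) {
        return responses.remove(requestMessageId);
    }
}
```

The request side stays a plain FIFO queue; only the response side needs access by key, which is the part the RAQ channel is meant to cover.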

Now I don't know if this is a clear explanation, so bear with me...
  #10  
Mar 18th, 2008, 09:13 AM
iwein

Yes, the term 'business logic' was out of order. But I do think I understand.

The RAQ is responsible for two things:
1. transporting messages with the result of the slow service
2. holding them for a user agent to query by id

I'm looking for a better place for the second responsibility.