Asynchronous communication can be handy to decouple components, and JMS was made for this. Only I find JMS to heavy if I need channels/pipes-filters in my system and I don`t care about the remoting aspect. Therefore I have created a lightweight channel implemention that doesn`t rely on jms (or anything else)
The basis of the system is the InputChannel (you can take messages from) and the OutputChannel (you can put messages into). These channels are blocking, or are blocking with a timeout, this makes it very easy to use them in a production line.
I have been looking for a good implemenation for some time, but I didn`t find anything lightweight that fits my needs, so I wrote my own. A lot of components are based on the book: Enterprise Integration Patterns, so people who read it know how those components work.
In the near future (as soon as I have the time) I want to make the project opensource, but I first want to get some comment on it. Be warned.. the documention is partly in dutch.. there is not nearly enough documention.. the unit tests are missing.. so you don`t have to read a lot
the link:
http://members.home.nl/peter-veentjer01/index.htm
btw:
I have to create a new homepage also... (but I don`t like building webpages).
This is an example I use in my Lucene implementation:
These are the channels I`m currently using in this part of the system.
-------------------------------------------------------------------
crawler->deleteChannelBeginPoint->indexUpdater
-------------------------------------------------------------------
crawler->normalCrawlBeginPoint->retrieveOrCreateDocumentChannel->loadAndAnalyzeContentChannel
->metadataAddingChannel>writingEndPoint->indexUpdater
-------------------------------------------------------------------
this is a part of the configration in Spring:
Code:<bean id="crawler" class="com.jph.lucene.adoc.crawler.DirMappingTwoWayCrawler"> <constructor-arg index="0"> <bean class="com.jph.lucene.crawlers.SimpleCrawlerId"> <constructor-arg index="0" value="standaard"/> </bean> </constructor-arg> <constructor-arg index="1" ref="index"/> <constructor-arg index="2" ref="docConverter"/> <constructor-arg index="3" ref="normalCrawlBeginPoint"/> <constructor-arg index="4" ref="deleteChannelBeginPoint"/> <constructor-arg index="5" ref="dirMappingRepository"/> </bean> <bean id="processingMap" class="org.jph.channel.StdProcessingMap"/> <bean id="deleteQueue" class="java.util.concurrent.LinkedBlockingQueue"> <constructor-arg index="0" value="50000"/> </bean> <bean id="deleteChannelBeginPoint" class="org.jph.channel.StdActiveOuputChannel"> <constructor-arg index="0"> <bean class="org.jph.concurrent.BlockingThreadPoolExecutor" destroy-method="shutdown"> <!-- core pool size --> <constructor-arg index="0" value="1"/> <!-- max pool size --> <constructor-arg index="1" value="1"/> <!-- die-time --> <constructor-arg index="2" value="5000"/> <!-- de tijdeenheid waarmee deze executorservice gaat werken --> <constructor-arg index="3" ref="java.util.concurrent.TimeUnit.MILLISECONDS"/> <!-- de queue die gebruikt wordt om taken in op te slaan --> <constructor-arg index="4" ref="retrieveOrCreateDocumentQueue"/> <!-- - de threadfactory die deze executorservice gebruikt om threads voor de threadpool - aan te maken. --> <constructor-arg index="5"> <bean class="org.jph.concurrent.StdThreadFactory"> <!-- priority --> <constructor-arg index="0" value="1"/> <!-- De naam van ThreadGroup waar de Threads onder vallen --> <constructor-arg index="1" value="deleteQueue"/> </bean> </constructor-arg> </bean> </constructor-arg> <constructor-arg index="1"> <bean class="com.jph.lucene.adoc.standard.DeleteChannel"> <constructor-arg index="0" ref="indexUpdater"/> <constructor-arg index="1" ref="pnlDocumentIndexReaderProvider"/> <constructor-arg index="2" ref="processingMap"/> </bean> </constructor-arg> </bean> <bean id="normalCrawlBeginPoint" class="anchormen.pnl.search.NormalCrawlStartOutputChannel"> <constructor-arg index="0" ref="retrieveOrCreateDocumentChannel"/> <constructor-arg index="1" ref="processingMap"/> </bean> <bean id="retrieveOrCreateDocumentQueue" class="java.util.concurrent.LinkedBlockingQueue"> <constructor-arg index="0" value="500000"/> </bean> <bean id="loadAndAnalyzeContentQueue" class="java.util.concurrent.LinkedBlockingQueue"> <constructor-arg index="0" value="5000"/> </bean> <bean id="metadataAddingQueue" class="java.util.concurrent.LinkedBlockingQueue"> <constructor-arg index="0" value="5000"/> </bean> <bean id="retrieveOrCreateDocumentChannel" class="org.jph.channel.StdActiveOuputChannel"> <constructor-arg index="0"> <bean class="org.jph.concurrent.BlockingThreadPoolExecutor" destroy-method="shutdown"> <!-- core pool size --> <constructor-arg index="0" value="1"/> <!-- max pool size --> <constructor-arg index="1" value="1"/> <!-- die-time --> <constructor-arg index="2" value="5000"/> <!-- de tijdeenheid waarmee deze executorservice gaat werken --> <constructor-arg index="3" ref="java.util.concurrent.TimeUnit.MILLISECONDS"/> <!-- de queue die gebruikt wordt om taken in op te slaan --> <constructor-arg index="4" ref="retrieveOrCreateDocumentQueue"/> <constructor-arg index="5"> <bean class="org.jph.concurrent.StdThreadFactory"> <!-- priority --> <constructor-arg index="0" value="1"/> <!-- De naam van ThreadGroup waar de Threads onder vallen --> <constructor-arg index="1" value="retrieveOrCreateDocument"/> </bean> </constructor-arg> </bean> </constructor-arg> <constructor-arg index="1"> <bean class="org.jph.channel.StdProcessingOutputChannel"> <constructor-arg index="0" ref="loadAndAnalyzeContentChannel"/> <constructor-arg index="1" > <bean class="anchormen.pnl.search.RetrieveOrCreateDocumentMsgProcessor"> <constructor-arg index="0" ref="pnlDocumentIndexReaderProvider"/> <constructor-arg index="1" ref="idFactory"/> </bean> </constructor-arg> </bean> </constructor-arg> </bean> <bean id="loadAndAnalyzeContentChannel" class="org.jph.channel.StdActiveOuputChannel"> <constructor-arg index="0"> <bean class="org.jph.concurrent.BlockingThreadPoolExecutor" destroy-method="shutdown"> <!-- core pool size --> <constructor-arg index="0" value="2"/> <!-- max pool size --> <constructor-arg index="1" value="6"/> <!-- die-time --> <constructor-arg index="2" value="5000"/> <!-- de tijdeenheid waarmee deze executorservice gaat werken --> <constructor-arg index="3" ref="java.util.concurrent.TimeUnit.MILLISECONDS"/> <!-- de queue die gebruikt wordt om taken in op te slaan --> <constructor-arg index="4" ref="loadAndAnalyzeContentQueue"/> <constructor-arg index="5"> <bean class="org.jph.concurrent.StdThreadFactory"> <!-- priority --> <constructor-arg index="0" value="1"/> <!-- De naam van ThreadGroup waar de Threads onder vallen --> <constructor-arg index="1" value="loadersAndAnalyzers"/> </bean> </constructor-arg> </bean> </constructor-arg> <constructor-arg index="1"> <bean class="org.jph.channel.StdProcessingOutputChannel"> <constructor-arg index="0" ref="metadataAddingChannel"/> <constructor-arg index="1"> <bean class="anchormen.pnl.search.LoadAndAnalyzeContentMsgProcessor"> <constructor-arg index="0" ref="dirMappingRepository"/> </bean> </constructor-arg> </bean> </constructor-arg> </bean> <bean id="metadataAddingChannel" class="org.jph.channel.StdActiveOuputChannel"> <constructor-arg index="0"> <bean class="org.jph.concurrent.BlockingThreadPoolExecutor" destroy-method="shutdown"> <!-- core pool size --> <constructor-arg index="0" value="1"/> <!-- max pool size --> <constructor-arg index="1" value="1"/> <!-- die-time --> <constructor-arg index="2" value="5000"/> <!-- de tijdeenheid waarmee deze executorservice gaat werken --> <constructor-arg index="3" ref="java.util.concurrent.TimeUnit.MILLISECONDS"/> <!-- de queue die gebruikt wordt om taken in op te slaan --> <constructor-arg index="4" ref="metadataAddingQueue"/> <!-- - de threadfactory die deze executorservice gebruikt om threads voor de threadpool - aan te maken. --> <constructor-arg index="5"> <bean class="org.jph.concurrent.StdThreadFactory"> <!-- priority --> <constructor-arg index="0" value="1"/> <!-- De naam van ThreadGroup waar de Threads onder vallen --> <constructor-arg index="1" value="analyzers"/> </bean> </constructor-arg> </bean> </constructor-arg> <constructor-arg index="1"> <bean class="org.jph.channel.StdProcessingOutputChannel"> <constructor-arg index="0" ref="writingEndPoint"/> <constructor-arg index="1"> <bean class="anchormen.pnl.search.MetadataAddingMsgProcessor"> <constructor-arg index="0" ref="pnlManager"/> </bean> </constructor-arg> </bean> </constructor-arg> </bean> <bean id="writingEndPoint" class="anchormen.pnl.search.WritingOutputChannel"> <constructor-arg index="0" ref="indexUpdater"/> </bean>



Reply With Quote
