Page 1 of 2 12 LastLast
Results 1 to 10 of 17

Thread: 6 messages are not being released

  1. #1
    Join Date
    Sep 2011
    Posts
    167

    Unhappy 6 messages are not being released

    Hi ,

    I am working on an application that uses an aggregator: it takes its input from an input channel, aggregates the messages, and then sends the result to the destination channel.

    The aggregator's release strategy releases a group when the message count reaches 10. For example, if you send 20 messages, they are released in groups of 10, so there will finally be 2 groups (10*2).

    Now my question: if you send only 6 messages instead of 20, the aggregator keeps waiting because the release count of 10 is never reached. To handle this I have configured a MessageGroupStoreReaper in the configuration section, and I have also set send-partial-result-on-expiry="true" on the aggregator. But when I send six messages, the program goes into a loop and does not release those 6 messages. Please guide me on where things are going wrong.

    the configuration file is shown below.....

    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:si="http://www.springframework.org/schema/integration"
           xmlns:stream="http://www.springframework.org/schema/integration/stream"
           xmlns:task="http://www.springframework.org/schema/task"
           xsi:schemaLocation="
               http://www.springframework.org/schema/beans
               http://www.springframework.org/schema/beans/spring-beans.xsd
               http://www.springframework.org/schema/task
               http://www.springframework.org/schema/task/spring-task-3.0.xsd
               http://www.springframework.org/schema/integration
               http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
               http://www.springframework.org/schema/integration/stream
               http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">

        <!-- ActiveMQ connection factories. No other element in this file references them. -->
        <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://localhost:61616"/>
        </bean>

        <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
            <property name="maxConnections" value="8"/>
            <property name="maximumActive" value="500"/>
            <property name="connectionFactory" ref="connectionFactory"/>
        </bean>

        <!-- Entry point: calls on com.sample.agg.Gateway are sent to the aggregator's input channel. -->
        <si:gateway id="gateway"
                    service-interface="com.sample.agg.Gateway"
                    default-request-channel="aggregator-input-channel"/>

        <si:channel id="aggregator-input-channel">
            <si:queue capacity="100"/>
        </si:channel>

        <!-- Aggregator wired to custom correlation / release / aggregation beans and an explicit message store. -->
        <si:aggregator id="exampleAggregator"
                       input-channel="aggregator-input-channel"
                       output-channel="aggregator-output-channel"
                       ref="sampleAggregator"
                       method="aggregateMessagaes"
                       send-partial-result-on-expiry="true"
                       correlation-strategy="exampleCorrelationBean"
                       correlation-strategy-method="correlationStrategy"
                       release-strategy="exampleReleaseStrategy"
                       release-strategy-method="releaseStrategy"
                       message-store="exampleMessageStore"/>

        <bean id="sampleAggregator" class="com.sample.agg.ExampleAggregator"/>
        <bean id="exampleCorrelationBean" class="com.sample.agg.ExampleCorrelationBean"/>
        <bean id="exampleReleaseStrategy" class="com.sample.agg.ExampleReleaseStrategy"/>

        <bean id="exampleMessageStore" class="org.springframework.integration.store.SimpleMessageStore"/>

        <!-- Reaper over the aggregator's message store; its run() method is scheduled below.
             The timeout property is left commented out, exactly as posted. -->
        <bean id="exampleReaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
            <property name="messageGroupStore" ref="exampleMessageStore"/>
            <!-- <property name="timeout" value="5000" /> -->
        </bean>

        <task:scheduler id="exampleScheduler"/>

        <!-- Invokes exampleReaper.run() at a fixed rate of 1000. -->
        <task:scheduled-tasks scheduler="exampleScheduler">
            <task:scheduled ref="exampleReaper" method="run" fixed-rate="1000"/>
        </task:scheduled-tasks>

        <si:channel id="aggregator-output-channel">
            <si:queue capacity="1000"/>
        </si:channel>

        <!-- Consumer of the aggregated output: printerService.printString. -->
        <si:service-activator input-channel="aggregator-output-channel"
                              ref="printerService"
                              method="printString"/>

        <bean id="printerService" class="com.sample.agg.PrinterService"/>

        <!-- Default poller: at most one message per poll, interval trigger of 1000. -->
        <si:poller id="defaultPoller" default="true" max-messages-per-poll="1">
            <si:interval-trigger interval="1000"/>
        </si:poller>

    </beans>

    the main application from where I am sending the messages are shown below....

    Code:
    package com.sample.agg;
    
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    import org.springframework.integration.Message;
    import org.springframework.integration.support.MessageBuilder;
    
    
    /**
     * Demo driver: loads config.xml from the classpath, looks up the "gateway" bean
     * and sends six String messages through it, sleeping one second before each send.
     */
    public class TestApp {

        public static void main(String[] args) throws InterruptedException {
            //ApplicationContext context = new ClassPathXmlApplicationContext("resources/config.xml");
            ApplicationContext appContext = new ClassPathXmlApplicationContext("config.xml");
            Gateway gateway = (Gateway) appContext.getBean("gateway");

            int index = 0;
            while (index < 6) {
                String payload = "My Sample Message:" + index;
                Message<String> message = MessageBuilder.withPayload(payload).build();
                // Pause before every send, including the first one.
                Thread.sleep(1000);
                gateway.send(message);
                index++;
            }
        }

    }

    the correlation bean is shown below...

    Code:
    package com.sample.agg;
    
    import org.springframework.integration.Message;
    
    /**
     * Correlation strategy that puts every message under the same constant key,
     * i.e. messages are not grouped by any correlation id.
     */
    public class ExampleCorrelationBean {

        /** The single correlation key returned for every message. */
        private static final String SINGLE_GROUP_KEY = "NO GROUPING";

        /**
         * @param request the incoming message (not inspected)
         * @return always the same constant key
         */
        public String correlationStrategy(Message<String> request) {
            return SINGLE_GROUP_KEY;
        }
    }
    the release strategy bean is shown below...

    Code:
    package com.sample.agg;
    
    import java.util.List;
    
    import org.springframework.integration.Message;
    
    /**
     * Release strategy for the aggregator: a group is released once it holds
     * at least {@link #RELEASE_THRESHOLD} messages.
     */
    public class ExampleReleaseStrategy {

        /** Number of messages at which a group is released. */
        private static final int RELEASE_THRESHOLD = 10;

        /**
         * Releases when message count reaches 10.
         *
         * @param requests the messages currently handed to the strategy for one group
         * @return true when the group should be released, false otherwise
         */
        public boolean releaseStrategy(List<Message<String>> requests) {
            // Use >= rather than ==: if the list ever grows past the threshold
            // (for example because an earlier release was missed), an exact
            // equality check would never be true again and the group would
            // never be released by this strategy.
            if (requests.size() >= RELEASE_THRESHOLD) {
                System.out.println("Message count reached 10, so released for aggregation");
                return true;
            }
            return false;
        }
    }
    I am also sending the zip of the application , please advise me why it is not releasing those 6 messages ..!!
    Attached Files Attached Files

  2. #2
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,137

    Default

    The reaper timeout is commented out; the default timeout for the reaper is infinity so it will never reap if you don't set a timeout.

    Before posting questions like this here you should really do some more work on your own. Spring Integration provides copious amounts of logging to help you understand what is going on in your application; just set the org.springframework.integration logger level to DEBUG. For example, in the reaper...

    Code:
    /**
     * Expires every message group older than the {@link #setTimeout(long) timeout}.
     * Does nothing when the timeout is negative. Intended to be invoked
     * periodically by a scheduled task.
     */
    public void run() {
    	// A negative timeout disables reaping entirely.
    	if (timeout < 0) {
    		return;
    	}
    	if (logger.isDebugEnabled()) {
    		String description = "Expiring all messages older than timeout=" + timeout
    				+ " from message group store: " + messageGroupStore;
    		logger.debug(description);
    	}
    	messageGroupStore.expireMessageGroups(timeout);
    }
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  3. #3
    Join Date
    Sep 2011
    Posts
    167

    Default

    Hi Gary,
    Thanks for the explanation. There is one thing I want to confirm about what you said:
    The reaper timeout is commented out; the default timeout for the reaper is infinity so it will never reap if you don't set a timeout.
    So if I uncomment that reaper timeout, will it work?
    Last edited by SARAL SAXENA; Nov 13th, 2011 at 11:01 AM.

  4. #4
    Join Date
    Oct 2011
    Location
    Mumbai, India
    Posts
    213

    Default

    Did you try? Is it still not working after uncommenting it?

  5. #5
    Join Date
    Sep 2011
    Posts
    167

    Default

    Quote Originally Posted by SARAL SAXENA View Post
    Hi Gary,
    Thanks for the explnation one thing I want to know ...that you have guided....

    so if I uncomment that reaper timeout will it work..!!!
    Hi Gary,

    Thanks a lot, the code works perfectly now. Your guidance was spot on: the problem was that commented-out timeout. Thanks again. The only remaining issue is this: so far I was pushing only 6 messages, but when I send, say, 1900 messages my program gets stuck. Do we need some sort of load balancing, since this time I am sending messages in bulk? In the configuration XML I have increased the capacity of the queue channels, but my program still gets stuck. Please advise me on this.


    package com.sample.agg;

    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlAp plicationContext;
    import org.springframework.integration.Message;
    import org.springframework.integration.support.MessageBui lder;


    public class TestApp {

    public static void main(String[] args) throws InterruptedException{
    //ApplicationContext context = new ClassPathXmlApplicationContext("resources/config.xml");
    ApplicationContext context = new ClassPathXmlApplicationContext("config.xml");

    Gateway gate=(Gateway) context.getBean("gateway");
    for(int i=1;i<1900;i++){
    Message<String> msg=MessageBuilder.withPayload("My Sample Message:"+i).build();
    Thread.sleep(1000);
    gate.send(msg);
    }

    }

    }
    Below is the structure of the configuration xml file is..
    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:si="http://www.springframework.org/schema/integration"
           xmlns:stream="http://www.springframework.org/schema/integration/stream"
           xmlns:task="http://www.springframework.org/schema/task"
           xsi:schemaLocation="
               http://www.springframework.org/schema/beans
               http://www.springframework.org/schema/beans/spring-beans.xsd
               http://www.springframework.org/schema/task
               http://www.springframework.org/schema/task/spring-task-3.0.xsd
               http://www.springframework.org/schema/integration
               http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
               http://www.springframework.org/schema/integration/stream
               http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">

        <!-- ActiveMQ connection factories. No other element in this file references them. -->
        <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://localhost:61616"/>
        </bean>

        <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
            <property name="maxConnections" value="8"/>
            <property name="maximumActive" value="500"/>
            <property name="connectionFactory" ref="connectionFactory"/>
        </bean>

        <!-- Entry point: calls on com.sample.agg.Gateway are sent to the aggregator's input channel. -->
        <si:gateway id="gateway"
                    service-interface="com.sample.agg.Gateway"
                    default-request-channel="aggregator-input-channel"/>

        <si:channel id="aggregator-input-channel">
            <si:queue capacity="2050"/>
        </si:channel>

        <!-- Aggregator wired to custom correlation / release / aggregation beans and an explicit message store. -->
        <si:aggregator id="exampleAggregator"
                       input-channel="aggregator-input-channel"
                       output-channel="aggregator-output-channel"
                       ref="sampleAggregator"
                       method="aggregateMessagaes"
                       send-partial-result-on-expiry="true"
                       correlation-strategy="exampleCorrelationBean"
                       correlation-strategy-method="correlationStrategy"
                       release-strategy="exampleReleaseStrategy"
                       release-strategy-method="releaseStrategy"
                       message-store="exampleMessageStore"/>

        <bean id="sampleAggregator" class="com.sample.agg.ExampleAggregator"/>
        <bean id="exampleCorrelationBean" class="com.sample.agg.ExampleCorrelationBean"/>
        <bean id="exampleReleaseStrategy" class="com.sample.agg.ExampleReleaseStrategy"/>

        <bean id="exampleMessageStore" class="org.springframework.integration.store.SimpleMessageStore"/>

        <!-- Reaper over the aggregator's message store, with timeout set to 5000;
             its run() method is scheduled below. -->
        <bean id="exampleReaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
            <property name="messageGroupStore" ref="exampleMessageStore"/>
            <property name="timeout" value="5000"/>
        </bean>

        <task:scheduler id="exampleScheduler"/>

        <!-- Invokes exampleReaper.run() at a fixed rate of 1000. -->
        <task:scheduled-tasks scheduler="exampleScheduler">
            <task:scheduled ref="exampleReaper" method="run" fixed-rate="1000"/>
        </task:scheduled-tasks>

        <si:channel id="aggregator-output-channel">
            <si:queue capacity="2050"/>
        </si:channel>

        <!-- Consumer of the aggregated output: printerService.printString. -->
        <si:service-activator input-channel="aggregator-output-channel"
                              ref="printerService"
                              method="printString"/>

        <bean id="printerService" class="com.sample.agg.PrinterService"/>

        <!-- Default poller: at most one message per poll, interval trigger of 1000. -->
        <si:poller id="defaultPoller" default="true" max-messages-per-poll="1">
            <si:interval-trigger interval="1000"/>
        </si:poller>

    </beans>
    I am entering messages in bulk although in configuration xml I have increased the capacity of queue channel , but still my program got stuck, please advise me on this,,,!
    Last edited by SARAL SAXENA; Nov 14th, 2011 at 07:48 AM.

  6. #6
    Join Date
    Sep 2011
    Posts
    167

    Default

    Quote Originally Posted by SARAL SAXENA View Post
    Hi Gary,

    Thanks a lot the code works perfectly, you have guided perfectly it was because of that comment prblm thanks a lot again, now the only one issues was that rite now I was pushing only 6 messages but when I enter say 1900 messages my program got stuck, do we have to any sort of load balancing since this time I am entering messages in bulk although in configuration xml I have increased the capacity of queue channel , but still my program got stuck, please advise me on this,,,!!




    Below is the structure of the configuration xml file is..
    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:si="http://www.springframework.org/schema/integration"
           xmlns:stream="http://www.springframework.org/schema/integration/stream"
           xmlns:task="http://www.springframework.org/schema/task"
           xsi:schemaLocation="
               http://www.springframework.org/schema/beans
               http://www.springframework.org/schema/beans/spring-beans.xsd
               http://www.springframework.org/schema/task
               http://www.springframework.org/schema/task/spring-task-3.0.xsd
               http://www.springframework.org/schema/integration
               http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
               http://www.springframework.org/schema/integration/stream
               http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">

        <!-- ActiveMQ connection factories. No other element in this file references them. -->
        <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://localhost:61616"/>
        </bean>

        <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
            <property name="maxConnections" value="8"/>
            <property name="maximumActive" value="500"/>
            <property name="connectionFactory" ref="connectionFactory"/>
        </bean>

        <!-- Entry point: calls on com.sample.agg.Gateway are sent to the aggregator's input channel. -->
        <si:gateway id="gateway"
                    service-interface="com.sample.agg.Gateway"
                    default-request-channel="aggregator-input-channel"/>

        <si:channel id="aggregator-input-channel">
            <si:queue capacity="2050"/>
        </si:channel>

        <!-- Aggregator wired to custom correlation / release / aggregation beans and an explicit message store. -->
        <si:aggregator id="exampleAggregator"
                       input-channel="aggregator-input-channel"
                       output-channel="aggregator-output-channel"
                       ref="sampleAggregator"
                       method="aggregateMessagaes"
                       send-partial-result-on-expiry="true"
                       correlation-strategy="exampleCorrelationBean"
                       correlation-strategy-method="correlationStrategy"
                       release-strategy="exampleReleaseStrategy"
                       release-strategy-method="releaseStrategy"
                       message-store="exampleMessageStore"/>

        <bean id="sampleAggregator" class="com.sample.agg.ExampleAggregator"/>
        <bean id="exampleCorrelationBean" class="com.sample.agg.ExampleCorrelationBean"/>
        <bean id="exampleReleaseStrategy" class="com.sample.agg.ExampleReleaseStrategy"/>

        <bean id="exampleMessageStore" class="org.springframework.integration.store.SimpleMessageStore"/>

        <!-- Reaper over the aggregator's message store, with timeout set to 5000;
             its run() method is scheduled below. -->
        <bean id="exampleReaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
            <property name="messageGroupStore" ref="exampleMessageStore"/>
            <property name="timeout" value="5000"/>
        </bean>

        <task:scheduler id="exampleScheduler"/>

        <!-- Invokes exampleReaper.run() at a fixed rate of 1000. -->
        <task:scheduled-tasks scheduler="exampleScheduler">
            <task:scheduled ref="exampleReaper" method="run" fixed-rate="1000"/>
        </task:scheduled-tasks>

        <si:channel id="aggregator-output-channel">
            <si:queue capacity="2050"/>
        </si:channel>

        <!-- Consumer of the aggregated output: printerService.printString. -->
        <si:service-activator input-channel="aggregator-output-channel"
                              ref="printerService"
                              method="printString"/>

        <bean id="printerService" class="com.sample.agg.PrinterService"/>

        <!-- Default poller: at most one message per poll, interval trigger of 1000. -->
        <si:poller id="defaultPoller" default="true" max-messages-per-poll="1">
            <si:interval-trigger interval="1000"/>
        </si:poller>

    </beans>
    I am entering messages in bulk although in configuration xml I have increased the capacity of queue channel , but still my program got stuck, please advise me on this,,,!
    Please advise me on this ...!! Thanks in advance..!!

  7. #7
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,137

    Default

    As I said before, you need to turn on DEBUG logging for org.springframework.integration (e.g. in log4j.xml) and figure out what's happening to your messages.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  8. #8
    Join Date
    Sep 2011
    Posts
    167

    Default

    Quote Originally Posted by Gary Russell View Post
    As I said before, you need to turn on DEBUG logging for org.springframework.integration (e.g. in log4j.xml) and figure out what's happening to your messages.
    Hi Gary,

    I have enabled the logs, and in the logs I can see that it goes into the release strategy but gets stuck there. This time the number of messages is 1900; earlier it was 6, and the process flowed smoothly. Please guide me on how to overcome this problem.

  9. #9
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,137

    Default

    This forum is not intended to be a tutorial on how to debug programs but, why don't you add some debugging logs to your release strategy?

    (Hint: requests.size()).

    If you turn on TRACE level, you'll see even more detail from the Aggregator...

    Code:
    if (logger.isTraceEnabled()) {
    	logger.trace("Adding message to group [ " + messageGroup + "]");
    }
    Together with extra debug logging in your release strategy, you should be able to see what's happening.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  10. #10
    Join Date
    Sep 2011
    Posts
    167

    Default

    Hi Gary,

    When I send 200 messages, the messages are enqueued into the queue properly. After that a file has to be created, but in the case of 200 messages the file is not created. For example, say the release strategy is such that a message group is released when it reaches 10 messages. If I then send 16 messages, the first 10 should be released successfully and one file containing those 10 messages is created in a directory; for the remaining six, which are discarded, one separate file should also be created. With 16 messages this scenario works perfectly (I have tested it on my end), but when I increase the number of messages to 200 my program gets stuck. Please advise me on this. I am also attaching the sample code. Please have a look and advise me why the file creation process gets stuck in the case of 200 messages.

    the structure of the main files where if I increase the messages to 20

    Code:
    package com.walgreens.ods.producer;
    
    
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    import org.springframework.integration.MessageChannel;
    import org.springframework.integration.support.MessageBuilder;
    
    /**
     * Demo producer: loads spring-config.xml, starts the context and sends 19
     * RetailODS XML messages to the "input" channel, all carrying the same
     * correlation id.
     */
    public class BatchRequestFileCreator {

        /** Correlation id set on every message that is sent. */
        private static final String CORRELATION_ID = "SERVICEID3_2011-09-24 16:23:28.593";

        public static void main(String[] args) {
            ClassPathXmlApplicationContext appContext =
                    new ClassPathXmlApplicationContext("spring-config.xml");
            appContext.start();

            MessageChannel inputChannel = (MessageChannel) appContext.getBean("input");

            int index = 0;
            while (index < 19) {
                inputChannel.send(MessageBuilder
                        .withPayload(buildPayload(index))
                        .setCorrelationId(CORRELATION_ID)
                        .build());
                index++;
            }
        }

        /**
         * Builds the RetailODS XML document for one message; the store number is
         * the decimal form of {@code i} written four times in a row.
         */
        private static String buildPayload(int i) {
            String storeNumber = "" + i + i + i + i;
            return "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
                    + "<RetailODS xmlns=\"http://www.walgreens.com/schema/RetailODS\""
                    + " xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\""
                    + " xsi:schemaLocation=\"http://www.walgreens.com/schema/RetailODS RetailODS.xsd\">"
                    + "<StoreData>"
                    + "<Store><Number>" + storeNumber + "</Number></Store>"
                    + "<Pricing><WIC>000000</WIC><UPC>00000001411009</UPC>"
                    + "<RegularPrice><Amount>000000</Amount><Quantity>01</Quantity><Type>1</Type><UnitAmount>000299</UnitAmount><MixMatchCode>0000</MixMatchCode></RegularPrice>"
                    + "<SalePrice><Amount>000000</Amount><Quantity>00</Quantity><Type>0</Type><UnitAmount>000000</UnitAmount><MixMatchCode>0000</MixMatchCode></SalePrice>"
                    + "<Coupon><type>00</type><family>0000</family></Coupon>"
                    + "<Coupon><type>00</type><family>0000</family></Coupon>"
                    + "<Coupon><type>00</type><family>0000</family></Coupon>"
                    + "<Coupon><type>00</type><family>0000</family></Coupon>"
                    + "<Coupon><type>00</type><family>0000</family></Coupon>"
                    // The space between </type> and <family> below is in the original payload.
                    + "<Coupon><type>00</type> <family>0000</family></Coupon>"
                    + "<Coupon><type>00</type><family>0000</family></Coupon>"
                    + "<Coupon><type>00</type><family>0000</family></Coupon>"
                    + "<Reference>000</Reference></Pricing>"
                    + "<Inventory><PLN>224405</PLN><WIC>000000</WIC><UPC>00000001411009</UPC><OnHands>00002</OnHands><AvailableOnHands>00002</AvailableOnHands><OnOrder></OnOrder><Timestamp>08-03-2011 07:00:00</Timestamp></Inventory>"
                    + "</StoreData>"
                    + "</RetailODS>";
        }

    }
    The structure of the configuration file is ...

    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:si="http://www.springframework.org/schema/integration"
           xmlns:stream="http://www.springframework.org/schema/integration/stream"
           xmlns:jms="http://www.springframework.org/schema/integration/jms"
           xmlns:task="http://www.springframework.org/schema/task"
           xmlns:file="http://www.springframework.org/schema/integration/file"
           xsi:schemaLocation="
               http://www.springframework.org/schema/beans
               http://www.springframework.org/schema/beans/spring-beans.xsd
               http://www.springframework.org/schema/task
               http://www.springframework.org/schema/task/spring-task-3.0.xsd
               http://www.springframework.org/schema/integration
               http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
               http://www.springframework.org/schema/integration/jms
               http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.0.xsd
               http://www.springframework.org/schema/integration/stream
               http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
               http://www.springframework.org/schema/integration/file
               http://www.springframework.org/schema/integration/file/spring-integration-file-2.0.xsd">

        <!-- Resolves the ${...} placeholders used below from config.properties. -->
        <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
            <property name="location" value="config.properties"/>
        </bean>

        <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://localhost:61616"/>
        </bean>

        <!-- Channel the producer sends to; its messages are written to the JMS destination ${queueName}. -->
        <si:channel id="input"/>

        <jms:outbound-channel-adapter id="channel-to-mq"
                                      channel="input"
                                      destination-name="${queueName}"/>

        <!-- Reads the same JMS destination back and feeds the aggregator's input channel. -->
        <jms:message-driven-channel-adapter id="mq-message-listner"
                                            channel="aggregator-input-channel"
                                            destination-name="${queueName}"
                                            concurrent-consumers="${concurrent-consumers}"/>

        <!-- <si:gateway id="gateway" service-interface="com.sample.agg.Gateway"
            default-request-channel="aggregator-input-channel">
        </si:gateway> -->

        <si:channel id="aggregator-input-channel">
            <si:queue capacity="${queueCapacity}"/>
        </si:channel>

        <!-- Aggregator with a discard channel; partial results are not sent on expiry. -->
        <si:aggregator id="aggregator"
                       input-channel="aggregator-input-channel"
                       output-channel="aggregator-output-channel"
                       discard-channel="throwAwayChannel"
                       ref="sampleAggregator"
                       method="aggregateMessagaes"
                       correlation-strategy="correlationBean"
                       correlation-strategy-method="correlationStrategy"
                       release-strategy="releaseStrategyBean"
                       release-strategy-method="releaseStrategy"
                       message-store="messageStore"
                       order="1"
                       send-partial-result-on-expiry="false"
                       send-timeout="1000"/>

        <!-- This bean combines the list of messages. -->
        <bean id="sampleAggregator" class="com.walgreens.ods.producer.Aggregator"/>

        <bean id="correlationBean" class="com.walgreens.ods.producer.CorrelationBean"/>

        <!-- When this bean's strategy method returns true, the messages are released. -->
        <bean id="releaseStrategyBean" class="com.walgreens.ods.producer.ReleaseStrategyBean">
            <property name="recordLength" value="${recordLength}"/>
        </bean>

        <si:channel id="throwAwayChannel">
            <si:queue capacity="${queueCapacity}"/>
        </si:channel>

        <!-- Aggregated output is written as files into ${aggregatorOutputDirectoryPath}. -->
        <file:outbound-channel-adapter channel="aggregator-output-channel"
                                       directory="${aggregatorOutputDirectoryPath}"
                                       filename-generator-expression="${outputFileNameExpression}"
                                       temporary-file-suffix="_swp"/>

        <!-- Discarded messages are written to the same directory with a 'discard-' file name prefix. -->
        <file:outbound-channel-adapter channel="throwAwayChannel"
                                       directory="${aggregatorOutputDirectoryPath}"
                                       filename-generator-expression="'discard-'+headers.getTimestamp()+'.xml'"
                                       temporary-file-suffix="_swp"/>

        <si:channel id="aggregator-output-channel">
            <si:queue capacity="${queueCapacity}"/>
        </si:channel>

        <task:scheduler id="scheduler"/>

        <!-- Invokes reaper.run() at the fixed rate given by ${reaperSchedulerFixedRate}. -->
        <task:scheduled-tasks scheduler="scheduler">
            <task:scheduled ref="reaper" method="run" fixed-rate="${reaperSchedulerFixedRate}"/>
        </task:scheduled-tasks>

        <bean id="reaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
            <property name="messageGroupStore" ref="messageStore"/>
            <property name="timeout" value="${reaperTimeOut}"/>
        </bean>

        <bean id="messageStore" class="org.springframework.integration.store.SimpleMessageStore"/>

        <!--
        <si:service-activator input-channel="aggregator-output-channel" ref="printerService" method="printString"  ></si:service-activator>

        <bean id="printerService" class="com.sample.agg.PrinterService"></bean>
        -->

        <!-- Default poller: at most one message per poll, interval from ${pollerInterval}. -->
        <si:poller id="defaultPoller" default="true" max-messages-per-poll="1">
            <si:interval-trigger interval="${pollerInterval}"/>
        </si:poller>

        <!-- <si:inbound-channel-adapter id="test" ref="exampleReleaseStrategy" method="releaseStrategy" >
        <si:poller fixed-rate="5000"/>
        </si:inbound-channel-adapter> -->

    </beans>
    Attached Files Attached Files
    Last edited by SARAL SAXENA; Nov 14th, 2011 at 11:50 AM.

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •