Page 1 of 2 12 LastLast
Results 1 to 10 of 19

Thread: SI Aggregator - Timeout release strategy

  1. #1
    Join Date
    Jul 2009
    Location
    Hyderabad India
    Posts
    56

    Default SI Aggregator - Timeout release strategy

    Hi,

    I have configured a producer and a consumer. Producer is sending messages every 3rd second. There is a Aggregator configured that has release-strategy="timeout" and timeout set as 7 seconds. Now I see the messages are aggregated and sent to output channel at 9th seconds (as soon as 3rd message is received), though it should timeout at 7th second and should aggregate only 2 messages.

    I am using org.springframework.integration.aggregator.Timeout CountSequenceSizeReleaseStrategy and I have seen the code, canRelease (..) will be called on every message received in the input-channel and will return true only after receiving 3rd message

    Is there a way to achieve timeout as defined i.e. 7 second and only aggregate the messages in the queue till that time?

    Context.xml

    Code:
    <task:scheduled-tasks scheduler="taskScheduler">
    		<task:scheduled ref="producer" method="run" fixed-rate="3000" />
    	</task:scheduled-tasks>
    
    	<bean id="timeout"
    		class="org.springframework.integration.aggregator.TimeoutCountSequenceSizeReleaseStrategy">
    		<constructor-arg name="threshold" value="1000" />
    		<constructor-arg name="timeout" value="7000" />
    	</bean>
    
    	<bean id="correlationStrategy" class="com.test.MyCorrelationStrategy" />
    	<int:aggregator input-channel="channel_one"
    		output-channel="channel_two" release-strategy="timeout"
    		correlation-strategy="correlationStrategy" method="aggregate">
    		<bean class="com.test.MyAggreegator" />
    	</int:aggregator>
    Regards,
    Pranav

  2. #2
    Join Date
    Sep 2011
    Posts
    167

    Default

    Quote Originally Posted by Pranav Kumar Varshney View Post
    Hi,

    I have configured a producer and a consumer. Producer is sending messages every 3rd second. There is a Aggregator configured that has release-strategy="timeout" and timeout set as 7 seconds. Now I see the messages are aggregated and sent to output channel at 9th seconds (as soon as 3rd message is received), though it should timeout at 7th second and should aggregate only 2 messages.

    I am using org.springframework.integration.aggregator.Timeout CountSequenceSizeReleaseStrategy and I have seen the code, canRelease (..) will be called on every message received in the input-channel and will return true only after receiving 3rd message

    Is there a way to achieve timeout as defined i.e. 7 second and only aggregate the messages in the queue till that time?

    Context.xml

    Code:
    <task:scheduled-tasks scheduler="taskScheduler">
    		<task:scheduled ref="producer" method="run" fixed-rate="3000" />
    	</task:scheduled-tasks>
    
    	<bean id="timeout"
    		class="org.springframework.integration.aggregator.TimeoutCountSequenceSizeReleaseStrategy">
    		<constructor-arg name="threshold" value="1000" />
    		<constructor-arg name="timeout" value="7000" />
    	</bean>
    
    	<bean id="correlationStrategy" class="com.test.MyCorrelationStrategy" />
    	<int:aggregator input-channel="channel_one"
    		output-channel="channel_two" release-strategy="timeout"
    		correlation-strategy="correlationStrategy" method="aggregate">
    		<bean class="com.test.MyAggreegator" />
    	</int:aggregator>
    Regards,
    Pranav
    Hi Pranav,

    Can you please send the whole set up , as it will help me to see more details and will provide you with valuable feedback of your query..!!

  3. #3
    Join Date
    Oct 2011
    Location
    Mumbai, India
    Posts
    213

    Default

    I have configured a producer and a consumer. Producer is sending messages every 3rd second. There is a Aggregator configured that has release-strategy="timeout" and timeout set as 7 seconds. Now I see the messages are aggregated and sent to output channel at 9th seconds (as soon as 3rd message is received), though it should timeout at 7th second and should aggregate only 2 messages.

    I am using org.springframework.integration.aggregator.Timeout CountSequenceSizeReleaseStrategy and I have seen the code, canRelease (..) will be called on every message received in the input-channel and will return true only after receiving 3rd message

    Is there a way to achieve timeout as defined i.e. 7 second and only aggregate the messages in the queue till that time?
    What does your correlation strategy do? Can you show us how your are building the message that is being sent out to the input channel? Can answer how the release is happening after three messages only after looking at it?

    Release strategy in invoked only when a message arrives. It will happen only after a message arrives and 7000 ms (as set by you) have elapsed since the first message in the group has arrived or the group is complete or the threshold for max number of messages in the group is reached (1000 in your case).

    In case we are using HeaderAttributeCorrelationStrategy, the release will happen after the 4th message in the group has arrived

  4. #4
    Join Date
    Sep 2011
    Posts
    167

    Default

    Quote Originally Posted by Amol Nayak View Post
    What does your correlation strategy do? Can you show us how your are building the message that is being sent out to the input channel? Can answer how the release is happening after three messages only after looking at it?

    Release strategy in invoked only when a message arrives. It will happen only after a message arrives and 7000 ms (as set by you) have elapsed since the first message in the group has arrived or the group is complete or the threshold for max number of messages in the group is reached (1000 in your case).

    In case we are using HeaderAttributeCorrelationStrategy, the release will happen after the 4th message in the group has arrived
    Release strategy in invoked only when a message arrives. It will happen only after a message arrives and 7000 ms (as set by you) have elapsed since the first message in the group has arrived or the group is complete or the threshold for max number of messages in the group is reached (1000 in your case).
    Hi Anmol,

    could you please explain about the threshold for max number of messages in the group...does it mean that when the value of messages count reaches to 1000 messages then aggregator would release the messages..??

  5. #5
    Join Date
    Oct 2011
    Location
    Mumbai, India
    Posts
    213

    Default

    Yes, if the message group size >= threshold


    p.s. My name is Amol and not Anmol

  6. #6
    Join Date
    Sep 2011
    Posts
    167

    Default

    Quote Originally Posted by Amol Nayak View Post
    Yes, if the message group size >= threshold


    p.s. My name is Amol and not Anmol
    if the message group size >= threshold
    Hi Amol,

    Please explain this in detail,so that my understanding on thia will be 100% clear...!!please guide in scenearios where we want that message group inside message store should expire after a fixed interval of time period , can we go also for org.springframework.integration.aggregator.Timeout CountSequenceSizeReleaseStrategy as we can avoid message reaper , as rg.springframework.integration.store.MessageGroupS toreReaper we use to expire the message group inside the message store ..!!

    As I have the scenerio in which inside the release starergy i aggregate all the messages into the group of 10 and when the count reaches to 10 I release all those messages and side by side I also need to track the timeout period of those messages for that I have configured the message reaper to expire the message group and if the messages below 10 let assume only seven arrives with in that time period then also those messages to be released ....we cant wait for more time period ,, but for this AI have to add separate reaper,,,, does this org.springframework.integration.aggregator.Timeout CountSequenceSizeReleaseStrategy will also work same ..!!
    Last edited by SARAL SAXENA; Nov 22nd, 2011 at 11:06 AM.

  7. #7
    Join Date
    Oct 2011
    Location
    Mumbai, India
    Posts
    213

    Default

    No, TimeoutCountSequenceSizeReleaseStrategy cannot be used instead of MessageGroupStoreReaper . As i stated earlier, release strategy for a message group will kick in only when a message for the groups arrives. Where as a reaper periodically runs as scheduled and expire the message groups if required as configured.

    For example, you want a group to be timed out after 30 secs after the first message in it arrives.

    In case you have a TimeoutCountSequenceSizeReleaseStrategy configured for a threshold message count to 10 and timeout to 30 secs and you receive 7 of 10 messages within 30 secs, then the group will never expire in absence of reaper if no more messages for the group arrives.

  8. #8
    Join Date
    Jul 2009
    Location
    Hyderabad India
    Posts
    56

    Default

    I have configured org.springframework.integration.store.MessageGroup StoreReaper and its all fixed.

    Context.xml

    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
    	xmlns:int-feed="http://www.springframework.org/schema/integration/feed"
    	xmlns:task="http://www.springframework.org/schema/task"
    	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    			http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
    			http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd
    			http://www.springframework.org/schema/integration/feed http://www.springframework.org/schema/integration/feed/spring-integration-feed-2.0.xsd">
    
    	<int:poller id="pooler" default="true" fixed-delay="1000"></int:poller>
    	<int:channel id="channel_one">
    		<int:queue capacity="10" />
    	</int:channel>
    
    	<int:channel id="channel_two">
    		<int:queue capacity="10" />
    	</int:channel>
    
    	<bean id="producer" class="com.test.MessageProducer">
    		<property name="channel" ref="channel_one" />
    	</bean>
    	<task:scheduled-tasks scheduler="taskScheduler">
    		<task:scheduled ref="producer" method="run" fixed-rate="3000" />
    	</task:scheduled-tasks>
    
    	<bean id="timeout"
    		class="org.springframework.integration.aggregator.TimeoutCountSequenceSizeReleaseStrategy">
    		<constructor-arg name="threshold" value="1000" />
    		<constructor-arg name="timeout" value="7000" />
    	</bean>
    
    	<bean id="correlationStrategy" class="com.test.MyCorrelationStrategy" />
    	<int:aggregator input-channel="channel_one"
    		output-channel="channel_two" release-strategy="timeout"
    		send-partial-result-on-expiry="true" correlation-strategy="correlationStrategy"
    		message-store="myMessageStore" method="aggregate">
    		<bean class="com.test.MyAggreegator" />
    	</int:aggregator>
    
    	<bean id="myMessageStore" class="org.springframework.integration.store.SimpleMessageStore" />
    	
    	<bean id="myReaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
    		<property name="messageGroupStore" ref="myMessageStore" />
    		<property name="timeout" value="5000" />
    	</bean>
    	
    	<task:scheduler id="myScheduler" />
    	
    	<task:scheduled-tasks scheduler="myScheduler">
    		<task:scheduled ref="myReaper" method="run" fixed-rate="1000"/>
    	</task:scheduled-tasks>
    
    	<bean id="consumer" class="com.test.MessageConsumer" />
    	<int:service-activator input-channel="channel_two"
    		ref="consumer" />
    </beans>
    Thanks Amol and Saral

  9. #9
    Join Date
    Oct 2011
    Location
    Mumbai, India
    Posts
    213

    Default

    What was your use case? As release strategy and message group reaper are totally different and are intended for different situations.

  10. #10
    Join Date
    Sep 2011
    Posts
    167

    Default

    Hi Pranav ,
    Thanks for the explnation it is working now,I done the corelational-startergy expression attribute via SPEL itself and it's working..!!Thanks a lot..!!
    Last edited by SARAL SAXENA; Nov 29th, 2011 at 12:57 AM.

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •