
Thread: Aggregator Issue

  1. #1
    Join Date
    Aug 2009
    Location
    Lima, Perú
    Posts
    38

    Aggregator Issue

    Hello,

    I'm seeing strange behavior from the aggregator in SI 2.1.4 compared to SI 2.0.3. I reduced it to a test case that just accumulates two messages before releasing them:

    Code:
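    package com.borrar;

    import java.util.List;

    import org.springframework.context.support.ClassPathXmlApplicationContext;
    import org.springframework.integration.MessageChannel;
    import org.springframework.integration.core.MessagingTemplate;
    import org.springframework.integration.support.MessageBuilder;

    public class AgTest {
    
    	public static void main(String[] args) {
    		ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("beans.xml");
    		MessageChannel ch = (MessageChannel) ctx.getBean("channel1");
    		MessagingTemplate template = new MessagingTemplate();
    		// Send five pairs of messages; correlate() puts all of them in the same group.
    		for (int z = 0; z < 5; z++) {
    			template.send(ch, MessageBuilder.withPayload(1).build());
    			template.send(ch, MessageBuilder.withPayload(2).build());
    		}
    		ctx.close();
    	}
    	
    	// Aggregation method: always emits a constant result.
    	public Integer aggregate(List<Integer> numbers) {
    		return 666;
    	}
    	
    	// Correlation strategy: every message gets correlation key 1.
    	public Integer correlate(Integer number) {
    		return 1;
    	}
    	
    	// Release strategy: release the group once it holds two messages.
    	public boolean canRelease(List<Integer> numbers) {
    		return numbers.size() == 2;
    	}
    }

    With the configuration: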

    Code:
    <int:channel id="channel1"/>

    <int:aggregator id="ag"
    	input-channel="channel1" ref="agm" method="aggregate"
    	output-channel="channel2"
    	send-partial-result-on-expiry="true"
    	correlation-strategy="agm" correlation-strategy-method="correlate"
    	release-strategy="agm" release-strategy-method="canRelease" />

    <bean id="agm" class="com.borrar.AgTest"/>

    <int:channel id="channel2"/>

    <stream:stdout-channel-adapter id="stout" channel="channel2" append-newline="true"/>

    With SI 2.1.4 I get only one "666" on stdout; with SI 2.0.3 I get five, as I expected. Is this a change in behavior?

    regards,

    Diego

  2. #2
    Join Date
    Aug 2009
    Location
    Lima, Perú
    Posts
    38


    Hello, any ideas? What could be wrong?

    BTW, the documentation about "release" is not clear enough (at least to me). It starts with the group being "released for aggregation", but the rest of the paragraph says the group could also be "complete", implying that in the first case the group is not removed from the message store. Why two cases? Is it for "partial results on expiry"? If so, do partial results sent that way remain in the message store?

    It also introduces a "marked/unmarked messages" concept that overlaps with the distinction between "group complete (and ready to be released)" and "group incomplete (and not released)":

    From section 5.4.3 of the 2.1.4.RELEASE documentation:
    "When the group is released for aggregation, all its unmarked messages are processed and then marked so they will not be processed again. If the group is also complete (i.e. if all messages from a sequence have arrived or if there is no sequence defined), then the group is removed from the message store. Partial sequences can be released, in which case the next time the ReleaseStrategy is called it will be presented with a group containing marked messages (already processed) and unmarked messages (potentially a new partial sequence)."

  3. #3
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,017


    This is a change in behavior between 2.0 and 2.1. It is discussed in the migration guide.

    https://github.com/SpringSource/spri...igration-Guide

    By default, once a group (messages with the same correlation) has been marked complete, all subsequent messages for that group are sent to the discard channel (if defined).

    To revert to the previous behavior, you have to set expire-groups-upon-completion="true" on the aggregator...

    Code:
    <int:aggregator id="ag" 
    	input-channel="channel1" ref="agm" method="aggregate" 
    	output-channel="channel2"
    	send-partial-result-on-expiry="true"
    	correlation-strategy="agm" correlation-strategy-method="correlate"
    	release-strategy="agm" release-strategy-method="canRelease" 
    	expire-groups-upon-completion="true" />

    This removes the group and allows a new group to be created the next time a message with the expired correlation arrives.
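    The difference can be sketched in plain Java, with no Spring dependencies, by replaying the AgTest loop against a simplified model of the behavior just described. This is a hypothetical illustration, not Spring Integration's implementation: AggregatorModel, handle() and run() are invented names, and the model assumes that with expire-groups-upon-completion="false" the key of a completed group is remembered and later messages for it go to the discard channel (modeled here as a list).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the 2.1 aggregator semantics discussed in this thread.
// It is NOT Spring Integration's code; all names here are invented for illustration.
final class AggregatorModel {

    private final boolean expireGroupsUponCompletion;
    private final Map<Object, List<Integer>> openGroups = new HashMap<>();
    // Keys of completed groups that stay in the "store" when groups are not expired.
    private final Set<Object> completedKeys = new HashSet<>();
    final List<Integer> output = new ArrayList<>();    // stands in for channel2
    final List<Integer> discarded = new ArrayList<>(); // stands in for the discard channel

    AggregatorModel(boolean expireGroupsUponCompletion) {
        this.expireGroupsUponCompletion = expireGroupsUponCompletion;
    }

    // Mirrors AgTest.correlate(): every message shares correlation key 1.
    private Object correlate(Integer payload) { return 1; }

    // Mirrors AgTest.canRelease(): release once two messages are present.
    private boolean canRelease(List<Integer> group) { return group.size() == 2; }

    // Mirrors AgTest.aggregate().
    private Integer aggregate(List<Integer> group) { return 666; }

    void handle(Integer payload) {
        Object key = correlate(payload);
        if (completedKeys.contains(key)) {
            discarded.add(payload); // late message for an already completed group
            return;
        }
        List<Integer> group = openGroups.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(payload);
        if (canRelease(group)) {
            output.add(aggregate(group));
            openGroups.remove(key);
            if (!expireGroupsUponCompletion) {
                completedKeys.add(key); // remember the key so later messages are discarded
            }
        }
    }

    // Replays the AgTest loop: five rounds of payloads 1 and 2.
    static AggregatorModel run(boolean expireGroupsUponCompletion) {
        AggregatorModel model = new AggregatorModel(expireGroupsUponCompletion);
        for (int z = 0; z < 5; z++) {
            model.handle(1);
            model.handle(2);
        }
        return model;
    }

    public static void main(String[] args) {
        AggregatorModel defaults = run(false);
        AggregatorModel expiring = run(true);
        System.out.println("default: " + defaults.output.size() + " released, "
                + defaults.discarded.size() + " discarded");
        System.out.println("expire-groups-upon-completion=true: "
                + expiring.output.size() + " released");
    }
}
```

    With expiry off, the model releases one group and discards the remaining eight messages; with expiry on, each pair forms a new group and five results are released, matching the one versus five "666" lines reported in the first post.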

    The notion of marked/unmarked messages no longer exists, but it appears the documentation was not updated. Please open a 'Documentation' JIRA issue for that; thanks. https://jira.springsource.org/browse/INT
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  4. #4
    Join Date
    Aug 2009
    Location
    Lima, Perú
    Posts
    38


    Thank you Gary.

    https://jira.springsource.org/browse/INT-2832

    One more question: with the new behavior, does the aggregator have to keep every past correlation key in order to discard such subsequent messages? That could exhaust JVM memory, or am I misunderstanding something?

  5. #5
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,017


    No, you are correct; I was expecting that configuring a MessageGroupStoreReaper would remove these "empty" groups, but it does not.

    That is, you should be able to configure an MGSR to run every, say, 30 minutes and clean up these released groups, picking an expiry timeout long enough that you can be confident no new messages for the group will arrive in that time; the reaper should then remove the group.

    Code:
    <int:aggregator id="ag"
    	input-channel="channel1" ref="agm" method="aggregate"
    	output-channel="channel2"
    	send-partial-result-on-expiry="true" discard-channel="discard"
    	correlation-strategy="agm" correlation-strategy-method="correlate"
    	release-strategy="agm" release-strategy-method="canRelease"
    	expire-groups-upon-completion="false"
    	message-store="ms" />

    <bean id="ms" class="org.springframework.integration.store.SimpleMessageStore" />

    <bean id="reaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
    	<property name="messageGroupStore" ref="ms" />
    	<property name="timeout" value="10000" />
    </bean>

    <task:scheduled-tasks>
    	<task:scheduled ref="reaper" method="run" fixed-rate="15000" />
    </task:scheduled-tasks>

    However, I notice that the reaper only expires groups that actually contain messages.

    This is a bug. I will open another JIRA (unless you beat me to it).
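    What the reaper is meant to do can be modeled in plain Java as below. ReaperModel, Group and reap() are hypothetical names, and this is a simplified sketch rather than MessageGroupStoreReaper's real code; it takes the current time as a parameter so the result is deterministic. The includeEmptyGroups flag contrasts the behavior observed above (only groups that still hold messages are expired) with the intended one, using the 10000 ms timeout from the configuration.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical model of a timeout-based group reaper; NOT the real MessageGroupStoreReaper.
final class ReaperModel {

    // Minimal stand-in for a stored message group.
    static final class Group {
        final int messageCount;
        final long lastModified; // milliseconds

        Group(int messageCount, long lastModified) {
            this.messageCount = messageCount;
            this.lastModified = lastModified;
        }
    }

    final Map<Object, Group> store = new HashMap<>();

    // Removes groups not modified within 'timeout' ms of 'now' and returns how many were removed.
    // includeEmptyGroups=false reproduces the reported behavior (empty released groups survive);
    // includeEmptyGroups=true is the intended behavior.
    int reap(long now, long timeout, boolean includeEmptyGroups) {
        int removed = 0;
        Iterator<Map.Entry<Object, Group>> it = store.entrySet().iterator();
        while (it.hasNext()) {
            Group g = it.next().getValue();
            boolean expired = now - g.lastModified > timeout;
            if (expired && (includeEmptyGroups || g.messageCount > 0)) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    // 1000 released ("empty") groups plus one partial group, all last touched at time 0.
    static ReaperModel sample() {
        ReaperModel m = new ReaperModel();
        for (int key = 0; key < 1000; key++) {
            m.store.put(key, new Group(0, 0L));
        }
        m.store.put("partial", new Group(1, 0L));
        return m;
    }

    public static void main(String[] args) {
        ReaperModel observed = sample();
        int removedObserved = observed.reap(20000L, 10000L, false);
        ReaperModel intended = sample();
        int removedIntended = intended.reap(20000L, 10000L, true);
        System.out.println("observed: removed " + removedObserved + ", left " + observed.store.size());
        System.out.println("intended: removed " + removedIntended + ", left " + intended.store.size());
    }
}
```

    In this model, reaping with includeEmptyGroups set to false removes only the partial group and leaves the 1000 empty ones behind, which is the unbounded growth asked about earlier; with it set to true, all 1001 groups are removed.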
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  6. #6
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,017


    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  7. #7
    Join Date
    Aug 2009
    Location
    Lima, Perú
    Posts
    38


    Wow, that explains another of my issues. IMHO, expire-groups-upon-completion="true" should be the default. BTW, I don't like the aggregator's new role of remembering an imprecise number of "recent past messages" (it might be more useful with a new "remember()" strategy method). Anyway, with the new behavior an MGSR should be strongly recommended (or even mandatory) to prevent memory exhaustion.

    Thanks a lot for the support. I've been using the aggregator for some time and it has really helped in my work.

  8. #8
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,017


    Yes; we were talking about this today and, on reflection, it would have been better to default to TRUE. But, unfortunately, 2.1 has been out for nearly a year and 2.2 is about to be released so we couldn't make such a change right now. We might consider it in 3.0.

    The main driver for the change was to resolve a number of issues, including the proper handling of late-arriving messages and a (previously) dysfunctional resequencer which used the same code base.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware
