
Thread: Aggregator Question

  1. #1

    Aggregator Question

    I need to get 500 messages at a time off a message queue and then hand them off to a service activator to send them all as part of an HTTP post to an external system.

    I am trying to accomplish this with an aggregator, and while my solution seems to work, it does not seem ideal. The only way I have been able to get this to do what I need is to create my own implementations of CorrelationStrategy and CompletionStrategy. Is there a better/easier way to accomplish this?

    Cheers.

    Jay

    Code:
    <si:aggregator input-channel="channel1" completion-strategy="messageCountCompletionStrategy"
    	send-partial-result-on-timeout="true" ref="messageCountAggregator" method="aggregate"  tracked-correlation-id-capacity="10"
    	correlation-strategy="correlationStrategy" output-channel="channel2" timeout="10000"/>
    
    <!-- just for testing -->
    <si:service-activator input-channel="channel2" ref="fileWriteHandler" method="handleMessage"/>
    
    <bean id="correlationStrategy" class="com.mycompany.MessageCountCorrelationStrategy"/>
    
    <bean id="messageCountCompletionStrategy" class="com.mycompany.MessageCountCompletionStrategy">
    	<property name="maxMessageCount" value="10"/>
    </bean>
    
    <bean id="messageCountAggregator" class="com.mycompany.TrackCountAggregator" />

    Code:
    public class MessageCountCorrelationStrategy implements CorrelationStrategy {
    
    	private short counter = 0;
    	private short maxCount = 10;
    	private UUID correlationKey = UUID.randomUUID();
    	private Logger logger = LogUtils.getLogger(MessageCountCorrelationStrategy.class);
    	
    	public Object getCorrelationKey(Message<?> m) {
    
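    		// Start a new group once the current one already holds maxCount messages.
    		if(++counter > maxCount) {
    			counter = 1; // this message is the first of the new group (0 would let the group grow to maxCount + 1)
    			correlationKey = UUID.randomUUID();
    		}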
    		return correlationKey.toString();
    	}
    
    }

    Code:
    public class MessageCountCompletionStrategy implements CompletionStrategy {
    
    	private int maxMessageCount = 10;
    	
    	public boolean isComplete(List<Message<?>> messages) {
    		if (CollectionUtils.isEmpty(messages)) {
    			return false;
    		}
    		return messages.size() >= maxMessageCount;
    
    	}
    
    	public void setMaxMessageCount(int maxMessageCount) {
    		this.maxMessageCount = maxMessageCount;
    	}
    
    }

  2. #2
    Join Date
    May 2007
    Location
    Netherlands
    Posts
    614

    You should make your strategies thread safe (use Atomic* classes for mutable counters for example).

    You could rely on the default strategies and set the correlation id and sequence* headers on the incoming messages. Also, I think you don't need to keep track of the count in both the correlation and the completion strategy. Basically, you want to correlate any messages that come in and complete when you have a certain number, right?
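
    As a minimal sketch of the thread-safety advice above, the counter and the key can be folded into a single atomic counter, so there is no second mutable field to keep in sync. The class name BatchKeyGenerator and the "batch-N" key format are hypothetical and not part of Spring Integration.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper: hands out one correlation key per fixed-size batch. */
public class BatchKeyGenerator {

    private final int batchSize;

    // The only shared mutable state; getAndIncrement() is atomic.
    private final AtomicLong counter = new AtomicLong();

    public BatchKeyGenerator(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    /** The first batchSize calls return "batch-0", the next batchSize return "batch-1", and so on. */
    public String nextKey() {
        return "batch-" + (counter.getAndIncrement() / batchSize);
    }
}
```

    A CorrelationStrategy could then return nextKey() from getCorrelationKey. One caveat: if a group times out before it is full, messages that later arrive with the same key would probably be discarded, going by the log line in post #5.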

  3. #3

    Thanks for the response, Iwein. Yes, my use case is to correlate X messages and complete when X is reached or when timeout Y occurs.
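
    To make the "X messages or timeout Y" requirement concrete, here is a rough sketch of the same semantics in plain Java using a BlockingQueue. This is not the aggregator and does not use Spring Integration; the class and method names are hypothetical, and it assumes the timeout is measured from the moment the batch is requested.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper, independent of Spring Integration. */
public class CountOrTimeoutBatcher {

    /**
     * Collects up to maxCount items from the queue. Returns early with a
     * partial (possibly empty) batch once timeoutMillis has elapsed.
     */
    public static <T> List<T> nextBatch(BlockingQueue<T> queue, int maxCount, long timeoutMillis) {
        List<T> batch = new ArrayList<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (batch.size() < maxCount) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // timeout Y reached: hand back whatever was collected
            }
            try {
                T item = queue.poll(remaining, TimeUnit.NANOSECONDS);
                if (item == null) {
                    break; // nothing arrived before the deadline
                }
                batch.add(item);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                break;
            }
        }
        return batch;
    }
}
```

    Unlike the aggregator, this leaves nothing behind after a partial batch: items that arrive later simply land in the next call.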

    I looked at the default correlation and completion strategy implementations. Since my messages are being sourced from a message queue, I didn't see a good means of having the correlationId and sequence headers set. Is this something that I could add with a transformer? A HeaderEnricher?

    What would happen if timeout Y occurs before X messages have been correlated? It looks like the aggregator will send partial results (sendPartialResultsOnTimeout), but what happens to the rest of the correlated messages after the partial result is sent?

    Thanks again. Cheers.

  4. #4

    How about you set the correlationId to 1 on all messages and use a CompletionStrategy that ignores the sequence and just counts? That way you would aggregate X messages unless timeout Y happens first. Just put the header-enricher in front of the aggregator in a chain.

    Please post back your code when you have it working; I haven't tried it myself yet.

  5. #5

    This was my original idea, but it appears messages end up getting discarded because the correlationId has already been processed. The aggregator seems to keep track of a number of correlationIds based on the tracked-correlation-id-capacity attribute in the aggregator config, but this value cannot be set to zero. So after the first group is processed, the rest of the messages are always sent to the discard channel.

    This is why I ended up keeping count in correlation strategy and completion strategy.

    2009-05-13 12:37:39,535 DEBUG [main] (AbstractMessageBarrierHandler.java:245) - Handling of Message group with correlationKey '1' has already completed or timed out
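
    The behaviour described above can be illustrated with a toy model. This is only a sketch of what the log line suggests, not Spring Integration's actual implementation: the class CompletedKeyModel is hypothetical, and it ignores both the timeout and the capacity limit on tracked ids.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model only; not Spring Integration's implementation. */
public class CompletedKeyModel {

    private final int groupSize;
    private final Map<String, Integer> openGroups = new HashMap<>();
    private final Set<String> completedKeys = new HashSet<>();
    private int released;
    private int discarded;

    public CompletedKeyModel(int groupSize) {
        this.groupSize = groupSize;
    }

    public void onMessage(String correlationKey) {
        // A key whose group has already completed is remembered, so late arrivals are dropped.
        if (completedKeys.contains(correlationKey)) {
            discarded++;
            return;
        }
        int size = openGroups.merge(correlationKey, 1, Integer::sum);
        if (size >= groupSize) {
            openGroups.remove(correlationKey);
            completedKeys.add(correlationKey);
            released += size;
        }
    }

    public int getReleased() {
        return released;
    }

    public int getDiscarded() {
        return discarded;
    }
}
```

    Feeding 25 messages with the constant key "1" into a model with a group size of 10 releases the first 10 and discards the remaining 15, which matches the symptom reported here.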

  6. #6

    Hmm, that doesn't seem very useful. If you agree with me that this shouldn't have happened, you can create a bug report for it here: http://jira.springframework.org/browse/INT

  7. #7

    It appears that you should link that issue to INT-604 too.

  8. #8

    Thanks again, Iwein. Issue created: http://jira.springframework.org/browse/INT-653
