Thread: Grouping with Aggregator only releases first group

  1. #1

    I've been trying to reproduce the behaviour shown on this GitHub page:
    https://github.com/olegz/s12gx.2011/...ion/aggregator

    However, I see only one group released when debugging and in the log.

    Code:
    <channel id="inboundBatchingChannel"/>

    <aggregator input-channel="inboundBatchingChannel"
                output-channel="batchChannel"
                expire-groups-upon-completion="false"
                send-partial-result-on-expiry="true"
                message-store="batchMessageStore"
                release-strategy-expression="size() == 10"/>

    <channel id="batchChannel">
        <interceptors>
            <wire-tap channel="aggregatedBatchLogger"/>
        </interceptors>
    </channel>

    <logging-channel-adapter id="aggregatedBatchLogger"
                             log-full-message="true"
                             level="DEBUG"
                             logger-name="AGGREGATED-BATCHES-LOGGER"/>

    <beans:bean id="reaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
        <beans:property name="messageGroupStore" ref="messageStore"/>
        <beans:property name="timeout" value="5000"/>
    </beans:bean>

    <task:scheduled-tasks scheduler="scheduler">
        <task:scheduled ref="reaper" method="run" fixed-rate="100"/>
    </task:scheduled-tasks>

    <task:scheduler id="scheduler"/>

    I have 25 items being written to the inbound channel. I'm hoping for 3 groups to be created by the aggregator, but I only see one.

  2. #2

    Hello!
    "I'm hoping for 3 groups to be created"
    Messages are grouped by the CorrelationStrategy. In your case it is the default one:
    Code:
    new HeaderAttributeCorrelationStrategy(MessageHeaders.CORRELATION_ID)
    On the other hand, you tell the aggregator:
    expire-groups-upon-completion="false"
    send-partial-result-on-expiry="true"
    release-strategy-expression="size() == 10"
    So it creates a MessageGroup based on CORRELATION_ID. If all 25 of your inbound Messages have the same CORRELATION_ID, the aggregator creates the group on the first Message, and there will be only one group in the MessageStore for all 25 Messages. It releases the first 10 Messages as one list, once the release strategy size() == 10 is satisfied, and only once. And, of course, it never removes this group, because expire-groups-upon-completion="false". Instead, the MessageGroup is marked as "completed" and won't accept any new Messages. This is the "late arrival" scenario.
    There is a discardChannel on the AbstractCorrelatingMessageHandler. It lets you handle "late arrival" Messages.
    That's the theory.
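
    To make the late arrivals visible, something like the following could be tried. It is only a sketch: it reuses the aggregator from post #1 unchanged and adds the discard-channel attribute, and the lateArrivalLogger channel name and the LATE-ARRIVALS-LOGGER logger name are made up for illustration.

    ```xml
    <!-- Same aggregator as in post #1, plus a discard channel.
         Messages arriving for the already completed group are sent there
         instead of silently disappearing. -->
    <aggregator input-channel="inboundBatchingChannel"
                output-channel="batchChannel"
                discard-channel="lateArrivalLogger"
                expire-groups-upon-completion="false"
                send-partial-result-on-expiry="true"
                message-store="batchMessageStore"
                release-strategy-expression="size() == 10"/>

    <!-- Hypothetical logger for discarded (late arrival) messages -->
    <logging-channel-adapter id="lateArrivalLogger"
                             log-full-message="true"
                             level="WARN"
                             logger-name="LATE-ARRIVALS-LOGGER"/>
    ```

    With 25 Messages sharing one CORRELATION_ID, the remaining 15 would be expected to show up on this logger rather than in batchChannel.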

    And now a question: what do you want to achieve?

    Take care,
    Artem

  3. #3

    I want to process the objects 10 at a time, and then process whatever is left over at the end. In this case there would be 2 groups of 10 and one group of 5. (25 messages overall)

    The example I cited claims that this can be done and uses the same correlation id throughout. However, I found it doesn't work.

    So I created a separate correlationId per group (i.e. I increment the correlationId for each set of 10), and now it works well. This way the aggregator releases each set of 10 and the reaper releases the group of 5 at the end.
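
    For anyone reading later, one possible way to express that per-batch correlation in the configuration itself is sketched below. It is not the code I actually used: it assumes every inbound Message carries a 1-based sequenceNumber header (for example, set by an upstream splitter), and it derives the batch key from that header with integer division.

    ```xml
    <!-- Assumption: headers['sequenceNumber'] is an Integer starting at 1.
         SpEL integer division maps 1..10 to 0, 11..20 to 1, 21..25 to 2,
         so each batch of 10 gets its own MessageGroup. -->
    <aggregator input-channel="inboundBatchingChannel"
                output-channel="batchChannel"
                correlation-strategy-expression="(headers['sequenceNumber'] - 1) / 10"
                expire-groups-upon-completion="false"
                send-partial-result-on-expiry="true"
                message-store="batchMessageStore"
                release-strategy-expression="size() == 10"/>
    ```

    The keys 0, 1, 2 are only unique within one sequence, so a second run of Messages would need some extra distinguishing value in the key.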

    If there is another way I'd be interested to hear about it.

    Thanks,
    Matt

  4. #4

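    Setting expire-groups-upon-completion="true" should give you what you want while keeping the same correlationId. The completed group is then removed after it is released, so the next Message with that correlationId starts a new group.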
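
    As a minimal sketch, assuming the rest of the configuration from post #1 stays as it is, the aggregator would then look like this:

    ```xml
    <!-- Only expire-groups-upon-completion changes, from "false" to "true".
         Each release of 10 removes the group, so Messages 11..20 and 21..25
         form new groups under the same correlationId. -->
    <aggregator input-channel="inboundBatchingChannel"
                output-channel="batchChannel"
                expire-groups-upon-completion="true"
                send-partial-result-on-expiry="true"
                message-store="batchMessageStore"
                release-strategy-expression="size() == 10"/>
    ```

    The last, incomplete group of 5 never satisfies size() == 10, so it relies on the reaper together with send-partial-result-on-expiry="true". That presumes the reaper's messageGroupStore points at the same store the aggregator uses.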

  5. #5

    Yes, it works now. This is very good. Thanks so much for the help.
