Hm.
OK, now it looks clear and, of course, like a valid business task.
I'd suggest a slightly different solution:
<!-- Client requests accumulate here until the aggregator is started -->
<si:channel id="aggregator-input-channel">
    <si:queue/>
</si:channel>

<!-- Switched off at startup; started later through the control bus -->
<si:aggregator id="clientRequestsAggregator" input-channel="aggregator-input-channel"
               output-channel="aggregator-output-channel"
               correlation-expression="payload.custId"
               auto-startup="false"/>

<si:service-activator input-channel="aggregator-output-channel" ref="clientProcessor" method="process"/>

<!-- The end-of-day JMS notification is turned into a 'start' command for the aggregator -->
<jms:message-driven-channel-adapter channel="startClientRequestsAggregatorCommandChannel"
                                    destination-name="endOfTheDayNotification"/>

<si:transformer input-channel="startClientRequestsAggregatorCommandChannel"
                output-channel="controlBus"
                expression="'@clientRequestsAggregator.start()'"/>

<si:control-bus input-channel="controlBus"/>
So, in this case:
1. All client requests are collected on the 'queue'.
2. By default the aggregator doesn't do anything, because it is switched off via auto-startup="false".
3. <jms:message-driven-channel-adapter> listens to the 'endOfTheDayNotification' topic.
4. The message from the topic travels to the <transformer>, which creates the 'start' command for the aggregator.
5. This command is sent to the <control-bus>, which starts the aggregator.
6. The 'clientRequestsAggregator' receives all messages from its input channel, using the default release-strategy, and correlates them by the 'payload.custId' expression you suggested.
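Since the aggregator reads from a queue channel, it has to poll that channel once started. If your context does not already define a default poller, something like the following may be needed. This is only a sketch: the id 'defaultPoller' and the one-second delay are illustrative assumptions, not part of the configuration above.

```xml
<!-- Assumed default poller; the id and the fixed-delay value are illustrative only -->
<si:poller id="defaultPoller" default="true" fixed-delay="1000"/>
```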
One thing to consider here is when to 'stop' the 'clientRequestsAggregator' if your application works 24/7. Maybe you have a 'startOfTheDayNotification' too?
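If such a notification exists, the same control bus could switch the aggregator off again. A minimal sketch, assuming a JMS destination named 'startOfTheDayNotification' really is available and reusing the 'controlBus' channel from the configuration above; the channel name 'stopClientRequestsAggregatorCommandChannel' is made up for illustration:

```xml
<!-- Hypothetical destination: use whatever marks the start of your business day -->
<jms:message-driven-channel-adapter channel="stopClientRequestsAggregatorCommandChannel"
                                    destination-name="startOfTheDayNotification"/>

<!-- Turns the notification into a 'stop' command for the existing control bus -->
<si:transformer input-channel="stopClientRequestsAggregatorCommandChannel"
                output-channel="controlBus"
                expression="'@clientRequestsAggregator.stop()'"/>
```

After the 'stop' command the aggregator no longer reads from its input channel, so new requests should simply wait on the 'queue' until the next 'start'.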
Alternatively, you can use a MessageGroupStoreReaper:
<bean id="clientRequestsStore" class="org.springframework.integration.store.SimpleMessageStore"/>

<!-- release-strategy-expression="false": groups are never released on their own, only on expiry -->
<si:aggregator input-channel="aggregator-input-channel"
               output-channel="aggregator-output-channel"
               correlation-expression="payload.custId"
               release-strategy-expression="false"
               send-partial-result-on-expiry="true"
               message-store="clientRequestsStore"/>

<si:service-activator input-channel="aggregator-output-channel" ref="clientProcessor" method="process"/>

<!-- The end-of-day JMS notification runs the reaper, which expires the stored groups -->
<jms:message-driven-channel-adapter channel="aggregateClientRequestsChannel" destination-name="endOfTheDayNotification"/>

<si:outbound-channel-adapter id="aggregateClientRequestsChannel" expression="@clientRequestsStoreReaper.run()"/>

<bean id="clientRequestsStoreReaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
    <constructor-arg ref="clientRequestsStore"/>
</bean>
HTH; this should give you a starting point for the right solution.
For more info about Aggregator, Control Bus and MessageGroupStoreReaper:
http://static.springsource.org/sprin...tml#aggregator
http://static.springsource.org/sprin...ml#control-bus