Hi All,
I am looking for a sample/example configuration. I want to release aggregator by subscribing to a Publish/Subscribe channel.
Thanks,
Srinivas
Hi All,
I am looking for a sample/example configuration. I want to release aggregator by subscribing to a Publish/Subscribe channel.
Thanks,
Srinivas
Hi!
Sorry, but you provide not enough info.
Maybe you mean Scatter-Gather.
Then take a look into my example: https://github.com/artembilan/spring...be7cf68d92c920
Cheers,
Artem Bilan
Thanks Artem. My configuration is as below.
I want to release the aggregator with a notification on "aggregator-release-channel" instead of release-strategy="size() == 2"Quote:
<channel id="aggregator-input-channel"/>
<publish-subscribe-channel id="aggregator-release-channel" topic="endOfTheDayNotification"/>
<aggregator input-channel="aggregator-input-channel" output-channel="aggregator-output-channel"
release-strategy="size() == 2" expression=" payload.custId"/>
<service-activator input-channel="aggregator-output-channel" ref="clientProcessor" method="process"/>
No, it's still confused.
Provide your use-case, please.
Describe it in human words.
There is no suitable abilities for your configuration, but maybe I can help when I get business requriments.
Cleric Hi,
My use case is - I want to aggregate all client in bound requests and correlate with the client id in payload and I want to release the aggregated messages after our end of the day process. The end of the day process is running in an another system which broadcasts a message via JMS topic.
Thanks
H-m-m.
Ok, now it's look clear and, of course, as valid business task.
I suggest you a bit different solution:
So, in this case:HTML Code:<si:channel id="aggregator-input-channel">
<si:queue/>
</si:channel>
<si:aggregator id="clientRequestsAggregator" input-channel="aggregator-input-channel"
output-channel="aggregator-output-channel"
correlation-expression="payload.custId"
auto-startup="false"/>
<si:service-activator input-channel="aggregator-output-channel" ref="clientProcessor" method="process"/>
<jms:message-driven-channel-adapter channel="startClientRequestsAggregatorCommandChannel"
destination-name="endOfTheDayNotification"/>
<si:transformer input-channel="startClientRequestsAggregatorCommandChannel"
output-channel="controlBus"
expression="'@clientRequestsAggregator.start()'"/>
<si:control-bus input-channel="controlBus"/>
1. All client requests will collect on the 'queue'
2. By default aggregator doesn't do anything, because it's switched off via auto-startup="false"
3. <jms:message-driven-channel-adapter> is listening 'endOfTheDayNotification' topic.
4. The message from topic will travel to the <transformer> to create the 'start' command for aggragator
5. This command will send to the <control-bus> and the last one initiates the work of aggregator.
6. The 'clientRequestsAggregator' receives all messages from it's input channel using default release-strategy and correlates them with suggested by you 'payload.custId'
But here you should think when to 'stop' 'clientRequestsAggregator', if your application works 24/7. Maybe you have 'startOfTheDayNotification' too ?
From other side you can use MessageGroupStoreReaper as well:
So, HTH and you'll get the point to start the right solution.HTML Code:<bean id="clientRequestsStore" class="org.springframework.integration.store.SimpleMessageStore"/>
<si:aggregator input-channel="aggregator-input-channel"
output-channel="aggregator-output-channel"
correlation-expression=" payload.custId"
release-strategy-expression="false"
send-partial-result-on-expiry="true"
message-store="clientRequestsStore"/>
<si:service-activator input-channel="aggregator-output-channel" ref="clientProcessor" method="process"/>
<jms:message-driven-channel-adapter channel="aggregateClientRequestsChannel" destination-name="endOfTheDayNotification"/>
<si:outbound-channel-adapter id="aggregateClientRequestsChannel" expression="@clientRequestsStoreReaper.run()"/>
<bean id="clientRequestsStoreReaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
<constructor-arg ref="clientRequestsStore"/>
</bean>
For more info about Aggregator, Control Bus and MessageGroupStoreReaper:
http://static.springsource.org/sprin...tml#aggregator
http://static.springsource.org/sprin...ml#control-bus