Thread: Aggregator release with subscribing to Publish/Subscribe channel

  1. #1

    Aggregator release with subscribing to Publish/Subscribe channel

    Hi All,

    I am looking for a sample configuration. I want to release an aggregator when a message arrives on a Publish/Subscribe channel that it subscribes to.

    Thanks,

    Srinivas

  2. #2
    Join Date
    Jan 2009
    Location
    Ukraine, Kharkov
    Posts
    640

    Hi!

    Sorry, but you haven't provided enough information.
    Maybe you mean Scatter-Gather?
    If so, take a look at my example: https://github.com/artembilan/spring...be7cf68d92c920

    Cheers,
    Artem Bilan

  3. #3

    Thanks, Artem. My configuration is below.

    <channel id="aggregator-input-channel"/>

    <publish-subscribe-channel id="aggregator-release-channel" topic="endOfTheDayNotification"/>

    <aggregator input-channel="aggregator-input-channel" output-channel="aggregator-output-channel"
    release-strategy="size() == 2" expression=" payload.custId"/>

    <service-activator input-channel="aggregator-output-channel" ref="clientProcessor" method="process"/>
    I want to release the aggregator when a notification arrives on "aggregator-release-channel", instead of using release-strategy="size() == 2".

  4. #4

    No, it's still confusing.
    Please describe your use case in plain words.
    Nothing in the framework directly matches your configuration, but maybe I can help once I understand the business requirements.

  5. #5

    Hi Cleric,

    My use case: I want to aggregate all inbound client requests, correlated by the client id in the payload, and release the aggregated messages after our end-of-day process completes. The end-of-day process runs in another system, which broadcasts a message via a JMS topic.

    Thanks
    Last edited by csrinivasrao; Dec 8th, 2012 at 11:28 AM.

  6. #6

    Hm.
    OK, now it's clear, and it is certainly a valid business task.

    I suggest a slightly different solution:
    HTML Code:
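    <si:channel id="aggregator-input-channel">
        <si:queue/>
    </si:channel>

    <!-- Stays stopped until the control bus starts it -->
    <si:aggregator id="clientRequestsAggregator" input-channel="aggregator-input-channel"
                   output-channel="aggregator-output-channel"
                   correlation-expression="payload.custId"
                   auto-startup="false">
        <!-- A queue channel needs a poller unless a default one is defined; the delay is illustrative -->
        <si:poller fixed-delay="1000"/>
    </si:aggregator>

    <si:service-activator input-channel="aggregator-output-channel" ref="clientProcessor" method="process"/>

    <!-- pub-sub-domain="true" because 'endOfTheDayNotification' is a topic, not a queue -->
    <jms:message-driven-channel-adapter channel="startClientRequestsAggregatorCommandChannel"
                                        destination-name="endOfTheDayNotification"
                                        pub-sub-domain="true"/>

    <!-- Replaces the notification payload with the 'start' command for the control bus -->
    <si:transformer input-channel="startClientRequestsAggregatorCommandChannel"
                    output-channel="controlBus"
                    expression="'@clientRequestsAggregator.start()'"/>

    <si:control-bus input-channel="controlBus"/>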
    In this case:
    1. All client requests collect in the queue.
    2. The aggregator does nothing at first, because it is switched off via auto-startup="false".
    3. The <jms:message-driven-channel-adapter> listens to the 'endOfTheDayNotification' topic.
    4. The message from the topic travels to the <transformer>, which creates the 'start' command for the aggregator.
    5. The command is sent to the <control-bus>, which starts the aggregator.
    6. 'clientRequestsAggregator' then receives all messages from its input channel, using the default release strategy, and correlates them by the 'payload.custId' you suggested.

    But you should think about when to stop 'clientRequestsAggregator' if your application runs 24/7. Maybe you have a 'startOfTheDayNotification' too?
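
    If such a topic exists (an assumption; 'startOfTheDayNotification' is only a guess at what your other system might publish), stopping could mirror the start flow. A minimal sketch that reuses the 'controlBus' channel from the configuration above; the channel name 'stopClientRequestsAggregatorCommandChannel' is made up for illustration:

```xml
<!-- Hypothetical: assumes a 'startOfTheDayNotification' topic exists -->
<jms:message-driven-channel-adapter channel="stopClientRequestsAggregatorCommandChannel"
                                    destination-name="startOfTheDayNotification"
                                    pub-sub-domain="true"/>

<!-- Builds the 'stop' command and sends it to the existing control bus -->
<si:transformer input-channel="stopClientRequestsAggregatorCommandChannel"
                output-channel="controlBus"
                expression="'@clientRequestsAggregator.stop()'"/>
```

    While the aggregator is stopped, new requests simply accumulate on the queue until the next 'start' command.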


    Alternatively, you can use a MessageGroupStoreReaper:
    HTML Code:
    <bean id="clientRequestsStore" class="org.springframework.integration.store.SimpleMessageStore"/>

    <!-- Never releases on its own; groups are sent only when the reaper expires them -->
    <si:aggregator input-channel="aggregator-input-channel"
                   output-channel="aggregator-output-channel"
                   correlation-expression="payload.custId"
                   release-strategy-expression="false"
                   send-partial-result-on-expiry="true"
                   message-store="clientRequestsStore"/>

    <si:service-activator input-channel="aggregator-output-channel" ref="clientProcessor" method="process"/>

    <!-- pub-sub-domain="true" because 'endOfTheDayNotification' is a topic, not a queue -->
    <jms:message-driven-channel-adapter channel="aggregateClientRequestsChannel"
                                        destination-name="endOfTheDayNotification"
                                        pub-sub-domain="true"/>

    <!-- Each notification runs the reaper, which expires the stored groups -->
    <si:outbound-channel-adapter id="aggregateClientRequestsChannel" expression="@clientRequestsStoreReaper.run()"/>

    <bean id="clientRequestsStoreReaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
        <constructor-arg ref="clientRequestsStore"/>
    </bean>
    HTH; this should give you a starting point for the right solution.
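
    The snippets above use the si: and jms: prefixes without showing their declarations. A minimal sketch of the enclosing <beans> element, assuming the standard Spring Integration namespaces and version-less schema locations (adjust to your setup):

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:si="http://www.springframework.org/schema/integration"
       xmlns:jms="http://www.springframework.org/schema/integration/jms"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration
           http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/integration/jms
           http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd">

    <!-- Channels, aggregator, JMS adapter and reaper beans from the snippets go here -->
    <!-- A JMS ConnectionFactory bean is also required; its definition depends on your broker -->

</beans>
```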

    For more info about Aggregator, Control Bus and MessageGroupStoreReaper:
    http://static.springsource.org/sprin...tml#aggregator
    http://static.springsource.org/sprin...ml#control-bus
