Thread: Using Aggregator with Completion Strategy

  1. #1

    I am trying to use the Aggregator annotation with the Completion Strategy. The documentation on these two annotations is rather thin.

    Here is what I am trying to do. I retrieve a list of objects from a repository and try to process them simultaneously. Processing an object entails some calculations and then generating an XML snippet describing the object. I use a splitter to split the list into individual objects and put each one on a queue channel. A Message Endpoint reads from this queue channel and invokes a handler that processes the object. The endpoint class has a concurrency annotation, which lets multiple threads process objects from the queue.

    The problem comes when trying to aggregate the responses from the handlers. Each handler, after finishing its task, publishes to an output queue (as defined in the MEP). I would like to add another message handler that subscribes to this output queue, aggregates all the messages, and republishes them to the final output adapter. The aggregator should use a completion strategy to determine whether all the expected messages have been received, and publish only after the strategy returns true.

    I guess this should be possible with Spring Integration M4, but I am not sure exactly how to do it. Any directions would be of great help.

    thanks
    -Satish

  2. #2

    In the simplest case (where you would be dealing with messages only) you wouldn't have to specify a completion strategy. Spring Integration automatically tags the messages that are produced by the splitter so that they can be aggregated.

    A sample that works for me:
    Code:
    		MessageChannel inputChannel = channelRegistry.lookupChannel("input");
    		MessageChannel outputChannel = channelRegistry
    				.lookupChannel("aggregated");
    		assertNotNull(inputChannel);
    		assertNotNull(outputChannel);
    		inputChannel.send(new GenericMessage<String>("aap noot mies"));
    		assertEquals("aap noot mies ", outputChannel.receive().getPayload());
    Code:
    	<si:message-bus auto-create-channels="true" />
    	<si:annotation-driven/>
    	<si:channel id="aggregated"/>
    	<bean id="aggregator" class="org.springframework.integration.config.namespaceTests.TestAggregator"/>
    	<bean id="splitter" class="org.springframework.integration.config.namespaceTests.TestSplitter"/>
    Code:
    @MessageEndpoint(input = "input", output="split")
    public class TestSplitter {
    	@Splitter
    	public List<Message<String>> name(Message<String> m) {
    		String[] split = m.getPayload().split(" ");
    		ArrayList<Message<String>> list = new ArrayList<Message<String>>();
    		for (String string : split) {
    			list.add(new GenericMessage<String>(string));
    		}
    		return list;
    	}
    }
    Code:
    @MessageEndpoint(input = "split")
    public class TestAggregator {
    	@Aggregator(defaultReplyChannel = "aggregated")
    	public Message<String> name(List<Message<String>> messages) {
    		// Join the payloads, each followed by a single space.
    		StringBuilder buf = new StringBuilder();
    		for (Message<String> message : messages) {
    			buf.append(message.getPayload()).append(' ');
    		}
    		return new GenericMessage<String>(buf.toString());
    	}
    }
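
    To make the idea of tagging and completion a bit more concrete, here is a rough plain-Java sketch. It does not use Spring Integration at all: `Part`, `split`, `Aggregator` and `isComplete` are hypothetical names made up for illustration, and the framework's real classes and headers may look quite different. It only assumes that the splitter stamps each part with a shared correlation id and the total number of parts, so that a count-based completion check can tell when the whole group has arrived.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Plain-Java illustration only; none of these types are Spring Integration classes.
public class SequenceAggregationSketch {

    /** Hypothetical stand-in for a message carrying splitter-assigned tags. */
    static final class Part {
        final String correlationId;
        final int sequenceNumber;
        final int sequenceSize;
        final String payload;

        Part(String correlationId, int sequenceNumber, int sequenceSize, String payload) {
            this.correlationId = correlationId;
            this.sequenceNumber = sequenceNumber;
            this.sequenceSize = sequenceSize;
            this.payload = payload;
        }
    }

    /** Splits on spaces and tags every part with a shared id and the total count. */
    static List<Part> split(String payload) {
        String[] words = payload.split(" ");
        String correlationId = UUID.randomUUID().toString();
        List<Part> parts = new ArrayList<>();
        for (int i = 0; i < words.length; i++) {
            parts.add(new Part(correlationId, i + 1, words.length, words[i]));
        }
        return parts;
    }

    /** Collects parts per correlation id and releases a group once it is complete. */
    static final class Aggregator {
        private final Map<String, List<Part>> groups = new HashMap<>();

        /** Completion check: as many parts have arrived as the splitter announced. */
        private static boolean isComplete(List<Part> group) {
            return !group.isEmpty() && group.size() == group.get(0).sequenceSize;
        }

        /** Returns the joined payload when the group completes, otherwise empty. */
        synchronized Optional<String> add(Part part) {
            List<Part> group = groups.computeIfAbsent(part.correlationId, k -> new ArrayList<>());
            group.add(part);
            if (!isComplete(group)) {
                return Optional.empty();
            }
            groups.remove(part.correlationId);
            // Restore the original order, since concurrent handlers may finish out of order.
            group.sort(Comparator.comparingInt((Part p) -> p.sequenceNumber));
            StringBuilder joined = new StringBuilder();
            for (Part p : group) {
                joined.append(p.payload).append(' ');
            }
            return Optional.of(joined.toString());
        }
    }

    public static void main(String[] args) {
        Aggregator aggregator = new Aggregator();
        List<Part> parts = split("aap noot mies");
        // Deliver the parts out of order; only the last call yields a result.
        System.out.println(aggregator.add(parts.get(2)).isPresent());
        System.out.println(aggregator.add(parts.get(0)).isPresent());
        System.out.println(aggregator.add(parts.get(1)).orElse("incomplete"));
    }
}
```

    In this sketch the first two `add` calls return an empty result and the third returns the joined string, which mirrors the "publish only after the strategy returns true" behaviour asked about in the first post. A custom completion strategy would replace the body of `isComplete` with whatever rule fits, for example a timeout or a minimum count.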

  3. #3

    Thanks Iwen.

    I figured it out after posting the question (after some more investigation). Glad to have someone confirm the findings.

    thanks
    -Satish
