In the simplest case (where you would be dealing with messages only) you wouldn't have to specify a completion strategy. Spring Integration automatically tags the messages that are produced by the splitter so that they can be aggregated.
A sample that works for me:
Code:
MessageChannel inputChannel = channelRegistry.lookupChannel("input");
MessageChannel outputChannel = channelRegistry
.lookupChannel("aggregated");
assertNotNull(inputChannel);
assertNotNull(outputChannel);
inputChannel.send(new GenericMessage<String>("aap noot mies"));
assertEquals("aap noot mies ", outputChannel.receive().getPayload());
Code:
<si:message-bus auto-create-channels="true" />
<si:annotation-driven/>
<si:channel id="aggregated"/>
<bean id="aggregator" class="org.springframework.integration.config.namespaceTests.TestAggregator"/>
<bean id="splitter" class="org.springframework.integration.config.namespaceTests.TestSplitter"/>
Code:
@MessageEndpoint(input = "input", output="split")
public class TestSplitter {
@Splitter
public List<Message<String>> name(Message<String> m) {
String[] split = m.getPayload().split(" ");
ArrayList<Message<String>> list = new ArrayList<Message<String>>();
for (String string : split) {
list.add(new GenericMessage<String>(string));
}
return list;
}
}
Code:
@MessageEndpoint(input = "split")
public class TestAggregator {
@Aggregator(defaultReplyChannel = "aggregated")
public Message<String> name(List<Message<String>> messages) {
StringBuffer buf = new StringBuffer();
for (Message<String> message : messages) {
buf.append(message.getPayload()+" ");
}
return new GenericMessage<String>(buf.toString());
}
}