I need to get 500 messages at a time off a message queue and then hand them off to a service activator to send them all as part of an HTTP post to an external system.
I am trying to accomplish this with an aggregator, and while my solution seems to work, it does not seem ideal. The only way I have been able to get this to do what I need is to create my own implementations of CorrelationStrategy and CompletionStrategy. Is there a better/easier way to accomplish this?
Cheers.
Jay
Code:
<si:aggregator input-channel="channel1"
               output-channel="channel2"
               ref="messageCountAggregator"
               method="aggregate"
               correlation-strategy="correlationStrategy"
               completion-strategy="messageCountCompletionStrategy"
               tracked-correlation-id-capacity="10"
               send-partial-result-on-timeout="true"
               timeout="10000"/>

<!-- just for testing -->
<si:service-activator input-channel="channel2" ref="fileWriteHandler" method="handleMessage"/>

<bean id="correlationStrategy" class="com.mycompany.MessageCountCorrelationStrategy"/>

<bean id="messageCountCompletionStrategy" class="com.mycompany.MessageCountCompletionStrategy">
    <property name="maxMessageCount" value="10"/>
</bean>

<bean id="messageCountAggregator" class="com.mycompany.TrackCountAggregator"/>

Code:
public class MessageCountCorrelationStrategy implements CorrelationStrategy {

    // Not thread-safe: assumes a single thread calls getCorrelationKey.
    private short counter = 0;
    private short maxCount = 10;
    private UUID correlationKey = UUID.randomUUID();
    private Logger logger = LogUtils.getLogger(MessageCountCorrelationStrategy.class);

    public Object getCorrelationKey(Message<?> m) {
        if (++counter > maxCount) {
            // Start a new group. The current message is its first member,
            // so the counter restarts at 1; restarting at 0 would put
            // maxCount + 1 messages under every key after the first.
            counter = 1;
            correlationKey = UUID.randomUUID();
        }
        return correlationKey.toString();
    }
}

Code:
public class MessageCountCompletionStrategy implements CompletionStrategy {

    private int maxMessageCount = 10;

    // A group is complete once it holds at least maxMessageCount messages.
    public boolean isComplete(List<Message<?>> messages) {
        if (CollectionUtils.isEmpty(messages)) {
            return false;
        }
        return messages.size() >= maxMessageCount;
    }

    public void setMaxMessageCount(int maxMessageCount) {
        this.maxMessageCount = maxMessageCount;
    }
}
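
The batching behaviour that the two strategies are meant to produce together can be sketched in plain Java, independent of Spring Integration. This is only an illustration under assumptions: MessageBatcher and its methods are hypothetical names, not part of any framework, and flush() merely stands in for what send-partial-result-on-timeout does in the configuration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical count-based batcher; not a Spring Integration class. */
class MessageBatcher<T> {

    private final int batchSize;
    private List<T> current = new ArrayList<>();

    MessageBatcher(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    /** Buffers one item; returns the full batch if this item completes it. */
    synchronized Optional<List<T>> add(T item) {
        current.add(item);
        if (current.size() < batchSize) {
            return Optional.empty();
        }
        return Optional.of(drain());
    }

    /** Returns whatever is buffered (possibly nothing), e.g. when a timeout fires. */
    synchronized List<T> flush() {
        return drain();
    }

    // Hands back the current buffer and starts a fresh one.
    private List<T> drain() {
        List<T> batch = current;
        current = new ArrayList<>();
        return batch;
    }
}
```

With a batch size of 500, each list returned by add() would be what gets handed to the service that performs the HTTP post, and a timer calling flush() would cover the case where fewer than 500 messages arrive.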


