I'm using Spring Integration to build a high-volume customer email merge program for my organization, and I need the splitter-filter-aggregator pattern mentioned in the title.
Specifically, when the system is generating email previews, I want to aggregate the output of the preceding split, which may have had some messages filtered out (due to ineligibility, opt-out, system error, etc.), and then select a small random sample of the message group to send to a preview recipient.
By default, the aggregator waits until the number of messages in the group reaches the sequence size before releasing. Since the filter drops some messages, that count is never reached, so I have settled for the sub-optimal solution of timing out the message store after 10 seconds. This is sub-optimal because it reduces the apparent responsiveness of the system, and it risks not aggregating the complete message group, which results in the preview samples doubling, tripling, etc. in size.
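To make the problem concrete, here is a small plain-Java model of it. This is only a sketch and not Spring Integration code: `Item`, `canRelease`, and `splitAndFilter` are names I made up for illustration, and `canRelease` just mimics the "wait for sequence size" rule described above.

```java
import java.util.ArrayList;
import java.util.List;

final class SequenceSizeReleaseDemo {

    /** Hypothetical stand-in for a message carrying the splitter's sequence headers. */
    static final class Item {
        final int sequenceNumber;
        final int sequenceSize;

        Item(int sequenceNumber, int sequenceSize) {
            this.sequenceNumber = sequenceNumber;
            this.sequenceSize = sequenceSize;
        }
    }

    /** Models the default rule: release only once the group holds sequenceSize messages. */
    static boolean canRelease(List<Item> group) {
        return !group.isEmpty() && group.size() >= group.get(0).sequenceSize;
    }

    /**
     * Splits into `total` items, drops every n-th one (the "filter"),
     * and returns the items that actually reach the aggregator.
     */
    static List<Item> splitAndFilter(int total, int dropEveryNth) {
        List<Item> group = new ArrayList<>();
        for (int i = 1; i <= total; i++) {
            boolean eligible = i % dropEveryNth != 0;
            if (eligible) {
                group.add(new Item(i, total));
            }
        }
        return group;
    }

    public static void main(String[] args) {
        // Items 5 and 10 are filtered out, so only 8 of the 10 arrive.
        List<Item> group = splitAndFilter(10, 5);
        System.out.println("arrived=" + group.size() + ", canRelease=" + canRelease(group));
        // prints "arrived=8, canRelease=false"
    }
}
```

In this model the group can only ever be released by something external, such as the timeout I am using now.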
My initial thought is to put a shared AtomicInteger in the message header and intercept the filtered-out messages to decrement it, so that the aggregator can compare the group size against the adjusted count. However, I'm concerned that this could cause a race condition with the aggregator. Further, it significantly complicates marshalling/unmarshalling across JMS-backed channels.
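Here is a rough plain-Java sketch of the counter idea and of one way I suspect it could go wrong. Again, none of this is Spring Integration API: the `Aggregator` class and `run` method are hypothetical, and I am assuming the aggregator re-checks its release condition only when a message arrives.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class SharedCounterDemo {

    /** Hypothetical aggregator model: evaluates release only when a message arrives. */
    static final class Aggregator {
        private final AtomicInteger expected; // shared with the filter's discard path
        private int arrived = 0;
        private boolean released = false;

        Aggregator(AtomicInteger expected) {
            this.expected = expected;
        }

        void onMessage() {
            arrived++;
            if (arrived >= expected.get()) {
                released = true;
            }
        }

        boolean isReleased() {
            return released;
        }
    }

    /** Processes one split in order; eligible[i] says whether item i passes the filter. */
    static boolean run(boolean[] eligible) {
        AtomicInteger expected = new AtomicInteger(eligible.length);
        Aggregator aggregator = new Aggregator(expected);
        for (boolean ok : eligible) {
            if (ok) {
                aggregator.onMessage();     // passed the filter, reaches the aggregator
            } else {
                expected.decrementAndGet(); // discarded: shrink the expected count
            }
        }
        return aggregator.isReleased();
    }

    public static void main(String[] args) {
        System.out.println(run(new boolean[] {true, false, true})); // prints "true"
        System.out.println(run(new boolean[] {true, true, false})); // prints "false"
    }
}
```

In the second run the discard is handled after the last message has already arrived, so nothing triggers another release check and the group stays stuck. With concurrent consumers the ordering would not even be deterministic, which is the race I am worried about.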
Is there a way to accomplish this simply in Spring Integration?