Hi Everyone!
I have a little problem with my integration flow, using the Aggregator and the MessageGroupStoreReaper.
My flow, so far, is:inbound-channel-adapter -> transformer -> splitter -> aggregator [+MessageGroupStoreReaper]
I'm using a correlation-strategy-expression on the aggregator which works just fine.
The problem is, I can't implement a release-strategy, because I have no information at which point the group is complete. BUT I can savely assume that if nothing has been added to the group for a minute, it can be treated as complete.
Because of this I set up the MessageGroupStoreReaper like this:
The aggregator looks like this:Code:<bean id="messageStore" class="org.springframework.integration.store.SimpleMessageStore" /> <bean id="messageStoreReaper" class="org.springframework.integration.store.MessageGroupStoreReaper"> <property name="messageGroupStore" ref="messageStore" /> <property name="timeout" value="60000" /> </bean> <task:scheduler id="reaperScheduler" /> <task:scheduled-tasks scheduler="reaperScheduler"> <task:scheduled ref="messageStoreReaper" method="run" fixed-rate="5000"/> </task:scheduled-tasks>
(...but the release-strategy will never be fulfilled)
What I don't understand is the resulting exception (even though the aggregation/timeout works as intended):Code:<int:aggregator id="statustypeAndInvoiceAggregator" message-store="messageStore" input-channel="ch_all_splitted" output-channel="ch_all_aggregated" discard-channel="nullChannel" release-strategy-expression="#this.size() gt 1000" send-timeout="12000" send-partial-result-on-expiry="true" correlation-strategy-expression="myStrategyHere " />
Events + Stacktrace:
Can someone please point me into the right direction? Why is this happening?Code:Read in the file: ================= 2012-01-30 14:13:00,021 INFO [task-scheduler-4] org.springframework.integration.file.FileReadingMessageSource: Created message: [[Payload=c:\tmp\springint\BC1812006.txt][Headers={timestamp=1327929180021, id=58177c40-d52d-4dca-86bc-d4905df701c6}]] The messages after splitting: ============================= 2012-01-30 14:13:00,146 INFO [task-scheduler-4] org.springframework.integration.handler.LoggingHandler: [Payload=Item{<contentSnipped/>}][Headers={timestamp=1327929180146, id=ddfd87d0-b694-4886-a041-4ef6a57ed624}] 2012-01-30 14:13:00,146 INFO [task-scheduler-4] org.springframework.integration.handler.LoggingHandler: [Payload=Item{<contentSnipped/>}][Headers={timestamp=1327929180146, id=5e67dab8-e591-478e-993c-44d9de9be289}] Timeout: ======== 2012-01-30 14:14:04,865 INFO [reaperScheduler-1] org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler: Expiring MessageGroup with correlationKey[STATUSITEM_C18_2R85436] Release of the "partial result" messageGroup ============================================= 2012-01-30 14:14:04,865 INFO [reaperScheduler-1] org.springframework.integration.handler.LoggingHandler: [Payload=[Item{<contentSnipped/>}, Item{<contentSnipped/>}]] Exception!? Why? ================= 2012-01-30 14:14:05,115 ERROR [reaperScheduler-1] org.springframework.integration.store.SimpleMessageStore: Exception in expiry callback org.springframework.integration.MessageDeliveryException: Dispatcher has no subscribers. at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:108) at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:101) at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:61) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157) at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288) at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149) at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.sendReplyMessage(AbstractCorrelatingMessageHandler.java:342) at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.sendReplies(AbstractCorrelatingMessageHandler.java:335) at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:310) at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:295) at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.expireGroup(AbstractCorrelatingMessageHandler.java:278) at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.forceComplete(AbstractCorrelatingMessageHandler.java:232) at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.access$000(AbstractCorrelatingMessageHandler.java:63) at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler$1.execute(AbstractCorrelatingMessageHandler.java:114) at org.springframework.integration.store.AbstractMessageGroupStore.expire(AbstractMessageGroupStore.java:117) at org.springframework.integration.store.AbstractMessageGroupStore.expireMessageGroups(AbstractMessageGroupStore.java:87) at org.springframework.integration.store.MessageGroupStoreReaper.run(MessageGroupStoreReaper.java:110) </snip> 2012-01-30 14:14:05,162 ERROR [reaperScheduler-1] org.springframework.scheduling.support.MethodInvokingRunnable: Invocation of method 'run' on target class [class org.springframework.integration.store.MessageGroupStoreReaper] failed org.springframework.integration.MessageDeliveryException: Dispatcher has no subscribers. at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:108) at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:101) at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:61) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157) at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288) at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149) at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.sendReplyMessage(AbstractCorrelatingMessageHandler.java:342) at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.sendReplies(AbstractCorrelatingMessageHandler.java:335) at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:310) at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:295) at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.expireGroup(AbstractCorrelatingMessageHandler.java:278) at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.forceComplete(AbstractCorrelatingMessageHandler.java:232) at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.access$000(AbstractCorrelatingMessageHandler.java:63) at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler$1.execute(AbstractCorrelatingMessageHandler.java:114) at org.springframework.integration.store.AbstractMessageGroupStore.expire(AbstractMessageGroupStore.java:117) at org.springframework.integration.store.AbstractMessageGroupStore.expireMessageGroups(AbstractMessageGroupStore.java:87) at org.springframework.integration.store.MessageGroupStoreReaper.run(MessageGroupStoreReaper.java:110) </snip>
Is this the designed behaviour?


Reply With Quote
