Results 1 to 3 of 3

Thread: Aggregator + MessageGroupStoreReaper: Exception in expiry callback

  1. #1
    Join Date
    Jan 2012
    Posts
    4

    Default [resolved] Aggregator + MessageGroupStoreReaper: Exception in expiry callback

    Hi Everyone!

    I have a little problem with my integration flow, using the Aggregator and the MessageGroupStoreReaper.
    My flow, so far, is:inbound-channel-adapter -> transformer -> splitter -> aggregator [+MessageGroupStoreReaper]

    I'm using a correlation-strategy-expression on the aggregator which works just fine.
    The problem is, I can't implement a release-strategy, because I have no information at which point the group is complete. BUT I can savely assume that if nothing has been added to the group for a minute, it can be treated as complete.
    Because of this I set up the MessageGroupStoreReaper like this:

    Code:
        <bean id="messageStore" class="org.springframework.integration.store.SimpleMessageStore" />	
        <bean id="messageStoreReaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
            <property name="messageGroupStore" ref="messageStore" />
            <property name="timeout" value="60000" />
        </bean>
        <task:scheduler id="reaperScheduler" />	
        <task:scheduled-tasks scheduler="reaperScheduler">
            <task:scheduled ref="messageStoreReaper" method="run" fixed-rate="5000"/>
        </task:scheduled-tasks>
    The aggregator looks like this:
    (...but the release-strategy will never be fulfilled)
    Code:
    <int:aggregator id="statustypeAndInvoiceAggregator"
                        message-store="messageStore"
                        input-channel="ch_all_splitted"
                        output-channel="ch_all_aggregated"
                        discard-channel="nullChannel"
                        release-strategy-expression="#this.size() gt 1000"
                        send-timeout="12000"
                        send-partial-result-on-expiry="true"                    
                        correlation-strategy-expression="myStrategyHere " />
    What I don't understand is the resulting exception (even though the aggregation/timeout works as intended):

    Events + Stacktrace:
    Code:
    Read in the file:
    =================
    2012-01-30 14:13:00,021 INFO  [task-scheduler-4] org.springframework.integration.file.FileReadingMessageSource: Created message: [[Payload=c:\tmp\springint\BC1812006.txt][Headers={timestamp=1327929180021, id=58177c40-d52d-4dca-86bc-d4905df701c6}]]
    
    The messages after splitting:
    =============================
    2012-01-30 14:13:00,146 INFO  [task-scheduler-4] org.springframework.integration.handler.LoggingHandler: [Payload=Item{<contentSnipped/>}][Headers={timestamp=1327929180146, id=ddfd87d0-b694-4886-a041-4ef6a57ed624}]
    2012-01-30 14:13:00,146 INFO  [task-scheduler-4] org.springframework.integration.handler.LoggingHandler: [Payload=Item{<contentSnipped/>}][Headers={timestamp=1327929180146, id=5e67dab8-e591-478e-993c-44d9de9be289}]
    
    Timeout:
    ========
    2012-01-30 14:14:04,865 INFO  [reaperScheduler-1] org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler: Expiring MessageGroup with correlationKey[STATUSITEM_C18_2R85436]
    
    Release of the "partial result" messageGroup
    =============================================
    2012-01-30 14:14:04,865 INFO  [reaperScheduler-1] org.springframework.integration.handler.LoggingHandler: [Payload=[Item{<contentSnipped/>}, Item{<contentSnipped/>}]]
    
    Exception!? Why?
    =================
    2012-01-30 14:14:05,115 ERROR [reaperScheduler-1] org.springframework.integration.store.SimpleMessageStore: Exception in expiry callback
    org.springframework.integration.MessageDeliveryException: Dispatcher has no subscribers.
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:108)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:101)
    	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:61)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
    	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
    	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
    	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.sendReplyMessage(AbstractCorrelatingMessageHandler.java:342)
    	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.sendReplies(AbstractCorrelatingMessageHandler.java:335)
    	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:310)
    	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:295)
    	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.expireGroup(AbstractCorrelatingMessageHandler.java:278)
    	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.forceComplete(AbstractCorrelatingMessageHandler.java:232)
    	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.access$000(AbstractCorrelatingMessageHandler.java:63)
    	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler$1.execute(AbstractCorrelatingMessageHandler.java:114)
    	at org.springframework.integration.store.AbstractMessageGroupStore.expire(AbstractMessageGroupStore.java:117)
    	at org.springframework.integration.store.AbstractMessageGroupStore.expireMessageGroups(AbstractMessageGroupStore.java:87)
    	at org.springframework.integration.store.MessageGroupStoreReaper.run(MessageGroupStoreReaper.java:110)
    	</snip>
    2012-01-30 14:14:05,162 ERROR [reaperScheduler-1] org.springframework.scheduling.support.MethodInvokingRunnable: Invocation of method 'run' on target class [class org.springframework.integration.store.MessageGroupStoreReaper] failed
    org.springframework.integration.MessageDeliveryException: Dispatcher has no subscribers.
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:108)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:101)
    	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:61)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
    	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
    	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
    	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.sendReplyMessage(AbstractCorrelatingMessageHandler.java:342)
    	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.sendReplies(AbstractCorrelatingMessageHandler.java:335)
    	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:310)
    	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:295)
    	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.expireGroup(AbstractCorrelatingMessageHandler.java:278)
    	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.forceComplete(AbstractCorrelatingMessageHandler.java:232)
    	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.access$000(AbstractCorrelatingMessageHandler.java:63)
    	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler$1.execute(AbstractCorrelatingMessageHandler.java:114)
    	at org.springframework.integration.store.AbstractMessageGroupStore.expire(AbstractMessageGroupStore.java:117)
    	at org.springframework.integration.store.AbstractMessageGroupStore.expireMessageGroups(AbstractMessageGroupStore.java:87)
    	at org.springframework.integration.store.MessageGroupStoreReaper.run(MessageGroupStoreReaper.java:110)
    	</snip>
    Can someone please point me into the right direction? Why is this happening?
    Is this the designed behaviour?
    Last edited by joerg_s; Jan 30th, 2012 at 08:56 AM.

  2. #2
    Join Date
    Jan 2009
    Location
    Ukraine, Kharkov
    Posts
    645

    Default

    Hello

    Does your ch_all_aggregated have any subscribers?

    Artem Bilan

  3. #3
    Join Date
    Jan 2012
    Posts
    4

    Default

    Hi Artem!

    Thanks for your fast response!

    ...omg, I wasn't able to see the wood for trees.
    It was indeed a missing subscriber.
    After adding a dummy outbound-channel-adapter it works like a charm.
    I'm feeling stupid now, haha.


    [edit] is there a way I can mark this thread as resolved?

Tags for this Thread

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •