
Thread: Unable to restart DefaultMessageListener after stopping via stop callback

  1. #1
    Join Date
    Jul 2010
    Posts
    10

    Hi All,

    I am developing an application that uses Spring's DefaultMessageListenerContainer to implement an MDP (message-driven POJO). As part of this application, I have to implement a task that runs on a schedule (every x minutes), which I am implementing via Spring's scheduled-tasks support:

    Code:
    <task:scheduled-tasks scheduler="heartbeatScheduler">
        <task:scheduled ref="heartbeatService" method="processHeartbeat" cron="${heartbeat.cron}"/>
    </task:scheduled-tasks>
    
     <task:scheduler id="heartbeatScheduler" pool-size="1"/>
    Before this code executes, it needs to stop/pause the JMS listener container to avoid a potential race condition. I have therefore attempted to implement this via the SmartLifecycle stop callback, e.g.:

    Code:
    messageListenerContainer.stop(new Runnable() {

        @Override
        public void run() {
            LOGGER.info("JMS Container stopped executing task");
            performTask();
            LOGGER.info("Task complete starting JMS Container");
            messageListenerContainer.start();
        }
    });
    The issue, however, is that although messageListenerContainer.start() is called and the container reports that it is both active and running, the ActiveMQ console shows that the JMS connection is never re-established.

    If, however, I stop the container via the stop() callback, poll the container until activeConsumerCount == 0, and then restart it, the JMS connection is re-established correctly.
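
    The polling workaround described above could be sketched roughly as follows. This is a minimal sketch, not the poster's actual code: the class name ContainerIdlePoller, the method awaitIdle, and the timeout and interval parameters are all hypothetical. It takes a plain IntSupplier so that it has no Spring dependency; with the real container, one would presumably pass the getter behind activeConsumerCount.

```java
import java.util.function.IntSupplier;

// Hypothetical helper: waits until a consumer count drops to zero.
final class ContainerIdlePoller {

    private ContainerIdlePoller() {
    }

    /**
     * Polls the supplied count until it reports 0 or the timeout elapses.
     * Returns true if the count reached 0, false on timeout or interruption.
     */
    static boolean awaitIdle(IntSupplier activeConsumers, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (activeConsumers.getAsInt() > 0) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // gave up: consumers are still active
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return true;
    }
}
```

    Called from the scheduler thread, this might look like messageListenerContainer.stop(), then ContainerIdlePoller.awaitIdle(messageListenerContainer::getActiveConsumerCount, 5000, 50), then performTask() and messageListenerContainer.start(). The 5000 ms and 50 ms values are placeholders, not recommendations.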

    My message listener container config is as follows:

    Code:
    <bean id="adapterListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="jmsConnectionFactory"/>
            <property name="destinationName" value="${inbound.destination}" />
            <property name="messageListener" ref="messageListener" />
            <property name="transactionManager" ref="jmsTransactionManager" />
            <property name="maxConcurrentConsumers" value="${max.concurrent.consumers}" />
    </bean>
    Any help you could provide would be much appreciated.
    Last edited by pbray; Jun 21st, 2012 at 11:51 PM.

  2. #2
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,020

    I suggest you run with TRACE-level logging and take a look at the log. Attach it here if you can't figure out what's going on.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  3. #3
    Join Date
    Jul 2010
    Posts
    10

    Hi Gary,

    Thanks for your quick response. As suggested, I have enabled TRACE-level logging for Spring as follows:

    Code:
    log4j.category.org.springframework=TRACE
    Unfortunately, this has not given me any new insight into why the container is not re-establishing the connection.

    Log output is included below:

    Code:
    2012-07-02 14:00:38,772 [main] INFO  org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService  'heartbeatScheduler'
    2012-07-02 14:00:38,772 [main] DEBUG org.springframework.beans.factory.support.DefaultListableBeanFactory - Finished creating instance of bean 'heartbeatScheduler'
    2012-07-02 14:00:38,772 [main] DEBUG org.springframework.beans.factory.support.DefaultListableBeanFactory - Returning cached instance of singleton bean 'org.springframework.scheduling.support.ScheduledMethodRunnable#0'
    2012-07-02 14:00:38,788 [main] INFO  org.springframework.context.support.DefaultLifecycleProcessor - Starting beans in phase 2147483647
    2012-07-02 14:00:38,788 [main] DEBUG org.springframework.context.support.DefaultLifecycleProcessor - Starting bean 'adapterListener' of type [class org.springframework.jms.listener.DefaultMessageListenerContainer]
    2012-07-02 14:00:38,960 [main] DEBUG org.springframework.jms.listener.DefaultMessageListenerContainer - Established shared JMS Connection
    2012-07-02 14:00:38,960 [main] DEBUG org.springframework.jms.listener.DefaultMessageListenerContainer - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@16fcc4
    2012-07-02 14:00:39,006 [main] DEBUG org.springframework.context.support.DefaultLifecycleProcessor - Successfully started bean 'adapterListener'
    2012-07-02 14:00:58,477 [adapterListener-1] DEBUG org.springframework.jms.connection.JmsTransactionManager - Creating new transaction with name [adapterListener]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
    2012-07-02 14:00:58,477 [adapterListener-1] DEBUG org.springframework.jms.connection.JmsTransactionManager - Created JMS transaction on Session [ActiveMQSession {id=ID:w5c260a886515-2436-1341201638850-1:21:1,started=false}] from Connection [ActiveMQConnection {id=ID:w5c260a886515-2436-1341201638850-1:21,clientId=ID:w5c260a886515-2436-1341201638850-0:21,started=false}]
    2012-07-02 14:00:58,477 [adapterListener-1] TRACE org.springframework.transaction.support.TransactionSynchronizationManager - Bound value [org.springframework.jms.connection.JmsResourceHolder@17ccb2f] for key [org.apache.activemq.ActiveMQConnectionFactory@f49e8f] to thread [adapterListener-1]
    2012-07-02 14:00:58,477 [adapterListener-1] TRACE org.springframework.transaction.support.TransactionSynchronizationManager - Retrieved value [org.springframework.jms.connection.JmsResourceHolder@17ccb2f] for key [org.apache.activemq.ActiveMQConnectionFactory@f49e8f] bound to thread [adapterListener-1]
    2012-07-02 14:00:59,492 [adapterListener-1] TRACE org.springframework.jms.listener.DefaultMessageListenerContainer - Consumer [ActiveMQMessageConsumer { value=ID:w5c260a886515-2436-1341201638850-1:21:1:1, started=true }] of transactional session [ActiveMQSession {id=ID:w5c260a886515-2436-1341201638850-1:21:1,started=true}] did not receive a message
    2012-07-02 14:00:59,492 [adapterListener-1] DEBUG org.springframework.jms.connection.JmsTransactionManager - Initiating transaction commit
    2012-07-02 14:00:59,492 [adapterListener-1] DEBUG org.springframework.jms.connection.JmsTransactionManager - Committing JMS transaction on Session [ActiveMQSession {id=ID:w5c260a886515-2436-1341201638850-1:21:1,started=true}]
    2012-07-02 14:00:59,492 [adapterListener-1] TRACE org.springframework.transaction.support.TransactionSynchronizationManager - Removed value [org.springframework.jms.connection.JmsResourceHolder@17ccb2f] for key [org.apache.activemq.ActiveMQConnectionFactory@f49e8f] from thread [adapterListener-1]
    2012-07-02 14:00:59,523 [adapterListener-1] DEBUG org.springframework.jms.connection.JmsTransactionManager - Creating new transaction with name [adapterListener]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
    2012-07-02 14:00:59,523 [adapterListener-1] DEBUG org.springframework.jms.connection.JmsTransactionManager - Created JMS transaction on Session [ActiveMQSession {id=ID:w5c260a886515-2436-1341201638850-1:22:1,started=false}] from Connection [ActiveMQConnection {id=ID:w5c260a886515-2436-1341201638850-1:22,clientId=ID:w5c260a886515-2436-1341201638850-0:22,started=false}]
    2012-07-02 14:00:59,523 [adapterListener-1] TRACE org.springframework.transaction.support.TransactionSynchronizationManager - Bound value [org.springframework.jms.connection.JmsResourceHolder@1768b0a] for key [org.apache.activemq.ActiveMQConnectionFactory@f49e8f] to thread [adapterListener-1]
    2012-07-02 14:00:59,523 [adapterListener-1] TRACE org.springframework.transaction.support.TransactionSynchronizationManager - Retrieved value [org.springframework.jms.connection.JmsResourceHolder@1768b0a] for key [org.apache.activemq.ActiveMQConnectionFactory@f49e8f] bound to thread [adapterListener-1]
    2012-07-02 14:01:00,538 [adapterListener-1] TRACE org.springframework.jms.listener.DefaultMessageListenerContainer - Consumer [ActiveMQMessageConsumer { value=ID:w5c260a886515-2436-1341201638850-1:22:1:1, started=true }] of transactional session [ActiveMQSession {id=ID:w5c260a886515-2436-1341201638850-1:22:1,started=true}] did not receive a message
    2012-07-02 14:01:00,538 [adapterListener-1] DEBUG org.springframework.jms.connection.JmsTransactionManager - Initiating transaction commit
    2012-07-02 14:01:00,538 [adapterListener-1] DEBUG org.springframework.jms.connection.JmsTransactionManager - Committing JMS transaction on Session [ActiveMQSession {id=ID:w5c260a886515-2436-1341201638850-1:22:1,started=true}]
    2012-07-02 14:01:00,538 [adapterListener-1] TRACE org.springframework.transaction.support.TransactionSynchronizationManager - Removed value [org.springframework.jms.connection.JmsResourceHolder@1768b0a] for key [org.apache.activemq.ActiveMQConnectionFactory@f49e8f] from thread [adapterListener-1]
    2012-07-02 14:01:00,569 [adapterListener-1] INFO  HeartbeatService - JMS Container stopped executing task
    2012-07-02 14:01:01,115 [adapterListener-1] INFO  HeartbeatService - Task complete starting JMS Container
    As you can see, there is no further output after the HeartbeatService Runnable completes execution.

    Any further assistance you could provide would be much appreciated.

    Thanks,
    Patrick Bray

  4. #4
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,020

    I am confused; you seem to be trying to stop/start the container on one of its own threads...

    2012-07-02 14:01:00,569 [adapterListener-1] INFO HeartbeatService - JMS Container stopped executing task
    2012-07-02 14:01:01,115 [adapterListener-1] INFO HeartbeatService - Task complete starting JMS Container

    ...yet I didn't see an inbound message; shouldn't we see the stop() running on a heartbeatScheduler thread?
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware
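
    The thread names in the log are what give the problem away: the callback logs under adapterListener-1 rather than heartbeatScheduler-1. A small guard along the following lines could make that visible at runtime. It is only a sketch under one assumption, namely that the container uses its default task executor, whose threads are named after the bean ("adapterListener-1" in the log above). The class and method names are hypothetical.

```java
// Hypothetical diagnostic helper: detects container-owned threads by name.
final class ListenerThreadCheck {

    private ListenerThreadCheck() {
    }

    /**
     * True if threadName looks like a thread created for the given
     * container bean, e.g. "adapterListener-1" for bean "adapterListener".
     */
    static boolean isListenerThread(String threadName, String containerBeanName) {
        return threadName != null && threadName.startsWith(containerBeanName + "-");
    }

    /** Convenience overload that checks the calling thread. */
    static boolean onListenerThread(String containerBeanName) {
        return isListenerThread(Thread.currentThread().getName(), containerBeanName);
    }
}
```

    Inside the stop callback, a check such as ListenerThreadCheck.onListenerThread("adapterListener") could be logged as a warning before calling start(). It relies purely on a naming convention, so it is a debugging aid rather than a guarantee.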

  5. #5
    Join Date
    Jul 2010
    Posts
    10

    Hi Gary,

    Once again, thanks for your quick response. You are absolutely right: the stop callback was running on the JMS listener thread and was therefore unable to restart the container. Modifying the stop callback as follows has resolved the issue, and the connection is now re-established successfully:

    Code:
    messageListenerContainer.stop(new Runnable() {

        @Override
        public void run() {
            taskExecutor.execute(new HeartbeatWorker());
        }
    });
    Code:
        private final class HeartbeatWorker implements Runnable {
    
            @Override
            public void run() {
                try {
                    LOGGER.info("JMS Container stopped executing task");
                    performTask();
                }
                finally {
                    LOGGER.info("Task complete starting JMS Container");
                    messageListenerContainer.start();
                }
            }
        }
    Code:
    2012-07-03 10:32:00,129 [heartbeatScheduler-1] INFO  WSHeartbeatServiceNonPolling - Processing heartbeat
    2012-07-03 10:32:01,113 [adapterListener-1] TRACE org.springframework.jms.listener.DefaultMessageListenerContainer - Consumer [ActiveMQMessageConsumer { value=ID:w5c260a886515-1913-1341274679580-1:814:1:1, started=true }] of transactional session [ActiveMQSession {id=ID:w5c260a886515-1913-1341274679580-1:814:1,started=true}] did not receive a message
    2012-07-03 10:32:01,113 [adapterListener-1] DEBUG org.springframework.jms.connection.JmsTransactionManager - Initiating transaction commit
    2012-07-03 10:32:01,113 [adapterListener-1] DEBUG org.springframework.jms.connection.JmsTransactionManager - Committing JMS transaction on Session [ActiveMQSession {id=ID:w5c260a886515-1913-1341274679580-1:814:1,started=true}]
    2012-07-03 10:32:01,113 [heartbeatScheduler-1] INFO  WSHeartbeatServiceNonPolling - JMS Container stopped executing task
    2012-07-03 10:32:01,113 [heartbeatScheduler-1] INFO  WSHeartbeatServiceNonPolling - Sending heartbeat request
    2012-07-03 10:32:01,144 [heartbeatScheduler-1] INFO  WSHeartbeatServiceNonPolling - Heartbeat processing complete 0 results returned
    2012-07-03 10:32:01,144 [heartbeatScheduler-1] INFO  WSHeartbeatServiceNonPolling - Task complete starting JMS Container
    2012-07-03 10:32:01,144 [adapterListener-1] DEBUG org.springframework.jms.connection.JmsTransactionManager - Creating new transaction with name [adapterListener]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
    2012-07-03 10:32:01,144 [adapterListener-1] DEBUG org.springframework.jms.connection.JmsTransactionManager - Created JMS transaction on Session [ActiveMQSession {id=ID:w5c260a886515-1913-1341274679580-1:815:1,started=false}] from Connection [ActiveMQConnection {id=ID:w5c260a886515-1913-1341274679580-1:815,clientId=ID:w5c260a886515-1913-1341274679580-0:815,started=false}]
    2012-07-03 10:32:02,144 [adapterListener-1] TRACE org.springframework.jms.listener.DefaultMessageListenerContainer - Consumer [ActiveMQMessageConsumer { value=ID:w5c260a886515-1913-1341274679580-1:815:1:1, started=true }] of transactional session [ActiveMQSession {id=ID:w5c260a886515-1913-1341274679580-1:815:1,started=true}] did not receive a message
    2012-07-03 10:32:02,144 [adapterListener-1] DEBUG org.springframework.jms.connection.JmsTransactionManager - Initiating transaction commit
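
    The hand-off used in the fix above can be modelled without Spring or JMS. The sketch below is a toy, not the container's real behaviour: one single-thread executor stands in for the listener thread that invokes the stop callback, and a second one stands in for taskExecutor. All names (StopCallbackHandOff, demo, "listener-1", "worker-1") are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Toy model: a callback invoked on one thread hands its work to another.
final class StopCallbackHandOff {

    private StopCallbackHandOff() {
    }

    /** Returns { name of thread that ran the callback, name of thread that ran the work }. */
    static String[] demo() {
        ExecutorService listener = Executors.newSingleThreadExecutor(r -> new Thread(r, "listener-1"));
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker-1"));
        AtomicReference<String> callbackThread = new AtomicReference<>();
        AtomicReference<String> workThread = new AtomicReference<>();
        try {
            // Stand-in for the container running the stop callback on its own thread.
            listener.submit(() -> {
                callbackThread.set(Thread.currentThread().getName());
                // The callback only schedules the work; it does not run it here.
                worker.execute(() -> workThread.set(Thread.currentThread().getName()));
            }).get(5, TimeUnit.SECONDS);
            worker.shutdown(); // already-queued work still runs to completion
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("hand-off demo failed", e);
        } finally {
            listener.shutdownNow();
            worker.shutdownNow();
        }
        return new String[] { callbackThread.get(), workThread.get() };
    }

    public static void main(String[] args) {
        String[] names = demo();
        System.out.println("callback ran on " + names[0] + ", work ran on " + names[1]);
    }
}
```

    The point of the design is that the callback returns immediately, so the thread that invoked it is free again by the time the restart is requested from the other thread.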
    Thanks for all your help; it is much appreciated.

    Thanks,
    Patrick Bray
