Thread: RabbitMQ and Control Bus

  1. #1
    Join Date
    May 2012
    Posts
    14

    RabbitMQ and Control Bus

    Hi,

    Following up on my reply in an earlier thread (http://forum.springsource.org/showth...I&goto=newpost): I tried using the control bus. I have looked at the control bus example and modeled my consumer process on it, but I still have a few questions.

    My AMQP inbound adapter has id="rabbitAdapter". Should the following statement stop rabbitAdapter from listening?

    controlChannel.send(new GenericMessage<String>("@rabbitAdapter.start()"));

    I modeled this on the control bus example, where it is done as follows:

    controlChannel.send(new GenericMessage<String>("@inboundAdapter.start()"));

    Let me know your suggestions.

  2. #2
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,840

    If you want to *stop* it from listening, you should call stop() rather than start().

  3. #3
    Join Date
    May 2012
    Posts
    14

    My bad, I meant controlChannel.send(new GenericMessage<String>("@rabbitAdapter.stop()"))

  4. #4
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,036

    I just modified the amqp sample as follows...

    Code:
            MessageChannel controlBus = context.getBean("controlBus", MessageChannel.class);
    
            controlBus.send(new GenericMessage<String>("@rabbitAdapter.start()"));
    
            Thread.sleep(6000);
    
            controlBus.send(new GenericMessage<String>("@rabbitAdapter.stop()"));
    Code:
        <!-- From RabbitMQ To STDOUT -->
    
    	<int:channel id="controlBus" />
    	
    	<int:control-bus input-channel="controlBus"/>
    
        <int-amqp:inbound-channel-adapter channel="fromRabbit" id="rabbitAdapter"
                                          queue-names="si.test.queue" auto-startup="false"
                                          connection-factory="connectionFactory" />
    And all worked fine...

    Code:
    2012-05-24 12:52:11,243  INFO | main | o.s.integration.amqp.inbound.AmqpInboundChannelAdapter  | started rabbitAdapter 
    xxx
    2012-05-24 12:52:16,140  INFO | SimpleAsyncTaskExecutor-1 | org.springframework.integration.handler.LoggingHandler  | [Payload=xxx][Headers={timestamp=1337878336139, id=9fdbf001-7b86-42aa-9f09-6e652e1c4858, amqp_receivedRoutingKey=si.test.binding, amqp_deliveryMode=PERSISTENT, amqp_contentType=text/plain, amqp_receivedExchange=si.test.exchange, amqp_contentEncoding=UTF-8, amqp_redelivered=false, amqp_deliveryTag=1}] 
    xxx
    2012-05-24 12:52:17,245  INFO | main | o.s.amqp.rabbit.listener.SimpleMessageListenerContainer | Waiting for workers to finish. 
    2012-05-24 12:52:18,144  INFO | main | o.s.amqp.rabbit.listener.SimpleMessageListenerContainer | Successfully waited for workers to finish. 
    2012-05-24 12:52:18,144  INFO | main | o.s.integration.amqp.inbound.AmqpInboundChannelAdapter  | stopped rabbitAdapter

    Notice auto-startup="false" to prevent the adapter from immediately starting.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  5. #5
    Join Date
    May 2012
    Posts
    14

    Thank you so much for your help, Russell. Looks like that may have stopped it. I do not know why it was not stopping earlier; maybe I had another adapter running elsewhere.

    A follow-up question: does stopping the adapter stop the broker, or does it just stop the consumer? I would guess the consumer.
    Additionally, my console output when stopping looks different from the one you pasted above. Here is what I get:


    2012-05-24 13:28:09,983 INFO | Thread-0 | o.s.context.support.ClassPathXmlApplicationContext | Closing org.springframework.context.support.ClassPathXmlApplicationContext@2d58f9d3: startup date [Thu May 24 13:28:08 EDT 2012]; root of context hierarchy
    2012-05-24 13:28:09,987 INFO | Thread-0 | o.s.context.support.DefaultLifecycleProcessor | Stopping beans in phase 0
    2012-05-24 13:28:09,987 INFO | Thread-0 | o.s.context.support.DefaultLifecycleProcessor | Stopping beans in phase -2147483648
    2012-05-24 13:28:09,988 INFO | Thread-0 | o.s.integration.endpoint.EventDrivenConsumer | Removing {service-activator} as a subscriber to the 'controlChannel' channel
    2012-05-24 13:28:09,988 INFO | Thread-0 | org.springframework.integration.channel.DirectChannel | Channel 'controlChannel' has 0 subscriber(s).
    2012-05-24 13:28:09,989 INFO | Thread-0 | o.s.integration.endpoint.EventDrivenConsumer | stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#0
    2012-05-24 13:28:09,989 INFO | Thread-0 | o.s.integration.endpoint.EventDrivenConsumer | Removing {message-handler:consoleOut} as a subscriber to the 'fromRabbit' channel
    2012-05-24 13:28:09,989 INFO | Thread-0 | org.springframework.integration.channel.DirectChannel | Channel 'fromRabbit' has 0 subscriber(s).
    2012-05-24 13:28:09,989 INFO | Thread-0 | o.s.integration.endpoint.EventDrivenConsumer | stopped consoleOut
    2012-05-24 13:28:09,989 INFO | Thread-0 | o.s.integration.endpoint.EventDrivenConsumer | Removing {logging-channel-adapter:loggingChannel.adapter} as a subscriber to the 'loggingChannel' channel
    2012-05-24 13:28:09,989 INFO | Thread-0 | org.springframework.integration.channel.DirectChannel | Channel 'loggingChannel' has 0 subscriber(s).
    2012-05-24 13:28:09,989 INFO | Thread-0 | o.s.integration.endpoint.EventDrivenConsumer | stopped loggingChannel.adapter
    2012-05-24 13:28:09,989 INFO | Thread-0 | o.s.integration.endpoint.EventDrivenConsumer | Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
    2012-05-24 13:28:09,989 INFO | Thread-0 | o.s.integration.channel.PublishSubscribeChannel | Channel 'errorChannel' has 0 subscriber(s).
    2012-05-24 13:28:09,989 INFO | Thread-0 | o.s.integration.endpoint.EventDrivenConsumer | stopped _org.springframework.integration.errorLogger
    2012-05-24 13:28:09,989 INFO | Thread-0 | o.s.beans.factory.support.DefaultListableBeanFactory | Destroying singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@3caa4b: defining beans [channelInitializer,$autoCreateChannelCandidates,org.springframework.integration.internalDefaultConfiguringBeanFactoryPostProcessor,org.springframework.integration.config.ExpressionControlBusFactoryBean#0,org.springframework.integration.config.ConsumerEndpointFactoryBean#0,controlChannel,rabbitAdapter,org.springframework.integration.channel.interceptor.WireTap#0,fromRabbit,org.springframework.integration.stream.CharacterStreamWritingMessageHandler#0,consoleOut,loggingChannel,org.springframework.integration.handler.LoggingHandler#0,loggingChannel.adapter,connectionFactory,amqpTemplate,org.springframework.amqp.rabbit.core.RabbitAdmin#0,org.springframework.amqp.core.Queue#0,org.springframework.amqp.rabbit.config.BindingFactoryBean#0,org.springframework.amqp.core.DirectExchange#0,nullChannel,errorChannel,_org.springframework.integration.errorLogger,taskScheduler,org.springframework.integration.config.IdGeneratorConfigurer#0]; root of factory hierarchy
    2012-05-24 13:28:09,990 INFO | Thread-0 | o.s.scheduling.concurrent.ThreadPoolTaskScheduler | Shutting down ExecutorService 'taskScheduler'


    Is there any way to pause and resume the consumer?

  6. #6
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,036

    Just the consumer.

    Looks like your entire context is shutting down. How are you running your test? You can't let your main() exit before you are ready.

    With the amqp example above, I can start and stop the adapter as many times as I like.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware
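Editor's sketch: the remark above about not letting main() exit before you are ready can be illustrated without Spring. This is only a plain-JDK analogy, not the amqp sample: the class `KeepMainAlive`, the method `runAndWait`, and the "toy-consumer" thread are hypothetical names, and the sleeping thread merely stands in for a consumer that is still receiving messages. The sample in this thread uses Thread.sleep(6000) for the same purpose; a latch is one alternative.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class KeepMainAlive {

    /**
     * Starts a stand-in consumer thread and blocks the caller until the
     * consumer signals completion or the timeout elapses.
     * Returns true if the consumer finished within the timeout.
     */
    static boolean runAndWait(long workMillis, long timeoutMillis) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(workMillis); // hypothetical stand-in for consuming messages
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                done.countDown(); // tell the waiting thread we are finished
            }
        }, "toy-consumer");
        // A daemon thread does not keep the JVM alive, so if main() returned
        // right away this work would be cut short.
        consumer.setDaemon(true);
        consumer.start();
        return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        // main() blocks here instead of falling off the end immediately.
        boolean finished = runAndWait(200, 2000);
        System.out.println("consumer finished before main exited: " + finished);
    }
}
```

The design point is simply that main() waits on an explicit signal rather than returning while background work is still expected.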

  7. #7
    Join Date
    May 2012
    Posts
    14

    Well, I was just trying to stop it, so I had not written it as a unit test; it was written in my main(). I don't understand what you mean by "before you are ready".

    How are you stopping and starting the adapter as many times as you want? It would be a great help if you could share code like you did above; it really clarifies things.

    Thanks.

  8. #8
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,036

    I just meant I can do this...

    Code:
            controlBus.send(new GenericMessage<String>("@rabbitAdapter.start()"));
    
            Thread.sleep(6000);
    
            controlBus.send(new GenericMessage<String>("@rabbitAdapter.stop()"));
    
            controlBus.send(new GenericMessage<String>("@rabbitAdapter.start()"));
    
            Thread.sleep(6000);
    
            controlBus.send(new GenericMessage<String>("@rabbitAdapter.stop()"));
    This is in a main() of the amqp sample. Can you show us your main() so we can see what you are doing?
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  9. #9
    Join Date
    May 2012
    Posts
    14

    I think I know what was going wrong...

    I had left context.registerShutdownHook() in my main(). After removing it, I get the same output as you.

    Thanks for your help!

    Really appreciate all the help... Spring Integration rocks! \m/
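Editor's sketch: registerShutdownHook() asks the JVM to close the application context at JVM shutdown, which would explain why the earlier log showed the whole context closing on a thread named Thread-0 as soon as main() returned. The plain-JDK example below shows the underlying mechanism, Runtime.addShutdownHook, without any Spring classes. The names `ShutdownHookDemo`, `events`, and `registerHook` are hypothetical and exist only for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ShutdownHookDemo {

    // Records what the hook did, so the behaviour can be inspected.
    static final List<String> events = new CopyOnWriteArrayList<>();

    /** Registers a JVM shutdown hook and returns it so callers can deregister it. */
    static Thread registerHook() {
        Thread hook = new Thread(() -> {
            // Hypothetical stand-in for "close the context and stop its beans".
            events.add("closing");
            System.out.println("shutdown hook: closing resources");
        });
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) {
        registerHook();
        System.out.println("main: returning");
        // Once main() returns and no other non-daemon threads remain,
        // the JVM starts shutting down and runs the registered hook.
    }
}
```

A hook like this is harmless in itself; the surprise in this thread came from main() returning almost immediately, so the hook's cleanup ran right away.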
