Results 1 to 10 of 10

Thread: BlockingQueueConsumer fails because of missing queue in RabbitMQ

  1. #1

    Default BlockingQueueConsumer fails because of missing queue in RabbitMQ

    Hi,

    I have an application that consumes messages from RabbitMQ. Here is the basic XML configuration:

    <rabbit:admin connection-factory="connectionFactory" auto-startup="false" />

    <rabbit:queue name="myproduct.xmlProcessingReplyQueue"/>
    <rabbit:queue name="myproduct.xmlProcessingRequestQueue" />

    When my application starts (with a freshly installed RabbitMQ) I get these exceptions:

    Code:
    2012-05-31 14:48:21,867 [SimpleAsyncTaskExecutor-1] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Starting consumer Consumer: tag=[null], channel=null, acknowledgeMode=AUTO local queue size=0
    2012-05-31 14:48:21,867 [SimpleAsyncTaskExecutor-3] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Starting consumer Consumer: tag=[null], channel=null, acknowledgeMode=AUTO local queue size=0
    2012-05-31 14:48:21,867 [SimpleAsyncTaskExecutor-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Starting consumer Consumer: tag=[null], channel=null, acknowledgeMode=AUTO local queue size=0
    2012-05-31 14:48:22,348 [SimpleAsyncTaskExecutor-1] DEBUG org.springframework.amqp.rabbit.connection.CachingConnectionFactory Creating cached Rabbit Channel from AMQChannel(amqp://guest@127.0.0.1:5672/,3)
    2012-05-31 14:48:22,358 [SimpleAsyncTaskExecutor-3] DEBUG org.springframework.amqp.rabbit.connection.CachingConnectionFactory Creating cached Rabbit Channel from AMQChannel(amqp://guest@127.0.0.1:5672/,2)
    2012-05-31 14:48:22,358 [SimpleAsyncTaskExecutor-2] DEBUG org.springframework.amqp.rabbit.connection.CachingConnectionFactory Creating cached Rabbit Channel from AMQChannel(amqp://guest@127.0.0.1:5672/,1)
    2012-05-31 14:48:22,498 [SimpleAsyncTaskExecutor-2] DEBUG org.springframework.amqp.rabbit.connection.CachingConnectionFactory Detected closed channel on exception.  Re-initializing: null
    2012-05-31 14:48:22,498 [SimpleAsyncTaskExecutor-3] DEBUG org.springframework.amqp.rabbit.connection.CachingConnectionFactory Detected closed channel on exception.  Re-initializing: null
    2012-05-31 14:48:22,508 [SimpleAsyncTaskExecutor-2] WARN  org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Reconnect failed; retries left=2
    java.io.IOException
    	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    	at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:755)
    	at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:61)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)[:1.6.0_22]
    	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)[:1.6.0_22]
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)[:1.6.0_22]
    	at java.lang.reflect.Method.invoke(Unknown Source)[:1.6.0_22]
    	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:331)
    	at $Proxy52.queueDeclarePassive(Unknown Source)
    	at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:213)
    	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:516)
    	at java.lang.Thread.run(Unknown Source)[:1.6.0_22]
    And further down:


    Code:
    2012-05-31 14:48:37,760 [SimpleAsyncTaskExecutor-2] DEBUG org.springframework.amqp.rabbit.connection.CachingConnectionFactory Detected closed channel on exception.  Re-initializing: null
    2012-05-31 14:48:37,760 [SimpleAsyncTaskExecutor-2] ERROR org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer received fatal exception on startup
    org.springframework.amqp.rabbit.listener.FatalListenerStartupException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
    	at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:228)
    	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:516)
    	at java.lang.Thread.run(Unknown Source)[:1.6.0_22]
    I have a feeling that the queues are not detected because the <rabbit:admin> tag has auto-startup=false and so it doesn't create the queues. I'm not sure what to do about it though...

    Anyone?

  2. #2
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,840

    Default

    I believe you are right regarding the auto-startup flag. Why do you need that to be false?

  3. #3

    Default

    Because when the application starts up, RabbitMQ may not yet be available. The application is designed to start up and allow a user, through interaction, to start the messaging services.

    I just checked with auto-startup and it does in fact seem to be the problem (queueDeclarePassive() is invoked, which checks if the queue exists).

  4. #4
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,012

    Default

    Can you not simply declare the queues in the rabbitmq.config, rather than via RabbitAdmin?
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  5. #5

    Default

    Well, this would require the customer/end user to do some manual configuration which we want to avoid if at all possible. But even if we decided to add the configuration to rabbitmq.config, it would not help with the server not being available when the application starts. Or that the location of the server may not be immediately known.

    Besides, the auto-startup attribute exists - isn't it supposed to work? :-)

  6. #6
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,840

    Default

    Yes, it does work; that's why you don't have your queues declared automatically

    I think what you need to do is also set auto-startup=false on your message-listener container and/or any channel-adapters/gateways.

  7. #7
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,012

    Default

    Just to clarify; if you set auto-startup to false on the rabbit admin it will not automatically declare queues etc found in the context. Any elements using those queues must also not be started.

    When you are ready to start, get a reference to the RabbitAdmin bean, and call initialize(); you can then start the other components, such as listener containers, Spring Integration adapters etc.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  8. #8

    Default

    If I have to get a reference to the RabbitAdmin instance in my code, in order to call initialize() on it, I'll be creating a fairly strong coupling to RabbitMQ inside my code. I would like to avoid this, if possible, as the idea is to be able to use a different message infrastructure at some later point.

    Do you have any suggestions for how I might call initialize() on the RabbitAdmin instance without actually referencing any rabbit specific classes in my code?

  9. #9

    Default

    Okay, I fixed the above by introducing an Admin interface in my code, which is then initialized by a messaging-specific implementation. This way the coupling is minimal.

  10. #10
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,012

    Default

    Another option is to send a message to a void return gateway proxy method where the <gateway/>'s request channel is connected to a <control-bus/>.

    The message should contain a String payload "@rabbitAdmin.initialize()".
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

Tags for this Thread

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •