
Thread: Automatic reconnects

  1. #1

    I'm new to AMQP and RabbitMQ, and I have a simple project set up using Spring AMQP to consume messages from a queue. As far as I can tell, there's nothing built in to attempt reconnects in the event of a lost connection. Is that right? I saw this thread:
    http://forum.springsource.org/showthread.php?t=93955

    Is that still a recommended way to go? Given the high level of similarity between Spring AMQP and Spring JMS, I'd expect that there would eventually be an AMQP DefaultMessageListenerContainer that will handle things like this. If I want to get automatic reconnects, which direction should I look?

    On a related note, might it be possible to generify the JMS DefaultMessageListenerContainer (and other JMS classes) to work for JMS, AMQP, and message protocol X by doing some refactoring and creating a new module like "spring-messaging"? The spring-messaging module would be the API-ish thing with providers (JMS, AMQP, X) plugged into the back end, very much like spring-amqp is intended to support multiple AMQP providers. It seems this would add a lot of value by making important functionality--like auto-reconnect--immediately available to spring-amqp and any other message protocols that might come along, assuming they conform to the same basic idiom.

  2. #2
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,853

    I believe you are asking for this feature: https://jira.springsource.org/browse/AMQP-93

    It has been resolved and will be available in the upcoming M3 release.

  3. #3

    I don't think that's what I'm looking for. That would take care of the case where a connection can't be made at startup, but what about when the RabbitMQ server restarts or shuts down, or the connection is temporarily lost for some other reason? If I'm reading this right, the SimpleMessageListenerContainer only gets a connection from the factory at startup. If the server shuts down or the connection goes bad, an exception is generally thrown, and it seems that those exceptions are caught by the AsyncMessageProcessingConsumer in the SimpleMessageListenerContainer. When that consumer gets an exception, the receive loop exits and no further invocations are scheduled, so the container is no longer trying to receive messages.

    More of what I'm looking for is what happens in the org.springframework.jms.listener.DefaultMessageListenerContainer. In its receive loop, in the inner AsyncMessageListenerInvoker class, if any exception is thrown while attempting to receive a message, it calls recoverAfterListenerSetupFailure(), which calls refreshConnectionUntilSuccessful(), which, as the name implies, attempts forever to reconnect until the container is shut down. After establishing a new connection, it begins scheduling receive() invocations again so that message processing resumes.
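
    To make the pattern concrete, here is a minimal plain-Java sketch of a retry-until-successful loop of the kind described above. It is not the Spring JMS or Spring AMQP implementation: the Connector interface, the class and method names, and the fixed retry interval are all assumptions made up for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class ReconnectLoopSketch {

    /** Hypothetical stand-in for whatever opens a broker connection. */
    public interface Connector {
        void connect() throws Exception;
    }

    /**
     * Keeps calling connect() until it succeeds or 'running' is cleared.
     * Returns true once connected, false if stopped or interrupted first.
     */
    public static boolean connectUntilSuccessful(Connector connector,
                                                 AtomicBoolean running,
                                                 long retryIntervalMillis) {
        while (running.get()) {
            try {
                connector.connect();
                return true; // connected: the caller can resume its receive loop
            } catch (Exception e) {
                // Connection attempt failed: wait, then try again.
                try {
                    Thread.sleep(retryIntervalMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve interrupt status
                    return false;
                }
            }
        }
        return false; // the container was shut down while we were retrying
    }

    public static void main(String[] args) {
        AtomicBoolean running = new AtomicBoolean(true);
        AtomicInteger attempts = new AtomicInteger();

        // Simulated broker that refuses the first two connection attempts.
        Connector flaky = () -> {
            if (attempts.incrementAndGet() < 3) {
                throw new java.io.IOException("broker unavailable");
            }
        };

        boolean connected = connectUntilSuccessful(flaky, running, 10L);
        System.out.println("connected=" + connected + " after " + attempts.get() + " attempts");
    }
}
```

    The point of the sketch is that the retry decision lives in the loop that also owns the "running" flag, which is why this kind of recovery sits more naturally in a listener container than in a connection factory.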

    It seems to me that the connection factory sits at too low a level to handle this kind of thing intelligently, so any changes there would only get you part of the way to fully recovering from a disconnect. This is the kind of mechanism I was talking about in my first post that would be nice to share between JMS and AMQP.

  4. #4

    Check out the last comments here: https://jira.springsource.org/browse/AMQP-44

    Also, if you check out the latest code (HEAD), you can see the changes that have been made.

  5. #5

    Ah, never mind. I didn't see everything there. It does go beyond the connection factory. I think that will be exactly what I'm looking for. Thanks a lot.

  6. #6

    Great! If there's any way you can take it for a spin in the current state, it would be great to get some feedback. Otherwise, we'll have M3 out soon (hopefully next week).

    Thanks,
    Mark

  7. #7

    Quote (originally posted by Mark Fisher):
        Great! If there's any way you can take it for a spin in the current state, it would be great to get some feedback. Otherwise, we'll have M3 out soon (hopefully next week).

        Thanks,
        Mark

    Hey, I got pulled into something else for a while, but I came back to this today. Here's what I've found so far.

    1) I see that M3 was released, but I didn't get a tag from the repo. It should be on 83f0f48db2471cfcf9035dca4849029924d4f27c.

    2) The M3 release doesn't appear to handle simple reconnects, where the RabbitMQ server just restarts, but as of f1d783cf5c09b16c06263b2a1475ef12658aa3ef (latest as of this morning), it does reconnect when I restart RabbitMQ.

    3) Even with reconnects working, if the server comes up and the queue I'm trying to receive from doesn't exist--e.g. it wasn't durable and hasn't yet been recreated--the reconnect attempts stop. The trace-level logs and stacktrace for when this happens are below.

    I'm going to keep looking and poking at this for a while.

    Code:
    12:58:29,694 INFO  Restarting Consumer: tag=[null], channel=null, acknowledgeMode=AUTO local queue size=0                                                                 [SimpleAsyncTaskExecutor-3] SimpleMessageListenerContainer
    12:58:29,695 DEBUG Closing Rabbit Channel: null                                                                                                                           [SimpleAsyncTaskExecutor-3] BlockingQueueConsumer
    12:58:29,698 DEBUG Detected closed connection. Opening a new one before creating Channel.                                                                                 [SimpleAsyncTaskExecutor-4] CachingConnectionFactory
    12:58:29,721 ERROR Consumer received fatal exception on startup                                                                                                           [SimpleAsyncTaskExecutor-4] SimpleMessageListenerContainer
    org.springframework.amqp.rabbit.listener.ListenerStartupFatalException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
    	at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:156)
    	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:471)
    	at java.lang.Thread.run(Thread.java:619)
    Caused by: java.io.IOException
    	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:107)
    	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:131)
    	at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:624)
    	at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:59)
    	at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:153)
    	... 2 more
    Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404,reply-text=NOT_FOUND - no queue 'hello' in vhost '/',class-id=50,method-id=10),null,""}
    	at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    	at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    	at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:328)
    	at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:201)
    	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:125)
    	... 5 more
    Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404,reply-text=NOT_FOUND - no queue 'hello' in vhost '/',class-id=50,method-id=10),null,""}
    	at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:365)
    	at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:235)
    	at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:151)
    	at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:96)
    	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:446)
    12:58:29,729 DEBUG Cancelling Consumer: tag=[null], channel=AMQChannel(amqp://guest@dev-tomcat:5672/,1), acknowledgeMode=AUTO local queue size=0                          [SimpleAsyncTaskExecutor-4] SimpleMessageListenerContainer
    12:58:29,729 DEBUG Closing Rabbit Channel: AMQChannel(amqp://guest@dev-tomcat:5672/,1)                                                                                    [SimpleAsyncTaskExecutor-4] BlockingQueueConsumer
    12:58:29,730 DEBUG Shutting down Rabbit listener container                                                                                                                [SimpleAsyncTaskExecutor-4] SimpleMessageListenerContainer
    12:58:29,731 DEBUG Waiting for workers to finish.                                                                                                                         [SimpleAsyncTaskExecutor-4] SimpleMessageListenerContainer
    12:58:29,731 INFO  Successfully waited for workers to finish.                                                                                                             [SimpleAsyncTaskExecutor-4] SimpleMessageListenerContainer

  8. #8

    Oh, and I also have to note that in glancing through the repo, I see several commits that mix large code reformats with real changes, and picking the changes out of the reformats is really difficult. If your IDE is reformatting on the fly, then "git add -p" is your friend for picking out the relevant changes to stage for a commit, and then you can commit the reformatting changes separately.

  9. #9
    Join Date
    Jun 2005
    Posts
    4,241

    Quote (originally posted by zzantozz):
        if the server comes up and the queue I'm trying to receive from doesn't exist--e.g. it wasn't durable and hasn't yet been recreated--the reconnect attempts stop.

    That is the expected behaviour. Would you like to change it?

    The reasoning is that if the queue doesn't exist on startup then there is a configuration problem and you don't want the context to start at all. If you intend to configure the queue yourself programmatically at startup, then you can set auto-startup="false" on the container and only start it when the queue exists.
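
    As a rough illustration of that ordering (container not started automatically, queue created first, container started by hand afterwards), here is a small plain-Java sketch. It does not use the Spring AMQP classes: FakeBroker and ListenerContainer are hypothetical stand-ins, and the exception type and message are chosen only to echo the fail-fast behaviour described above.

```java
import java.util.HashSet;
import java.util.Set;

public class DeferredStartSketch {

    /** Hypothetical in-memory stand-in for a broker that tracks declared queues. */
    public static class FakeBroker {
        private final Set<String> queues = new HashSet<>();

        public void declareQueue(String name) {
            queues.add(name);
        }

        public boolean queueExists(String name) {
            return queues.contains(name);
        }
    }

    /**
     * Hypothetical listener container. Like a container configured with
     * auto-startup="false", it does nothing until start() is called.
     */
    public static class ListenerContainer {
        private final FakeBroker broker;
        private final String queueName;
        private boolean running;

        public ListenerContainer(FakeBroker broker, String queueName) {
            this.broker = broker;
            this.queueName = queueName;
        }

        /** Fails fast if the queue is missing instead of retrying. */
        public void start() {
            if (!broker.queueExists(queueName)) {
                throw new IllegalStateException(
                        "Cannot prepare queue for listener: " + queueName);
            }
            running = true;
        }

        public boolean isRunning() {
            return running;
        }
    }

    public static void main(String[] args) {
        FakeBroker broker = new FakeBroker();
        ListenerContainer container = new ListenerContainer(broker, "hello");

        // Nothing has started yet, so the queue can be created first...
        broker.declareQueue("hello");
        // ...and only then is the container started explicitly.
        container.start();
        System.out.println("running=" + container.isRunning());
    }
}
```

    Calling start() before declareQueue("hello") in this sketch throws immediately, which is the toy equivalent of treating a missing queue as a configuration error rather than something to retry.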

    If you use RabbitAdmin autostartup to declare the queue then it should happen strictly before the listener container starts up so it should be fine. If this is not what you are seeing please describe in more detail what you do see and what you would expect.

    Thanks for the git tips. There is a tag on M3 as far as I can see (f45dea87bb0389aa714e08c03e1be3cd2dd4f463).

  10. #10

    Quote (originally posted by Dave Syer):
        That is the expected behaviour. Would you like to change it?

        The reasoning is that if the queue doesn't exist on startup then there is a configuration problem and you don't want the context to start at all. If you intend to configure the queue yourself programmatically at startup, then you can set auto-startup="false" on the container and only start it when the queue exists.

    That sounds fine. I'm new to RabbitMQ, and I'm not really familiar with conventional ways of using it. I have a feeling it should be okay the way it is. I just wanted to be sure you guys were aware that's how it was working.

    Quote (originally posted by Dave Syer):
        If you use RabbitAdmin autostartup to declare the queue then it should happen strictly before the listener container starts up so it should be fine. If this is not what you are seeing please describe in more detail what you do see and what you would expect.

    I'm not using RabbitAdmin.

    Quote (originally posted by Dave Syer):
        Thanks for the git tips. There is a tag on M3 as far as I can see (f45dea87bb0389aa714e08c03e1be3cd2dd4f463).

    I just double-checked and fetched the latest commits, and I have no M3 tag. I don't have commit f45dea, either. Commit 83f0f48db2471cfcf9035dca4849029924d4f27c with message "[maven-release-plugin] prepare release 1.0.0.M3" certainly looks like the one created by the Maven release plugin. If you check the poms, that's the revision where they were changed to 1.0.0.M3. It has your name on it, in fact. I'm fetching changes from https://github.com/SpringSource/spring-amqp.git. Perhaps some commits and/or the tag haven't been pushed to that repo? Did you use the release plugin or did you do the release manually? If you manually create a tag, you have to push it explicitly with "git push <remote> tag <tag name>" or "git push --tags". Only branches are pushed if you don't specify a tag or --tags.
