Here are logs and rabbitmqctl output when recreating the issue I'm raising. We have a three node cluster. Hostnames and IPs have been changed/masked to protect the innocent. In regards to my previous message, handleCancelOk is not what must be handled, otherwise "Received cancellation notice for ..." would have shown up in the test app log. I believe it is handleCancel.
At this point there is an HA queue named "reconnecting.consumer.testqueue" declared and mirrored across all nodes in the cluster. You can see from the rabbitmqctl output below that the master is on rabbitnode2. It is mirrored on rabbitnode1 and rabbitnode3. Make note of the consumer count and the consumer tag.
[root@rabbitnode1.dev ~]# rabbitmqctl list_queues name arguments pid consumers slave_pids
Listing queues ...
reconnecting.consumer.testqueue [{"x-ha-policy","all"}] <rabbit@rabbitnode2.3.15556.289> 1 [<rabbit@rabbitnode1.3.27093.347>, <rabbit@rabbitnode3.1.15506.233>]
...done.
[root@rabbitnode1.dev ~]# rabbitmqctl list_consumers
Listing consumers ...
reconnecting.consumer.testqueue <rabbit@rabbitnode1.3.28066.347> amq.ctag-Asm0iXLsDi1--nlpRaKU0b true
...done.
Now I stop the rabbitnode2 node containing the master
[root@rabbitnode2.dev ~]# rabbitmqctl stop_app
Stopping node rabbit@rabbitnode2 ...
...done.
Tail of /var/log/rabbitmq/rabbit@rabbitnode2.log showing the node going offline
=INFO REPORT==== 17-Jan-2012::16:01:31 ===
closing TCP connection <0.16379.289> from xxx.xxx.xxx.xxx:xxxxx
=INFO REPORT==== 17-Jan-2012::16:01:31 ===
application: rabbit
exited: stopped
type: temporary
=INFO REPORT==== 17-Jan-2012::16:01:31 ===
application: os_mon
exited: stopped
type: temporary
=INFO REPORT==== 17-Jan-2012::16:01:31 ===
application: mnesia
exited: stopped
type: temporary
Now the rabbitmqctl output after taking down rabbitnode2. You can see that rabbitnode1 was promoted to master. Note the consumer count of 0.
[root@rabbitnode1.dev ~]# rabbitmqctl list_queues name arguments pid consumers slave_pids
Listing queues ...
reconnecting.consumer.testqueue [{"x-ha-policy","all"}] <rabbit@rabbitnode1.3.27093.347> 0 [<rabbit@rabbitnode3.1.15506.233>]
...done.
[root@rabbitnode1.dev ~]# rabbitmqctl list_consumers
Listing consumers ...
no consumers listed for "reconnecting.consumer.testqueue"
...done.
Excerpt from log of test app. Note the consumer tag and time. As you can see, spring-amqp thinks the consumer is still consuming. Neither the connection nor the channel were shutdown. These are just logs of the consumer polling. It keeps going.
16:01:29.922 [SimpleAsyncTaskExecutor-1] TRACE o.s.a.r.l.SimpleMessageListenerContainer - Waiting for message from consumer.
16:01:29.922 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tag=[amq.ctag-Asm0iXLsDi1--nlpRaKU0b], channel=Cached Rabbit Channel: AMQChannel(amqp://rabbituser@yyy.yyy.yyy.yyy:yyyy/,1), acknowledgeMode=AUTO local queue size=0
16:01:30.923 [SimpleAsyncTaskExecutor-1] TRACE o.s.a.r.l.SimpleMessageListenerContainer - Waiting for message from consumer.
16:01:30.923 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tag=[amq.ctag-Asm0iXLsDi1--nlpRaKU0b], channel=Cached Rabbit Channel: AMQChannel(amqp://rabbituser@yyy.yyy.yyy.yyy:yyyy/,1), acknowledgeMode=AUTO local queue size=0
16:01:31.923 [SimpleAsyncTaskExecutor-1] TRACE o.s.a.r.l.SimpleMessageListenerContainer - Waiting for message from consumer.
16:01:31.923 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tag=[amq.ctag-Asm0iXLsDi1--nlpRaKU0b], channel=Cached Rabbit Channel: AMQChannel(amqp://rabbituser@yyy.yyy.yyy.yyy:yyyy/,1), acknowledgeMode=AUTO local queue size=0
16:01:32.924 [SimpleAsyncTaskExecutor-1] TRACE o.s.a.r.l.SimpleMessageListenerContainer - Waiting for message from consumer.
16:01:32.924 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tag=[amq.ctag-Asm0iXLsDi1--nlpRaKU0b], channel=Cached Rabbit Channel: AMQChannel(amqp://rabbituser@yyy.yyy.yyy.yyy:yyyy/,1), acknowledgeMode=AUTO local queue size=0
16:01:33.924 [SimpleAsyncTaskExecutor-1] TRACE o.s.a.r.l.SimpleMessageListenerContainer - Waiting for message from consumer.
16:01:33.924 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tag=[amq.ctag-Asm0iXLsDi1--nlpRaKU0b], channel=Cached Rabbit Channel: AMQChannel(amqp://rabbituser@yyy.yyy.yyy.yyy:yyyy/,1), acknowledgeMode=AUTO local queue size=0
16:01:34.925 [SimpleAsyncTaskExecutor-1] TRACE o.s.a.r.l.SimpleMessageListenerContainer - Waiting for message from consumer.
16:01:34.925 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tag=[amq.ctag-Asm0iXLsDi1--nlpRaKU0b], channel=Cached Rabbit Channel: AMQChannel(amqp://rabbituser@yyy.yyy.yyy.yyy:yyyy/,1), acknowledgeMode=AUTO local queue size=0
16:01:35.925 [SimpleAsyncTaskExecutor-1] TRACE o.s.a.r.l.SimpleMessageListenerContainer - Waiting for message from consumer.
16:01:35.925 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tag=[amq.ctag-Asm0iXLsDi1--nlpRaKU0b], channel=Cached Rabbit Channel: AMQChannel(amqp://rabbituser@yyy.yyy.yyy.yyy:yyyy/,1), acknowledgeMode=AUTO local queue size=0
16:01:36.926 [SimpleAsyncTaskExecutor-1] TRACE o.s.a.r.l.SimpleMessageListenerContainer - Waiting for message from consumer.
16:01:36.926 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tag=[amq.ctag-Asm0iXLsDi1--nlpRaKU0b], channel=Cached Rabbit Channel: AMQChannel(amqp://rabbituser@yyy.yyy.yyy.yyy:yyyy/,1), acknowledgeMode=AUTO local queue size=0