May 4th, 2011, 07:58 AM
I have some niggling doubts about the implementation of your producers - in particular I think the ReturnListener has to be a property of the ConnectionFactory if it is supported at all in the framework. Did you actually try this with a realistic failure scenario? The listener callback should come on the Connection thread I think, so it's not going to be part of the ongoing transaction if there was one in the producer. I don't know if that would affect your real use case, but it makes this quite a slippery slope to introduce a new feature in Spring AMQP, and I'd be a bit worried about your own implementations if I was passing them to others for general use. We might need an abstraction that makes it clearer what the limitations of ReturnListener are.
May 5th, 2011, 12:22 PM
Yes, the ReturnListener callback does occur on the AMQP Connection thread, but that is not an issue in our case. We are not trying to manage it all in a transaction, but do want to know the message was unroutable so the system can take specific action to deal with it.
The current implementation we are using resets the ReturnListener on the channel just prior to ConnectionFactoryUtils.releaseResources(resourceHolder). This has worked in "realistic failure scenarios", but has by no means gone through exhaustive testing. I imagine this approach may be problematic given Channel.returnListener is accessed on a different thread. I'm not familiar enough with what's happening in the RabbitMQ client code to know how much of an issue this is. Given those potential issues, I see your point that it may be best to associate the ReturnListener with the ConnectionFactory.
If Spring AMQP provided clean support for mandatory/immediate that would be ideal. If not then some direction on how to implement a workaround would be very helpful (especially with the use of CachingConnectionFactory).
May 6th, 2011, 06:07 AM
'basic.publish' (from client to broker) and 'basic.return' (from broker to client when publish had mandatory/immediate flags and either route was not found or there is no active consumer on routed queue) are both asynchronous calls.
That means 'basic.return' back to client can happen at some time in the future.
The current Spring implementation caches not only connections but also channels.
It is possible that two parallel threads use the same channel at the same time. One thread might expect message delivery failure, while another can cause the channel or connection to be closed (due to some kind of exception). In that case ReturnListener will not be notified even if its message was unroutable or not consumed immediately.
So a possible solution for you could be to use your 'SendChannelCallback' and make sure that the same channel is not used by another thread in parallel.
May 9th, 2011, 01:47 PM
Sorry for the delay in responding.
Is it possible to "make sure that the same channel is not used by another thread in parallel" while using CachingConnectionFactory? Or would this require a move to SingleConnectionFactory?
May 10th, 2011, 02:51 AM
Both ConnectionFactory implementations already segregate channels from the moment they are logically created to the moment they are logically closed. Tomas was hinting that you might be able to implement some blocking to force the producer to wait for the basic.return before continuing (maybe after sending a batch of messages to make it more efficient) - as long as you delay the call to close() you know that no-one else is using the channel.
Your problem is actually much deeper. There is literally no way to be sure that you will ever get a basic.return callback - it's asynchronous and the Connection can be closed without warning, e.g. on a hard error that your producer actually wasn't responsible for. If you are OK with that, then we can think about how you might add the callback in the ConnectionFactory.
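A minimal sketch of the "wait before close()" idea, written against the JDK only so that it stays self-contained. ReturnWindow, onReturn and awaitReturns are hypothetical names, not part of Spring AMQP or the RabbitMQ client. The assumption is that a ReturnListener running on the Connection thread would call onReturn with some message identifier, and that the producer calls awaitReturns after publishing a batch and before logically closing the channel. Because a basic.return may never arrive, the wait is bounded: an empty result means "nothing was returned within the window", not "everything was routed".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: collects ids of returned messages for a bounded time.
final class ReturnWindow {
    private final BlockingQueue<String> returned = new LinkedBlockingQueue<>();

    // Assumed to be invoked from the connection thread when a message comes back.
    void onReturn(String messageId) {
        returned.add(messageId);
    }

    // Invoked by the producer thread before it releases the channel.
    // Blocks for the whole window and reports every return seen in that time.
    List<String> awaitReturns(long windowMillis) {
        List<String> seen = new ArrayList<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(windowMillis);
        try {
            long remaining;
            while ((remaining = deadline - System.nanoTime()) > 0) {
                String id = returned.poll(remaining, TimeUnit.NANOSECONDS);
                if (id == null) {
                    break; // window elapsed with no further returns
                }
                seen.add(id);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return seen;
    }
}
```

Waiting once per batch rather than once per message keeps the cost of the window down, at the price of a coarser answer about which publish failed.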
Your code samples were pretty basic. What would your real-life ReturnListener actually do? How would it get a reference to the business data that was undelivered?
The RabbitMQ broker engineers, by the way, refer to immediate and mandatory flags as "misfeatures" in the spec, and say they reserve the right to remove those features from RabbitMQ at any time. If I were you I'd try harder to find an alternative solution.
May 12th, 2011, 03:05 AM
OK, we need to wrap this up for 1.0. Here's what I propose:
1. RabbitTemplate.doSend() becomes protected, so you can set the flags on basic.publish if you need to
2. We add a ChannelListener interface and allow you to register implementations in the ConnectionFactory, then you can add the ReturnListener in a sensible place
Does that work?
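For illustration only, the second point might look roughly like the JDK-only model below. SketchChannel, SketchChannelListener and SketchConnectionFactory are hypothetical stand-ins invented for this sketch; they are not the Spring AMQP or RabbitMQ client types, and the eventual ChannelListener signature may differ. The point it tries to show is that the factory calls each registered listener once when a channel is created, so the return handler is attached in one place instead of being reset just before the channel is released.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for a broker channel.
final class SketchChannel {
    // volatile because the handler is set by the producer side and
    // read by whichever thread delivers the return.
    private volatile Consumer<String> returnHandler = id -> { };

    void setReturnHandler(Consumer<String> handler) {
        this.returnHandler = handler;
    }

    // Simulates the broker sending a message back as unroutable.
    void simulateReturn(String messageId) {
        returnHandler.accept(messageId);
    }
}

// Hypothetical callback, invoked once per newly created channel.
interface SketchChannelListener {
    void onCreate(SketchChannel channel);
}

// Hypothetical factory that notifies listeners before handing out a channel.
final class SketchConnectionFactory {
    private final List<SketchChannelListener> listeners = new CopyOnWriteArrayList<>();

    void addChannelListener(SketchChannelListener listener) {
        listeners.add(listener);
    }

    SketchChannel createChannel() {
        SketchChannel channel = new SketchChannel();
        for (SketchChannelListener listener : listeners) {
            listener.onCreate(channel);
        }
        return channel;
    }
}
```

A listener registered as `factory.addChannelListener(c -> c.setReturnHandler(unroutable::add))` would then see every simulated return on channels created afterwards.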
Jun 23rd, 2011, 05:38 PM
My apologies for missing your post on the 12th, I just saw it today as I returned to revisit the topic while working through some rabbit refinements. I must have missed the email.
So, yes the changes you propose would be helpful. I hope I didn't miss the boat here -- saw issue for ChannelListener so I'm hoping that's a good sign.
Oct 5th, 2011, 12:50 PM
I know this thread is a few months old at this point and that you've taken steps to allow the mandatory flag to be set if desired (by inheriting from RabbitTemplate). I think we have a use case for which the mandatory flag would be appropriate, but please feel free to suggest alternatives. Our situation is as follows:
We have a high volume queue for which we want to distribute the load across our cluster. The solution we came up with is for each instance of our application to create an anonymous/non-durable/auto-delete queue that is bound to a common exchange. Upon startup, our apps will 'register' their anonymous queues, i.e. will send a notification to a topic exchange to which producers are listening that identifies the exchange and queue name that producers can use to send messages. Producers will then, for this particular exchange, round robin messages to each queue. The goal was to have an even distribution of queues across the cluster (since queues are sticky to the node they were created on), and thus an even distribution of messages.
Since the queues are anonymous/non-durable/auto-delete, they'll disappear when our app shuts down. We have a mechanism for sending an 'unregister' method on shutdown, but that assumes a graceful shutdown. It's possible that the producer could end up sending messages to queues that have been deleted. We'd like some way for the producer to be notified that a consumer has vanished. We could go with a heartbeat, but something like the functionality of the mandatory flag seems simpler.
I don't want to use a feature that could be deprecated at some point in the future, so I'm open to suggestions. Thoughts?
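The producer-side bookkeeping described above can be sketched with the JDK alone. QueueRoundRobin and its methods are hypothetical names chosen for this illustration. The assumption is that register is called when a consumer announces its queue, and that unregister is called either on the 'unregister' notification or when some other signal (a returned message, a missed heartbeat) shows the queue has vanished.

```java
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: rotates over the queue names currently registered.
final class QueueRoundRobin {
    private final CopyOnWriteArrayList<String> queues = new CopyOnWriteArrayList<>();
    private final AtomicInteger cursor = new AtomicInteger();

    // Adds a queue name; duplicates are ignored.
    void register(String queueName) {
        queues.addIfAbsent(queueName);
    }

    // Removes a queue name, e.g. after a graceful shutdown or a detected failure.
    void unregister(String queueName) {
        queues.remove(queueName);
    }

    // Returns the next target queue, or empty when no consumer is registered.
    Optional<String> next() {
        String[] snapshot = queues.toArray(new String[0]);
        if (snapshot.length == 0) {
            return Optional.empty();
        }
        // floorMod keeps the index valid even after the counter overflows.
        int index = Math.floorMod(cursor.getAndIncrement(), snapshot.length);
        return Optional.of(snapshot[index]);
    }
}
```

Working on a snapshot means a concurrent unregister cannot cause an out-of-range index, though a publish may still be aimed at a queue that was removed a moment earlier, which is the gap the mandatory flag or a heartbeat would have to cover.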
Oct 7th, 2011, 09:27 AM
If I understand correctly I think you are just duplicating the work of the broker, balancing work across as many consumers as are registered on a particular queue - i.e. why don't you just use a single queue? Or did I miss something?
Oct 7th, 2011, 09:35 AM
The problem we found with a single queue was that, in our clustered environment, all messages effectively pass through the broker on which the queue was created. This led to high memory usage on that broker, so high that we hit the high memory watermark and never recovered. We had persistent messages turned off, and verified that there were no un-acked messages. It seemed to be a problem of scale.