
Throttling



PMBell
Mar 13th, 2012, 01:56 PM
Hi All,

Folks on the RabbitMQ list have made clear to me that you don't treat Rabbit's queues the way you might treat a programming language Queue object. Specifically, Rabbit's queues are not designed to do things like "peek," requeue, or update a queue element in place.

I am pondering an architecture wherein consumers, perhaps many of them on different nodes, are subscribed to a single queue which is bound to a fanout exchange. If each consumer subscribes to the same queue name, the broker delivers that queue's messages round-robin, each consumer getting a subset of the messages in the queue.

In the subscription approach (as opposed to "polling"), you don't have any control over the arrival of a message. Suddenly, it's "just there" in the MessageListener. Thus, I don't think I have any choice but to place this message (un-ACKed) in some type of Java object, a kind of internal queue.

In a heavy load scenario where there are many arriving messages, but few are getting processed and disposed of, couldn't this present a problem? The internal queue could grow rapidly and consume much memory.

So I am wondering, given this architecture, what the best-practice ways of throttling arrivals are. For example, is it possible for the MessageListener to say to the exchange, "No, I don't want this or any other messages until I give you further notice"?

Thanks for your help.

-Paul
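The shared-queue setup described above (several consumers subscribed to one queue) is the competing-consumers pattern: each message in the queue goes to exactly one consumer, and the broker cycles through the consumers. The sketch below models only that dispatch rule with JDK collections. RoundRobinDispatch and dispatch are hypothetical names, not RabbitMQ or Spring AMQP APIs, and the model assumes every consumer can always accept another message (in practice prefetch limits and acks also affect who gets the next one).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of a broker handing one shared queue's messages
// round-robin to competing consumers (not a RabbitMQ API).
class RoundRobinDispatch {

    // Returns, for each consumer index, the messages that consumer would receive.
    static List<List<String>> dispatch(List<String> queue, int consumerCount) {
        List<List<String>> received = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            received.add(new ArrayList<>());
        }
        int next = 0;
        for (String message : queue) {
            // Each message goes to exactly one consumer; the following message goes to the next consumer.
            received.get(next).add(message);
            next = (next + 1) % consumerCount;
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> queue = List.of("m1", "m2", "m3", "m4", "m5", "m6", "m7");
        List<List<String>> received = dispatch(queue, 3);
        for (int i = 0; i < received.size(); i++) {
            System.out.println("consumer " + i + " <- " + received.get(i));
        }
    }
}
```

With seven messages and three consumers, consumer 0 gets m1, m4 and m7 while the other two get two messages each, so no single consumer sees the whole stream.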

Gary Russell
Mar 13th, 2012, 02:11 PM
The listener container is naturally throttled by the 'concurrency' attribute. As long as you fully process each message on the container's thread (and don't do an asynch handoff to another thread), you will never receive more than 'concurrency' messages, er, well, concurrently.

'concurrency' is the attribute when using Spring-AMQP's XML namespace support - if you are coding to the APIs directly, see 'setConcurrentConsumers()'.
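Gary's point is that the number of listener threads caps how many messages are being worked on at once, provided each message is handled start to finish on the listener thread. The following JDK-only sketch checks that bound by counting messages in progress across a fixed pool of threads. It is a stand-in for the listener container, not Spring AMQP code, and ConcurrencyThrottle and maxInFlight are hypothetical names. Note that the pool's own work queue holds every submitted message in memory, which is exactly the kind of local buffering the rest of this thread worries about.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a listener container with a fixed number of consumer threads.
// Each thread processes a message to completion before taking the next one.
class ConcurrencyThrottle {

    // Returns the largest number of messages observed in progress at the same time.
    static int maxInFlight(int concurrency, int messageCount) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        ExecutorService listenerThreads = Executors.newFixedThreadPool(concurrency);
        for (int i = 0; i < messageCount; i++) {
            listenerThreads.execute(() -> {
                maxSeen.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                try {
                    Thread.sleep(5); // stand-in for the real work, done on the listener thread
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inFlight.decrementAndGet();
                }
            });
        }
        listenerThreads.shutdown();
        try {
            listenerThreads.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return maxSeen.get();
    }

    public static void main(String[] args) {
        System.out.println("max in flight with 3 threads: " + maxInFlight(3, 30));
    }
}
```

If the task body handed the message to yet another executor and returned, the counter would drop immediately and the bound would no longer describe the real work in progress; that is the asynchronous handoff Gary warns against.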

PMBell
Mar 13th, 2012, 02:52 PM
Hi Gary,

Yeah, I've experimented with this property and some time back posted about it to the RabbitMQ list. I had observed, and the list confirmed, that this property seems to govern the number of Java threads that feed a single consumer. Is that right?

But let me make sure I really understand what you are saying, because something elusive here is disturbing me. My architecture calls for consumers NOT to ACK the message (nor is auto-ack in effect) until the task represented by the message is completely processed. This task could take several hours. I don't ACK because I had intended to use Rabbit's message redelivery as foundational for retrying a failed task.

Also, by not ACKing I guarantee that in the event a component bounces, Rabbit will redeliver all messages that were consumed but not yet ACKed.

Is my understanding incorrect?

Let's take a simple case of concurrency equals one. Are you saying that, until I ACK that message, no other messages will be delivered to that consumer?

Thanks.

-Paul

Gary Russell
Mar 13th, 2012, 03:13 PM
Well, that is an extraordinarily long time to hold off from acking a message.

Also, I should have looked at the code before answering; I assumed it was similar to the JMS listener container internals. It does appear that the consumer puts messages into an in-memory queue on arrival, and each thread then pulls messages from its queue one at a time.

So, yes, you are likely to have memory issues. What's not clear to me is how the numbers fit together: you say it takes hours to process a message, but you also expect a high arrival rate, and that doesn't seem to add up.

Maybe you'd be better off using Spring Integration, and storing the requests in a JDBC-backed channel on receipt and then have a poller pull the requests from that channel at a controlled rate.
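Storing requests on receipt and draining them with a poller decouples the arrival rate from the processing rate. Here is a rough in-memory sketch of that shape; the Deque only stands in for a persistent JDBC-backed store, and RateControlledPoller is a hypothetical class, not Spring Integration's API. In Spring Integration itself, the corresponding knobs would be the poller's 'fixed-rate' (or 'fixed-delay') and 'max-messages-per-poll' settings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of "store on receipt, poll at a controlled rate".
// The Deque stands in for a persistent (e.g. JDBC-backed) message store.
class RateControlledPoller {

    private final Deque<String> store = new ArrayDeque<>();
    private final int maxMessagesPerPoll;
    private final List<String> processed = new ArrayList<>();

    RateControlledPoller(int maxMessagesPerPoll) {
        this.maxMessagesPerPoll = maxMessagesPerPoll;
    }

    // Receipt is cheap: the request is only stored here.
    void onReceive(String message) {
        store.addLast(message);
    }

    // Each poll takes at most maxMessagesPerPoll requests, so processing is bounded by
    // maxMessagesPerPoll per poll interval regardless of how fast requests arrive.
    void poll() {
        for (int i = 0; i < maxMessagesPerPoll && !store.isEmpty(); i++) {
            processed.add(store.pollFirst());
        }
    }

    int pending() { return store.size(); }

    List<String> processed() { return processed; }

    public static void main(String[] args) {
        RateControlledPoller poller = new RateControlledPoller(2);
        for (int i = 1; i <= 10; i++) {
            poller.onReceive("request-" + i);
        }
        for (int tick = 0; tick < 3; tick++) {
            poller.poll(); // a scheduler would call this once per poll interval
        }
        System.out.println("processed=" + poller.processed().size() + " pending=" + poller.pending());
    }
}
```

Three polls of at most two requests each process six requests and leave four in the store, however quickly the ten arrived.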

PMBell
Mar 13th, 2012, 03:30 PM
Hey Gary,

Thanks for checking the code.

In reality, I offered the high-arrival rate example as a kind of limit case. I am not convinced that, in the real world, our app will see such behavior for just the reasons you adduce.

So there really is no throttling capability when using the subscription (MessageListener) approach - is that correct?

BTW: I searched through my posts to the RabbitMQ list and found this:

ME: I came across something that is, at first blush, a fly in the ointment. Specifically, I read that if a consumer doesn't ACK (and assuming no auto-ack), then the broker will NOT deliver another message to that consumer, until it does ACK the previous message.

REPLY: This is only true if you've used Basic.Qos to limit the number of outstanding messages the broker is willing to offer. For instance, setting the prefetch count to 1 will give the behaviour you describe. If, on the other hand, you don't issue a Basic.Qos command at all, the prefetch count will be unlimited, and the broker will stream messages to connected consumers until the TCP buffers at the sending side fill up, without regard for ACKs, delayed or otherwise.

Cordially,

Paul

Gary Russell
Mar 13th, 2012, 03:45 PM
Looks like I am wrong again. I just ran some tests, and while the consumer is set up to use an internal queue, the next delivery only appears once we send the ack for the current one. So you are good, and this is consistent with the comments you found on the list above because, by default, we set the prefetch count to 1.

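If autoAck is set, the behavior is as I first described (messages keep arriving and pile up in the internal queue). As long as you don't set autoAck and leave the prefetch at its default of 1, the next message only arrives after you ack the current one.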

So, we're back to my original statement - the concurrentConsumers property will throttle for you. Sorry for the (my) confusion.
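The interplay described here (manual acks plus a prefetch count bound what the broker pushes, while autoAck or no basic.qos at all removes the bound) can be modeled in a few lines. PrefetchModel below is a hypothetical simulation, not the RabbitMQ client or Spring AMQP API. It treats a prefetch of 0 as "no basic.qos issued", ignores TCP back-pressure, and follows RabbitMQ's rule that the prefetch limit does not apply to consumers using automatic acknowledgement.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of broker-side delivery limits; not the RabbitMQ client API.
// prefetchCount == 0 stands for "no basic.qos issued" (unlimited). TCP back-pressure is ignored.
class PrefetchModel {

    private final Deque<Integer> brokerQueue = new ArrayDeque<>();
    private final Deque<Integer> listenerBuffer = new ArrayDeque<>(); // the consumer's in-memory queue
    private final int prefetchCount;
    private final boolean autoAck;
    private int unacked;

    PrefetchModel(int messages, int prefetchCount, boolean autoAck) {
        for (int i = 0; i < messages; i++) brokerQueue.add(i);
        this.prefetchCount = prefetchCount;
        this.autoAck = autoAck;
        brokerPush();
    }

    // The broker keeps pushing while the unacked count is under the limit.
    // With autoAck a message counts as acknowledged when sent, so the limit never applies.
    private void brokerPush() {
        while (!brokerQueue.isEmpty() && (autoAck || prefetchCount == 0 || unacked < prefetchCount)) {
            listenerBuffer.add(brokerQueue.poll());
            if (!autoAck) unacked++;
        }
    }

    // The listener finishes one message; with manual acks it then acknowledges it.
    void processOne() {
        if (listenerBuffer.poll() != null && !autoAck) unacked--;
        brokerPush();
    }

    int buffered() { return listenerBuffer.size(); }

    public static void main(String[] args) {
        System.out.println("manual ack, prefetch 1: " + new PrefetchModel(100, 1, false).buffered());
        System.out.println("autoAck:                " + new PrefetchModel(100, 1, true).buffered());
        System.out.println("no basic.qos:           " + new PrefetchModel(100, 0, false).buffered());
    }
}
```

With manual acks and a prefetch of 1, only one of the 100 messages sits in the listener's memory at a time; with autoAck or no basic.qos, all 100 land there at once.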

PMBell
Mar 13th, 2012, 03:56 PM
Gary, thank you very much for your efforts on what are doubtless newb questions!

But despite these efforts, I confess some confusion here. When I first read this sentence:

"For instance, setting the prefetch count to 1 will give the behaviour you describe. If, on the other hand, you don't issue a Basic.Qos command at all, the prefetch count will be unlimited, and the broker will stream messages to connected consumers until the TCP buffers at the sending side fill up, without regard for ACKs, delayed or otherwise."

I thought, "Great, by default the broker will keep pushing messages to my consumers; so even if I don't ACK for quite some time, he'll keep giving me work to dispatch."

Are you saying this is not the case by default with Spring AMQP? Despite the fact that I delay ACKing, I want the broker to keep feeding work to the consumers.

Cordially,

Paul

Gary Russell
Mar 13th, 2012, 04:11 PM
Well, you actually have the best of both worlds; with no basicQos calls the broker would keep pushing messages regardless, and you could run out of memory.

The spring-amqp default is to set the prefetch to only 1 message (no buffering).

You can set the prefetch to whatever you want. If you set it to 10, you'll have up to 9 messages queued locally while one is being processed, so the next one is ready to process immediately. If you set it to 2, you'll always have one available (as long as the time to process a message exceeds the round trip to send the ack and receive the next one).
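The rule of thumb for a prefetch of 2 can be checked with a simplified steady-state model. Assume every message takes the same processing time, and that after an ack the replacement message arrives one round trip later. With a prefetch of p, at most p messages are outstanding, so throughput is capped at p messages per (round trip + processing time); the listener never idles once the round trip is no longer than (p - 1) processing times. Both the model and the function name are assumptions made for illustration, not measured Spring AMQP behavior.

```java
// Simplified steady-state model (an assumption for illustration, not measured behavior):
// fixed processing time per message, and a replacement arrives one round trip after each ack.
class PrefetchIdleModel {

    // Average time per message that the listener waits for the next delivery.
    // The cycle per message is max(process, (roundTrip + process) / prefetch); idle is that minus process.
    static double idleMillisPerMessage(int prefetch, double processMillis, double roundTripMillis) {
        return Math.max(0.0, roundTripMillis - (prefetch - 1) * processMillis) / prefetch;
    }

    public static void main(String[] args) {
        System.out.println(idleMillisPerMessage(1, 100, 5)); // prints 5.0: a full round trip per message
        System.out.println(idleMillisPerMessage(2, 100, 5)); // prints 0.0: next message already local
        System.out.println(idleMillisPerMessage(2, 2, 5));   // prints 1.5: round trip outlasts processing
    }
}
```

Under this model a prefetch of 10 removes idle time for any task that takes at least a ninth of the round trip, which is why a larger prefetch mostly pays off for short tasks.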

PMBell
Mar 13th, 2012, 04:19 PM
OK, I think I get it.

Questions:

1. Does Spring AMQP expose a means to set this "prefetch" value, or must I use the Basic.Qos call?
2. Is this prefetch value "distributed" across "concurrent consumers"? For example, if I setConcurrentConsumers to 2 and the prefetch value to 10, is the result two consumer threads, each holding 4 locally queued messages and each with one message ready to present?

Thanks.

-Paul

PS: I would be most grateful if you could weigh in on my "Topic Exchange Semantics" post. I've got a lot riding on it...

Gary Russell
Mar 13th, 2012, 04:32 PM
1. Yes, on the SimpleMessageListenerContainer...

    /**
     * Tells the broker how many messages to send to each consumer in a single request. Often this can be set quite high
     * to improve throughput. It should be greater than or equal to {@link #setTxSize(int) the transaction size}.
     *
     * @param prefetchCount the prefetch count
     */
    public void setPrefetchCount(int prefetchCount) {
        this.prefetchCount = prefetchCount;
    }


2. No, each consumer prefetches prefetchCount messages of its own, so with 2 consumers and a prefetch of 10, up to 20 messages can be outstanding. It is important to understand that this means an idle consumer may be unable to process a message because that message is sitting in another consumer's backlog queue. This would only really be an issue if processing times vary widely from one message to another. Choosing the right number is probably a work of art; you really need to tune it such that threads are not waiting for the broker, while not setting it so large that work gets stuck in a busy thread's queue while other threads are available.

If you have really long-running tasks, I would leave it at 1 because the time to get the next request is noise compared to the runtime of the task. For short running tasks, set it high.
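Because every consumer gets its own prefetch window, two consumers with a prefetch of 10 can hold up to 20 unacked messages between them, and a message sitting in one consumer's window is not available to the other. The sketch below is a hypothetical model (PerConsumerPrefetch is not a Spring AMQP class) that fills each window round-robin and reports what is left on the broker.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model: each consumer has its own prefetch window, so up to
// consumers * prefetchCount unacked messages can be held locally in total.
class PerConsumerPrefetch {

    final Deque<Integer> brokerQueue = new ArrayDeque<>();
    final List<Deque<Integer>> windows = new ArrayList<>();

    PerConsumerPrefetch(int messages, int consumers, int prefetchCount) {
        for (int i = 0; i < messages; i++) brokerQueue.add(i);
        for (int c = 0; c < consumers; c++) windows.add(new ArrayDeque<>());
        boolean delivered = true;
        // Deliver round-robin to every consumer whose window still has room.
        while (delivered && !brokerQueue.isEmpty()) {
            delivered = false;
            for (Deque<Integer> window : windows) {
                if (!brokerQueue.isEmpty() && window.size() < prefetchCount) {
                    window.add(brokerQueue.poll());
                    delivered = true;
                }
            }
        }
    }

    public static void main(String[] args) {
        PerConsumerPrefetch model = new PerConsumerPrefetch(25, 2, 10);
        System.out.println("consumer 0 holds " + model.windows.get(0).size()); // prints 10
        System.out.println("consumer 1 holds " + model.windows.get(1).size()); // prints 10
        System.out.println("left on broker " + model.brokerQueue.size());     // prints 5
        // If consumer 0 empties its window while consumer 1 is stuck on a long task,
        // consumer 0 can take the remaining broker messages but never consumer 1's backlog.
    }
}
```

With a prefetch of 1 for long-running tasks, each window holds only the message actually being worked on, so nothing waits behind a busy consumer.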

WRT your other topic; as I am sure you can tell from my false starts above, I am learning much of the internals and rabbit myself so I will have to think about it for a while.

PMBell
Mar 13th, 2012, 04:54 PM
Hi Gary,

This has been a very interesting conversation, one that has revealed to me some of the subtleties inherent in a message queuing architecture.

Here's another perspective on the throttling question. Imagine a single queue from which one consumer is reading very slowly (if at all). But the publish side of this queue is writing data to it very quickly. The queue is non-durable and is backed by a non-durable topic exchange. This sort of reverses what we've been discussing. That is, prefetch won't be of much help here because the prefetching component is very slow. Even if we set a high prefetch value to offload data from the queue into the consumer's local memory, it still represents memory consumption, just as it would in the queue itself if we set a much lower prefetch value (I here assume that all components are on the same node, sharing the same memory).

So what seems to be needed here is the ability to throttle the producer. Are there best-practice ways of doing this; for example, can the producer obtain info about the state of the queue, e.g., # unacked messages, size of the queue, etc.?

In re your:

WRT your other topic; as I am sure you can tell from my false starts above, I am learning much of the internals and rabbit myself so I will have to think about it for a while.
Please know that I am grateful for the help, and that you're way ahead of me! That said, I will soon try to test the topic exchange behavior and post the behavior that I observe.

Cordially,

Paul

Gary Russell
Mar 13th, 2012, 05:15 PM
"So what seems to be needed here is the ability to throttle the producer. Are there best-practice ways of doing this; for example, can the producer obtain info about the state of the queue, e.g., # unacked messages, size of the queue, etc.?"


That's one for the Rabbit guys, but it looks like, in practice, the broker only throttles producers when it is running low on memory...

http://www.rabbitmq.com/memory.html
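Short of the broker's memory-based flow control, a producer can throttle itself by checking how deep the queue is before publishing. The RabbitMQ Java client can report a queue's message count through a passive declare (channel.queueDeclarePassive(queueName).getMessageCount()), which a publisher could poll. The sketch below does not talk to a broker; it uses a bounded ArrayBlockingQueue to stand in for the queue and its depth limit. ThrottledPublisher and its methods are hypothetical, and whether to wait, drop, or report back when the limit is hit is a design choice left open here.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical producer-side throttle: the publisher waits, then gives up, once the
// queue depth reaches a limit. The BlockingQueue stands in for the broker queue.
class ThrottledPublisher {

    private final BlockingQueue<String> queue;

    ThrottledPublisher(int maxDepth) {
        this.queue = new ArrayBlockingQueue<>(maxDepth);
    }

    // Waits up to timeoutMillis for room; returns false if the consumer has not caught up.
    boolean publish(String message, long timeoutMillis) {
        try {
            return queue.offer(message, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Simulates the slow consumer taking one message.
    String consumeOne() {
        return queue.poll();
    }

    int depth() {
        return queue.size();
    }

    public static void main(String[] args) {
        ThrottledPublisher publisher = new ThrottledPublisher(3);
        for (int i = 1; i <= 5; i++) {
            boolean accepted = publisher.publish("event-" + i, 10);
            System.out.println("event-" + i + (accepted ? " published" : " throttled") + ", depth=" + publisher.depth());
        }
    }
}
```

With a depth limit of 3 and a consumer that never catches up, the first three events are published and the next two are refused after a 10 ms wait; a real publisher might retry, slow down, or shed load at that point.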

PMBell
Mar 13th, 2012, 05:18 PM
Cool; thanks.

I'll take it to the Rabbit guys.

-Paul

hubbaba
May 9th, 2012, 10:48 AM
Very interesting thread to read through. I'm trying to implement a way for only 1 message to be processed per second. I have something like this (I trimmed the example to make it easier to follow):

<int-amqp:inbound-channel-adapter channel="start.delay" queue-names="someQueue" concurrent-consumers="1" task-executor="limitedTaskExecutor" prefetch-count="1"/>

<task:executor id="limitedTaskExecutor" pool-size="1" queue-capacity="1"/>

<int:channel id="start.delay"/>
<int:chain input-channel="start.delay" output-channel="someWhere">
    <int:delayer default-delay="1000"/>
</int:chain>

My unit test still runs too fast. It seems that multiple messages are still being pulled from Rabbit. Is there something I am missing?

Thank you very much for your help!

Ben

Gary Russell
May 9th, 2012, 11:19 AM
If you run with DEBUG logging, you should see what's happening. Make sure your log pattern includes %t (the thread name, if you're using log4j).

hubbaba
May 10th, 2012, 11:18 AM
Thanks for your help, Gary. I figured it out: it was the delayer. The delayer accepts the message and control immediately returns to the AMQP inbound channel adapter, so the message is acked and the next one delivered right away. The queue drains very quickly, with each message being delayed but all of them running in parallel.

To fix this, I created a simple bean that sleeps for a certain amount of time and then returns the payload. Here is the method inside the bean:

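import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.handler.annotation.Header;  // Spring Integration 4+; 2.x had these in org.springframework.integration.annotation
import org.springframework.messaging.handler.annotation.Payload;

@ServiceActivator
public Object delay(@Payload Object payload, @Header("delayMillis") long delayMillis) throws InterruptedException {
    // Sleeping blocks the listener thread, so this message is not acked and the next is not delivered yet
    Thread.sleep(delayMillis);
    return payload;
}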

I replaced the delayer with this bean using a service activator and everything works. :)
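The difference Ben ran into can be reproduced without any Spring classes. In the sketch below a single loop plays the listener container thread: with an asynchronous handoff (standing in for the delayer) it schedules the delayed work and moves straight on to the next message, while the synchronous version sleeps in place like the service-activator bean. DelayHandoffDemo is hypothetical and the 200 ms delay is arbitrary; the point is how many messages are in progress at once.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical comparison of a single listener thread that either hands each message to a
// scheduler (like an asynchronous delayer) or sleeps in place before finishing it.
class DelayHandoffDemo {

    static int maxInFlight(int messages, long delayMillis, boolean asyncHandoff) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(messages);
        try {
            for (int i = 0; i < messages; i++) { // this loop is the single "container thread"
                maxSeen.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                if (asyncHandoff) {
                    // Returns immediately, so the listener moves on to the next message at once.
                    scheduler.schedule(() -> { inFlight.decrementAndGet(); }, delayMillis, TimeUnit.MILLISECONDS);
                } else {
                    // Blocks the listener thread, so the next message is not taken yet.
                    Thread.sleep(delayMillis);
                    inFlight.decrementAndGet();
                }
            }
            scheduler.shutdown();
            scheduler.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return maxSeen.get();
    }

    public static void main(String[] args) {
        System.out.println("async handoff, max in flight: " + maxInFlight(5, 200, true));
        System.out.println("sleep in place, max in flight: " + maxInFlight(5, 200, false));
    }
}
```

The handoff version reports several messages in flight at once (typically all five, since the loop finishes well within 200 ms), while the sleeping version never exceeds one, which is why only the latter paces a single consumer to one message per delay interval.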