Page 1 of 2 12 LastLast
Results 1 to 10 of 19

Thread: MessageRejectedWhileStoppingException while stopping

  1. #1
    Join Date
    Jun 2009
    Posts
    26

    Default MessageRejectedWhileStoppingException while stopping

    Hi,
    I'm using spring-amqp along with RabbitMQ message broker which receives the messages from the queue and process them. also, I need to manage the incoming traffic from the MessageConsumer which reads from the queue and pass it to the core of application, so I created a InputTrafficMonitor bean which decides whether to stop/start the incoming traffic (from the message consumers) with stop/starting the injected SimpleMessageListenerContainer bean ( with calling the corresponding stop/start methods - I don't know if it's the right way to do that or not);
    so I started a stress-test and found that some messages is being lost in the case of stopping the messageListenerContainer ! I checked the logs and found bunch of MessageRejectedWhileStoppingException s as follows :

    Code:
    2012-10-29 14:55:19,004 WARN  [amqp.rabbit.listener.SimpleMessageListenerContainer:557] - Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.rabbit.listener.MessageRejectedWhileStoppingException: Message listener container was stopping when a message was received

    I. any clue how to handle/solve this problem ?
    II. is there any better way to pause the incoming traffic from the consumers rather than stop/starting the MessageListenerContainer ?

    thanks in advance
    Last edited by nima; Oct 29th, 2012 at 03:27 AM.

  2. #2
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,036

    Default

    Did you change the container's acknowledgeMode from its default (AUTO)?

    When the exception is thrown the container, by default, tells the broker to requeue the message.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  3. #3
    Join Date
    Jun 2009
    Posts
    26

    Default

    Hi Gary.
    nope ! the acknowledgeMode is being set to AUTO
    thanks again

  4. #4
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,036

    Default

    Hmmm... interesting. I just ran some tests and the message isn't "lost", it's sitting in Rabbit in an un-ackd state and is not redelivered after restarting the container with container.start(). When I kill the JVM, the message goes back to 'ready' and is delivered when I restart the JVM.

    I'll take a look to see what's happening; please open a JIRA ticket...

    https://jira.springsource.org/browse/AMQP
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  5. #5
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,036

    Default

    I have figured out what's going on; we have a race condition where the reject is done before the consumer is actually canceled, so messages can be delivered to the "old" (stopped) consumer and remain in an un-ackd state.

    The solution is to immediately cancel the consumer when the stop() is received so when we reject any in-flight messages because the container is stopped, we don't get any unintended redeliveries.

    I should be able to get the fix into the 1.1.3 release which is currently due this week.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  6. #6
    Join Date
    Jun 2009
    Posts
    26

    Default

    Hi Gary, and thanks for your reply
    I'm so happy to hear that you found the problem. but, ...

    so messages can be delivered to the "old" (stopped) consumer and remain in an un-ackd state
    well, I ran a simple test with 300,000 incoming messages and tried to randomly stop/start the MessageListenerContainer (40 times happened), and I found that 299,519 messages has been received correcty, just 18 of them were in unacked state (based on info. in web-based management console) , and so, 463 of them were lost. I found exactly 463 MessageRejectedWhileStoppingExceptions in my log. and the (18) unacked messages were added to the final received messages after restaring my application. so, the lost messages were not in unacked state.

    thanks again for your help, buddy

  7. #7
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,036

    Default

    Hmmm... I don't see any code that would cause that behavior - I'll run some more tests. Can you share all your container properties? (concurrentConsumers, prefetchCount etc).

    Also, what does your test listener do with the delivery - any time delays etc to simulate processing, or just increment a counter?
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  8. #8
    Join Date
    Jun 2009
    Posts
    26

    Default

    Code:
    <bean id="incomingMessageListenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
            <property name="connectionFactory" ref="rabbitmqConnectionFactory" />
            <property name="queueNames" value="${incoming.queue.name}" />
            <property name="messageListener" ref="incomingMessageListener" />
            <property name="errorHandler" ref="errorHandler" />
            <property name="acknowledgeMode" value="AUTO" />
            <property name="defaultRequeueRejected" value="false" />
            <property name="concurrentConsumers" value="${incoming.consumer.count}" />
        </bean>
    oops ! I'm so so sorry, I think I found it ! I've set the defaultRequeueRejected property to false (days before) inorder to prevent endless loop in the case of any exception ( I cannot remember why, but I think maybe in the case of invalid object) ; I haven't test it yet, but the name implies !

    again, I deeply apologize

    P.S may I ask your opinion about my second question which is the approach for "traffic management" ?; I mean is there any better way to handle the incoming traffic from the MessageConsumer (which reads from the queue) rather than having a ManagerBean which stop/starts the MessageListenerContainer based on condition ?

    thank you for your patience and great help

  9. #9
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,036

    Default

    That's good to know; thanks.

    Actually, I am glad you pressed me because I did find another condition that could cause messages to remain un-ack'd.

    Even after I corrected that, I still, very rarely, saw unack'd (but never lost) messages; I suspect a race condition in the rabbit client itself but I need to do some research there.

    In the meantime, I have found that if we physically close the channel used by the consumer, rather than returning it to the cache, we never end up with unack'd messages and the stop/start scenario works fine.

    I would really appreciate it if you could run your test against my branch. I'd also be interested to hear how your test differs from mine. My test case is in StopStartIntegrationTests.java.

    If you are able to do that....

    Code:
    git clone https://github.com/garyrussell/spring-amqp.git
    git checkout AMQP-275a
    ./gradlew install
    Then, update your POM to use version 1.2.0.BUILD-SNAPSHOT of spring-rabbit.

    You should see zero unack'd messages via the web admin; there should be no need to kill the JVM.


    Regarding avoiding the "infinite loop" problem, consider using a StatelessRetryInterceptorFactoryBean (or stateful if your messages have messageIds) to insert a retry interceptor into the listener's advice chain. By configuring the interceptor to use a RejectAndDontRequeueRecoverer, the message will be permanently rejected after N (default = 3) attempts. You can configure the queue to send such rejections to a Dead Letter Exchange for later investigation.


    Stopping/starting the container is a reasonable approach to achieve what you are doing. Another alternative might be to add an Advice to the listener's advice chain that simply pauses the container's thread. Personally, I'd prefer to stop the container (once my fix is applied).
    Last edited by Gary Russell; Oct 31st, 2012 at 02:53 PM.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  10. #10
    Join Date
    Jun 2009
    Posts
    26

    Default

    sure I'm on a trip right now, I'll do that as soon as I get back to home (or work)
    best wishes

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •