Thread: Messages not being auto-acked

  1. #1
    Join Date
    Aug 2009
    Posts
    28

    Messages not being auto-acked

    I have yet another simple problem. For various reasons I had to change an existing, working direct exchange setup to a fanout going to multiple servers. With trial and error I got the fanout working; I receive the messages just fine, but apparently the sender (or RabbitMQ) thinks I have not acked, and it resends, so I get a blizzard of messages that did not occur with the direct exchange.

    The code is slightly more complex than the original direct setup because I need the data to go to multiple durable queues, so that when a machine goes down its data stays intact on the queue until the machine is up again (hence the hostname tacked on to the queue name).

    One other tidbit: I thought I could send to a fanout exchange without specifying the queue name, but when I leave the queue name null on the producer it complains, even though it is going to a fanout exchange. I am wondering if this name is causing the problem, because none of the queues are named exactly that. The data _does_ get there, it just keeps getting resent.

    I tried setting acknowledge-mode to AUTO, but still no joy.

    The following is my code and firehose trace output:


    Code:
    /* producer side */
            connection = connectionFactory.newConnection();
            channel    = connection.createChannel();
            channel.exchangeDeclare(exchangeName, "fanout", true, false, null);
    ...
            channel.basicPublish(exchangeName, queueName, null, payload.getBytes("UTF-8"));
    
    
    /* consumer side */
        <int:channel id="report.status.channel"/>
    
        <rabbit:queue id="statusQueue" name="cw.status.report.queue.#{T(java.net.InetAddress).getLocalHost().getHostName()}" durable="true"/>
    
        <rabbit:fanout-exchange name="${activity.status.exchange.name}" durable="true">
            <rabbit:bindings>
                <rabbit:binding queue="statusQueue"/>
            </rabbit:bindings>
        </rabbit:fanout-exchange>
     
        <amqp:inbound-channel-adapter channel="report.status.channel" acknowledge-mode="AUTO"
                                      queue-names="#{statusQueue.getName()}"
                                      connection-factory="rabbitConnectionFactory"/>
    
       <int:service-activator id="reportStatus" input-channel="report.status.channel"
                               ref="reportStatusHandler" method="handle">
       </int:service-activator>
    
    
    /* rabbitmq firehose output */   
    ================================================================================
    2012-1-30 12:58:59: Message received
    
    Node:         rabbit@gnuphie
    Exchange:     activity.status.exchange
    Queue:        cw.status.report.queue.gnuphie
    Routing keys: [<<"cw.status.report.queue">>]
    Properties:   []
    Payload: 
    {"id":"replace_with_jsonrpc_id","result":{"report_status":"REPSTATUS_CREATED","report_ticket":"72489c0a-a129-4061-bc8e-793220c4438f","userUUID":"2f75358c-6489-4038-9737-9a1572dfc898","response":{"report_progress":{"step":3,"totalSteps":9}}},"generate_report":{"report_ticket":"72489c0a-a129-4061-bc8e-793220c4438f","jsonrpcID":"cf6764d8-3608-494f-be37-59bbb250f93c","reportTarget":"web"}}
    
    ================================================================================
    2012-1-30 12:58:59: Message received
    
    Node:         rabbit@gnuphie
    Exchange:     activity.status.exchange
    Queue:        cw.status.report.queue.gnuphie
    Routing keys: [<<"cw.status.report.queue">>]
    Properties:   []
    Payload: 
    {"id":"replace_with_jsonrpc_id","result":{"report_status":"REPSTATUS_MISSINGORG","report_ticket":"72489c0a-a129-4061-bc8e-793220c4438f","userUUID":"2f75358c-6489-4038-9737-9a1572dfc898","response":{"report_progress":{"step":5,"totalSteps":9}}},"generate_report":{"report_ticket":"72489c0a-a129-4061-bc8e-793220c4438f","jsonrpcID":"cf6764d8-3608-494f-be37-59bbb250f93c","reportTarget":"web"}}
    
    ....
    zillions of these...
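
On the routing-key question above: the second argument of `basicPublish` is a routing key rather than a queue name, and a fanout exchange ignores it when routing, so an empty string is the usual choice when a null is refused. The following is only a toy in-memory model of that behaviour, not the RabbitMQ client API; `FanoutSketch` and its methods are hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of a fanout exchange (hypothetical, not the RabbitMQ client). */
public class FanoutSketch {
    // One message list per bound queue, keyed by queue name.
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    /** Bind a queue to this exchange. */
    public void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    /** Publish a payload; the routing key must be non-null but is otherwise ignored. */
    public void publish(String routingKey, String payload) {
        if (routingKey == null) {
            throw new IllegalArgumentException("routing key must not be null; use \"\"");
        }
        // Fanout semantics: every bound queue gets a copy, whatever the key is.
        for (List<String> queue : boundQueues.values()) {
            queue.add(payload);
        }
    }

    /** Messages currently held by the named queue. */
    public List<String> messages(String queueName) {
        return boundQueues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("cw.status.report.queue.hostA");
        exchange.bind("cw.status.report.queue.hostB");
        exchange.publish("", "status-1");                       // empty key
        exchange.publish("cw.status.report.queue", "status-2"); // key matches no queue name
        System.out.println(exchange.messages("cw.status.report.queue.hostA"));
        System.out.println(exchange.messages("cw.status.report.queue.hostB"));
    }
}
```

In this model a key that matches no queue name still reaches both queues, which is consistent with the data arriving in the original setup; the routing key is therefore unlikely to be what triggers the resends.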

  2. #2
    Join Date
    Aug 2009
    Posts
    28

    One clue (and hopefully enough for someone to give me a hint): I find loads of these in the log:

    Code:
    Execution of Rabbit message listener failed, and no ErrorHandler has been set: class org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception

    I am not sure what the underlying error actually is; there are no other exceptions in the log. I am assuming this is a configuration error on my part and that the exception is thrown before the message ever gets to my code.

    I have not really touched my listener configuration (it worked with the direct exchange):


    Code:
        <rabbit:listener-container connection-factory="rabbitConnectionFactory" acknowledge="auto"
                                   advice-chain="#{rabbitConfig.adviceChain}" concurrency="#{rabbitConfig.concurrent}"/>

    I am looking at the error-handler attribute for the listener right now. Will get back if I find something. My understanding was that there was a default ErrorHandler created by Spring Integration, but maybe I can put something else in that will shed a little more light.

  3. #3
    Join Date
    Aug 2009
    Posts
    28

    Well I created some highly complex code:


    Code:
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;
    import org.springframework.util.ErrorHandler;

    /**
     * Handle amqp errors in the amqp listener.
     *
     * @author         richardd
     */
    @Component
    public class CwAmqpErrorHandler implements ErrorHandler {

        private static final Logger logger = LoggerFactory.getLogger(CwAmqpErrorHandler.class);

        /**
         * Log the given error with its full stack trace so the underlying cause is visible.
         *
         * @param t the exception thrown while processing a message
         */
        @Override
        public void handleError(Throwable t) {
            logger.error("unexpected messaging error", t);
        }
    }

    and added this to the config of the inbound adapter:

    Code:
        <amqp:inbound-channel-adapter channel="report.status.channel" acknowledge-mode="AUTO"
                                      queue-names="#{statusQueue.getName()}" error-handler="cwAmqpErrorHandler"
                                      connection-factory="rabbitConnectionFactory"/>

    That dumped out some extremely verbose stack traces, which told me I had a Hibernate constraint violation. Thank you for a very public forum to showcase my stupidity.
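
When the stack trace is that verbose, it can help to also log just the innermost cause, since the listener exception is only a wrapper. A minimal sketch using only the JDK follows; `RootCause` is a hypothetical helper name, and the exceptions in `main` are stand-ins rather than the real Hibernate types.

```java
/** Hypothetical helper: walk the cause chain to the innermost exception. */
public final class RootCause {
    private RootCause() {
    }

    /** Returns the deepest cause of t, or t itself if it has no cause. */
    public static Throwable of(Throwable t) {
        Throwable current = t;
        // Stop at the end of the chain; also guard against a self-referencing cause.
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Stand-in for "Listener threw exception" wrapping a persistence failure.
        Throwable wrapped = new RuntimeException("Listener threw exception",
                new IllegalArgumentException("handler failed",
                        new IllegalStateException("constraint violation (stand-in)")));
        Throwable root = of(wrapped);
        System.out.println(root.getClass().getSimpleName() + ": " + root.getMessage());
    }
}
```

Inside an error handler like the one above, the root cause could be logged on one line before the full trace, so the real failure is visible at a glance.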

  4. #4
    Join Date
    Aug 2009
    Posts
    28

    One other note on this, in case someone else runs into a similar problem: the reason it worked with the direct exchange and not the fanout was that in the direct case only one system was updating a particular record in the shared database, whereas the fanout was sending to several systems, which were attempting to update simultaneously. The actual record of interest was not the same on the different systems, but a related record in the database schema was the same across the systems, and the update attempts were conflicting. The resulting constraint violation made the listener throw, so the message was never acked and kept getting redelivered.
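
To make the redelivery loop concrete: as far as I understand it, with acknowledge mode AUTO the Spring AMQP container acks when the listener returns normally and, by default, rejects and requeues the message when the listener throws. The sketch below is a plain-Java simulation of that behaviour, not Spring code; `RequeueSketch`, `deliver`, and the `maxAttempts` cap are all hypothetical and exist only so the example terminates.

```java
import java.util.function.Consumer;

/** Toy simulation of requeue-on-exception (hypothetical, not the Spring AMQP API). */
public class RequeueSketch {

    /**
     * Delivers one message until the listener returns normally or the cap is hit.
     * Returns how many times the message was delivered.
     */
    public static int deliver(String message, Consumer<String> listener, int maxAttempts) {
        int deliveries = 0;
        while (deliveries < maxAttempts) {
            deliveries++;
            try {
                listener.accept(message);
                return deliveries; // normal return: message is acked and gone
            } catch (RuntimeException e) {
                // Listener threw: message is rejected and requeued, so we loop.
            }
        }
        return deliveries; // cap reached; without a cap this would never end
    }

    public static void main(String[] args) {
        // A listener that always fails, like a handler hitting the same constraint violation.
        Consumer<String> alwaysFails = m -> {
            throw new IllegalStateException("constraint violation (stand-in)");
        };
        System.out.println("failing listener deliveries: " + deliver("status", alwaysFails, 5));

        // A listener that succeeds is delivered to exactly once.
        Consumer<String> succeeds = m -> { };
        System.out.println("healthy listener deliveries: " + deliver("status", succeeds, 5));
    }
}
```

A listener that fails every time is redelivered until the cap, which is the blizzard seen in the firehose trace; fixing the underlying database conflict (or otherwise making the handler stop throwing) is what ends it.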

  5. #5
    Join Date
    Aug 2012
    Posts
    1

    Could you please provide your reportStatusHandler code?

    Could you please provide your reportStatusHandler code?
