Page 1 of 2 12 LastLast
Results 1 to 10 of 19

Thread: Testing Guranteed Delivery

  1. #1

    Default Testing Guranteed Delivery

    Good afternoon.
    I am trying to test guaranteed delivery using spring amqp.
    Making sure that:
    1. durable queue is used.
    2. delivery mode as PERSISTENT_MODE
    3. channelTransacted.

    To test the guaranteed delivery scenario I am doing the following:
    - publish 100 messages
    - In the consumer I am incrementing the counter.

    - When I see the count of 10 messages, bringing down RabbitMQ broker.

    - Bring up RabbitMQ broker, using RabbitMQ Management utility checking how many messages_ready are there.
    I found that there are no messages_ready ( in other words messages_ready count is zero).

    - Bring up the consumer expecting to process rest of the 90 messages.
    Since there are messages_ready in the queue I don't see any count print stataments.
    This means I lost 90 messages when I brought down RabbitMQ broker.

    I have repeated this test by publishing 20000 messages.
    Before I bring down RabbitMQ broker, 645 messages are processed.
    Bring down Broker and bring it up and checked messages_ready in the queue. The count is 17609.

    17609 + 645 = 18254 messages in tact out of 20000.
    1746 messages are missing.

    Could you please let me know if I am doing something wrong in my queueing configuration.

    Following is the queueing configuration I am using:
    ================================================== =

    @Configuration
    public abstract class AbstractMDBRabbitConfiguration extends AbstractRabbitConfiguration{

    protected abstract void configureMDBTemplate(DMBTemplate template);

    @Value("${rabbitmq.user}")
    private String rabbitMQUser;

    @Value("${rabbitmq.password}")
    private String rabbitMQPassword;

    @Value("${rabbitmq.host}")
    private String rabbitMQHost;



    @Bean
    public ConnectionFactory connectionFactory() {
    SingleConnectionFactory connectionFactory = new SingleConnectionFactory(rabbitMQHost);
    connectionFactory.setUsername(rabbitMQUser);
    connectionFactory.setPassword(rabbitMQPassword);
    return connectionFactory;
    }


    public DMBTemplate rabbitTemplate() {
    DMBTemplate template = new DMBTemplate(connectionFactory());
    template.setMessageConverter(messageConverter());
    configureMDBTemplate(template);
    return template;
    }


    @Bean
    public MessageConverter messageConverter() {
    return new SimpleMessageConverter();
    }
    }
    ================================================== ==============================================
    @Configuration
    public class SecondQConfiguration extends AbstractMDBRabbitConfiguration{

    private static final String MDB_SECQ_EXCHANGE_NAME = "mdb.testq.exchange";

    private static final String MDB_SECQ_NAME = "mdb.testq.queue";

    private static final String MDB_SECQ_ROUTING_KEY = MDB_SECQ_NAME;


    @Override
    protected void configureMDBTemplate(DMBTemplate template) {
    template.setExchange(MDB_SECQ_EXCHANGE_NAME);
    template.setRoutingKey(MDB_SECQ_NAME);
    }

    @Bean
    public DMBTemplate secondQTemplate() {
    return rabbitTemplate();
    }

    @Bean
    public Queue mdbSecQ() {
    Queue q = new Queue(MDB_SECQ_NAME);
    q.setAutoDelete(false);
    q.setDurable(true);---------------------> Making sure that queue is durable
    return q;
    }


    @Bean
    public TopicExchange mdbSecQExchange() {
    TopicExchange t = new TopicExchange(MDB_SECQ_EXCHANGE_NAME);
    t.setAutoDelete(false);
    t.setDurable(true);
    return t;
    }


    @Bean
    public Binding mdbSecQBinding() {
    return BindingBuilder.from(mdbSecQ()).to(mdbSecQExchange( )).with(MDB_SECQ_ROUTING_KEY);
    }
    }
    ================================================== ================================================== ======
    public class DMBTemplate extends RabbitTemplate {

    public DMBTemplate() {
    super();
    }

    public DMBTemplate(ConnectionFactory connectionFactory) {
    super(connectionFactory);
    }

    @Override
    public void convertAndSend(Object object) throws DMBMessagingException {
    try {
    super.convertAndSend(object);
    } catch (AmqpException amqpe) {
    throw new DMBMessagingException(amqpe);
    }
    }


    @Override
    public void convertAndSend(String exchange, String routingKey,
    final Object message) throws AmqpException {
    MessageProperties props =new MessageProperties();
    props.setDeliveryMode(MessageDeliveryMode.PERSISTE NT); -------> setting delivery mode as PERSISTENT
    MessageConverter msgConverter = getMessageConverter();
    send(exchange, routingKey, getMessageConverter().toMessage(message, props));
    }
    }

    ================================================== ================================================== ==========
    Handler that processes messages:
    ================================
    public class SecondQConsumer implements IDMBProcessor{
    private int count = 0;

    public void handleMessage(DMBMessage msg) {
    DMBMessagePayload pl = msg.getMessagePayload();
    String message = (String)pl.getMessage();
    count++;
    System.out.println("count:"+ count);
    }

    }

    ================================================== ================================================== =====
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jaxws="http://cxf.apache.org/jaxws"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schem...-beans-3.0.xsd
    http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">

    <!-- pick up rabbit broker configuration -->

    <context:component-scan base-package="com.test">
    </context:component-scan>



    <contextroperty-placeholder location="file:///var/test/amqp.properties"/>




    <bean id="messageListenerContainer" class="org.springframework.amqp.rabbit.listener.Si mpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="queueName" value="mdb.testq.queue"/>
    <property name="concurrentConsumers" value="1" />
    <property name="messageListener" ref="messageListenerAdapter" />
    <property name="channelTransacted" value="true" />

    <property name="transactionManager" ref="txnManager" />

    </bean>

    <bean id="messageListenerAdapter" class="org.springframework.amqp.rabbit.listener.ad apter.MessageListenerAdapter">
    <property name="delegate" ref="testHandler" />
    <property name="messageConverter" ref="messageConverter" />
    </bean>



    <bean id="testHandler" class="com.test.SecondQConsumer">
    </bean>

    <bean id="txnManager" class="org.springframework.batch.support.transacti on.ResourcelessTransactionManager" />


    </beans>


    To make sure messages rolled back on exception, using channelTransacted = true and using transactionManager as
    org.springframework.batch.support.transaction.Reso urcelessTransactionManager.
    ================================================== ================================================== ======================

    Thanks
    Venkat

  2. #2
    Join Date
    Jun 2005
    Posts
    4,230

    Default

    I can't see anything wrong there. The transaction manager is not adding anything and you should remove it, but it shouldn't break anything either. You could also set the prefetch count in the container to limit the number of messages delivered before any acks are sent.

    It looks like you are using an older version so if you can use a snapshot, or wait till tomorrow and pick up 1.0.0.m3 we might make more progress (some bugs in the message listener container were fixed recently).

    One question: when you say you "bring down the broker" what exactly do you mean? How do you know that exactly 10 messages were delivered when you did that?

  3. #3

    Default

    Good morning Dave.
    I was running the Consumer and when 10 messages are processed by SecondQConsumer which is handler (I am making sure this by printing the count in this handler), I stop RabbitMQ broker which is Running from C:\Program Files\RabbitMQ\rabbitmq_server2.0.0\sbin. In other words killing rabbitmq-server.bat. Once RabbitMQ broker is down, I bring it up and run the command: rabbitmqctl list_queues messages_ready (from C:\Program Files\RabbitMQ\rabbitmq_server2.0.0\sbin. This should show me the number of messages ready to be consumed by the handler. Since I am using Persistent Mode as delivery mode, my expectation is that even if the RabbitMQ broker is down, messages are saved and this number has to be 90. Since 10 of them are already processed.

    As per your suggestion I will try with 1.0.0.M3.
    Dave I have one question. If I remove Transaction Manger, then I should remove channelTransacted property also. By removing these two I believe the rollback mechanism will not work if an exception is thrown in handleMessage of the handler.

    Thanks
    Venkat

  4. #4
    Join Date
    Jun 2005
    Posts
    4,230

    Default

    Quote Originally Posted by vveludan View Post
    This should show me the number of messages ready to be consumed by the handler.
    And what was the result? Were there any unacked messages?

    As per your suggestion I will try with 1.0.0.M3.
    If you can try a snapshot, you will have a chance to find any problems before M3 is released. Anyway, thanks for trying it and reporting on your experience.

    If I remove Transaction Manger, then I should remove channelTransacted property also.
    No, those two properties are independent (otherwise we wouldn't have provided both). Your transaction manager does nothing, so it can be removed, until your transaction needs it (probably never, I would guess, unless you start using Spring Batch stuff inside your listener).
    Last edited by Dave Syer; Mar 7th, 2011 at 03:21 AM. Reason: formatting

  5. #5

    Default

    Good afternoon Dave.

    And what was the result? Were there any unacked messages?

    Dave I took the snap shot of M3 and retried the test.
    - Published 10 messages
    - After consuming 5 messages, shutdown the rabbitmq broker. Message Conatiner Listener stopped throwing java.net.ConnectException: Connection refused: connect (which is expected).
    - Brought up rabbitmq broker. Used the command: rabbitmqctl list_queues messages_ready messages_unacknowledged. It is found that both messages_ready and messages_unacknowledged count is zero.

    You are right, by just setting channelTransacted I was able to test the functionality of rolling back the message when an exception is thrown in the handler.

    Thanks
    Venkat

  6. #6
    Join Date
    Jun 2005
    Posts
    4,230

    Default

    With M3 if you don't set the acknowledgeMode=AUTO the broker will not expect any acks, so it will send all the messages (in blocks of 100 by default I think). You are probably receiving them all, but not giving the listener a chance to process them. Can you try again with the acknowledgeMode=AUTO. (Maybe we should make this the default if the channel is transacted - I actually thought it would be an error, but apparently not.)

  7. #7

    Default

    Good morning Dave. As per your suggestion
    I have set autoAcknowledgeMode = AUTO:
    <bean id="messageListenerContainer" class="org.springframework.amqp.rabbit.listener.Si mpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="queueName" value="mdb.testq.queue"/>
    <property name="concurrentConsumers" value="1" />
    <property name="messageListener" ref="messageListenerAdapter" />
    <property name="channelTransacted" value="true" />
    <property name="acknowledgeMode" value="AUTO"/>
    </bean>


    With this configuration I have rerun the test.
    Stopped the broker after processing 5 messages. After brining back RabbitMQ broker, checked for messages_ready and messages_unacknowledged and found that the messages_ready count is 10 (instead of 5, in other words all messages were rolled back). The count of messages_unacknowledged count is zero. At this point I started the Consumer (Message Listener Container). For some reason it was not processing any messages from the queue though the messages_ready count is zero. When I waited for some time and checked again for messages_ready and messages_unacknowledged, the messages_ready count is changed to and messages_unacknowledged count changed to 1. But I didn't see anything processed in handler (The print count was not printed to the console of Message Listener Container).

    Thanks
    Venkat

  8. #8

    Default

    Good afternoon Dave in my previous post I found the following typo mistakes:

    At this point I started the Consumer (Message Listener Container). For some reason it was not processing any messages from the queue though the messages_ready count is zero.--> 10When I waited for some time and checked again for messages_ready and messages_unacknowledged, the messages_ready count is changed to 9 and messages_unacknowledged count changed to 1.

    Thanks
    Venkat

  9. #9
    Join Date
    Jun 2005
    Posts
    4,230

    Default

    I'm not really sure how the SingleConnectionFactory would behave if the broker isn't stable. Try the CachingConnectionFactory (which should be the default for everyone from M3 onwards). At least you aren't losing any messages now, so the container is getting something right.

  10. #10

    Default

    Good morning Dave.
    Dave I tried with CachingConnectionFactory. Messages were not getting consumed on start of Listener.
    I am finding another strange issue.
    With autoAcknowledgeMode=TRUE, when the broker was down, messages were rolled back. They were not getting consumed on start of spring-amqp Message Listener. At this point I used a consumer code with plain RabbitMQ API to consume all messages.
    When the messages are cleared I tried to reproduce the problem.
    - Publish 100 messages
    - Bring the broker down after 10 messages are consumed.
    - Bring up the broker
    - check messages_ready. I see messages are zero.
    This is strange, if I consume messages with RabbitMQ API consumer,
    can't reproduce the problem.

    Thanks
    Venkat

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •