Good afternoon.
I am trying to test guaranteed delivery using Spring AMQP.
I am making sure that:
1. A durable queue is used.
2. The delivery mode is set to PERSISTENT.
3. The channel is transacted (channelTransacted).
To test the guaranteed delivery scenario I am doing the following:
- Publish 100 messages.
- In the consumer, increment a counter for each message.
- When the count reaches 10, bring down the RabbitMQ broker.
- Bring the RabbitMQ broker back up and check the messages_ready count using the RabbitMQ Management utility.
I found that there are no messages_ready (in other words, the messages_ready count is zero).
- Bring up the consumer, expecting it to process the remaining 90 messages.
Since there are no messages_ready in the queue, I don't see any count print statements.
This means I lost 90 messages when I brought down the RabbitMQ broker.
I have repeated this test by publishing 20000 messages.
Before I brought down the RabbitMQ broker, 645 messages had been processed.
I brought the broker down and back up, then checked messages_ready in the queue. The count was 17609.
17609 + 645 = 18254 messages intact out of 20000.
1746 messages are missing.
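The accounting above can be sketched as a small helper. This is only an illustration: the class and method names are hypothetical, and it assumes that "processed" and "messages_ready" are the only two places a published message can end up after the broker restart.

```java
// Hypothetical helper; the class and method names are not from the original code.
final class MessageAccounting {
    private MessageAccounting() {}

    /** Messages that were neither processed by the consumer nor left ready in the queue. */
    static long missing(long published, long processed, long ready) {
        if (published < 0 || processed < 0 || ready < 0) {
            throw new IllegalArgumentException("counts must be non-negative");
        }
        return published - (processed + ready);
    }

    public static void main(String[] args) {
        // First run: 100 published, 10 processed, 0 ready after the restart.
        System.out.println(missing(100, 10, 0));
        // Second run: 20000 published, 645 processed, 17609 ready after the restart.
        System.out.println(missing(20000, 645, 17609));
    }
}
```

With the numbers from the two runs, this reports 90 and 1746 missing messages respectively.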
Could you please let me know if I am doing something wrong in my queueing configuration?
The following is the queueing configuration I am using:
==================================================
@Configuration
public abstract class AbstractMDBRabbitConfiguration extends AbstractRabbitConfiguration {

    protected abstract void configureMDBTemplate(DMBTemplate template);

    @Value("${rabbitmq.user}")
    private String rabbitMQUser;

    @Value("${rabbitmq.password}")
    private String rabbitMQPassword;

    @Value("${rabbitmq.host}")
    private String rabbitMQHost;

    @Bean
    public ConnectionFactory connectionFactory() {
        SingleConnectionFactory connectionFactory = new SingleConnectionFactory(rabbitMQHost);
        connectionFactory.setUsername(rabbitMQUser);
        connectionFactory.setPassword(rabbitMQPassword);
        return connectionFactory;
    }

    public DMBTemplate rabbitTemplate() {
        DMBTemplate template = new DMBTemplate(connectionFactory());
        template.setMessageConverter(messageConverter());
        configureMDBTemplate(template);
        return template;
    }

    @Bean
    public MessageConverter messageConverter() {
        return new SimpleMessageConverter();
    }
}
==================================================
@Configuration
public class SecondQConfiguration extends AbstractMDBRabbitConfiguration {

    private static final String MDB_SECQ_EXCHANGE_NAME = "mdb.testq.exchange";
    private static final String MDB_SECQ_NAME = "mdb.testq.queue";
    private static final String MDB_SECQ_ROUTING_KEY = MDB_SECQ_NAME;

    @Override
    protected void configureMDBTemplate(DMBTemplate template) {
        template.setExchange(MDB_SECQ_EXCHANGE_NAME);
        template.setRoutingKey(MDB_SECQ_NAME);
    }

    @Bean
    public DMBTemplate secondQTemplate() {
        return rabbitTemplate();
    }

    @Bean
    public Queue mdbSecQ() {
        Queue q = new Queue(MDB_SECQ_NAME);
        q.setAutoDelete(false);
        q.setDurable(true); // Making sure that the queue is durable
        return q;
    }

    @Bean
    public TopicExchange mdbSecQExchange() {
        TopicExchange t = new TopicExchange(MDB_SECQ_EXCHANGE_NAME);
        t.setAutoDelete(false);
        t.setDurable(true);
        return t;
    }

    @Bean
    public Binding mdbSecQBinding() {
        return BindingBuilder.from(mdbSecQ()).to(mdbSecQExchange()).with(MDB_SECQ_ROUTING_KEY);
    }
}
==================================================
public class DMBTemplate extends RabbitTemplate {

    public DMBTemplate() {
        super();
    }

    public DMBTemplate(ConnectionFactory connectionFactory) {
        super(connectionFactory);
    }

    @Override
    public void convertAndSend(Object object) throws DMBMessagingException {
        try {
            super.convertAndSend(object);
        } catch (AmqpException amqpe) {
            throw new DMBMessagingException(amqpe);
        }
    }

    @Override
    public void convertAndSend(String exchange, String routingKey,
            final Object message) throws AmqpException {
        MessageProperties props = new MessageProperties();
        props.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // setting delivery mode as PERSISTENT
        MessageConverter msgConverter = getMessageConverter();
        send(exchange, routingKey, msgConverter.toMessage(message, props));
    }
}
==================================================
Handler that processes messages:
================================
public class SecondQConsumer implements IDMBProcessor {

    private int count = 0;

    public void handleMessage(DMBMessage msg) {
        DMBMessagePayload pl = msg.getMessagePayload();
        String message = (String) pl.getMessage();
        count++;
        System.out.println("count:" + count);
    }
}
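
Because the processed count is what the test relies on, here is a minimal sketch of a thread-safe counter, in case concurrentConsumers is ever raised above 1. The class name ProcessedCounter is hypothetical and not part of the code above; it only illustrates swapping the plain int for an AtomicInteger. Like the original field, it starts from zero again whenever the consumer is restarted.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the handler's counter; not part of the original code.
final class ProcessedCounter {
    private final AtomicInteger count = new AtomicInteger();

    /** Records one processed message and returns the running total. */
    int record() {
        return count.incrementAndGet();
    }

    /** Returns the number of messages recorded so far. */
    int total() {
        return count.get();
    }

    public static void main(String[] args) throws InterruptedException {
        ProcessedCounter counter = new ProcessedCounter();
        // Two threads each record 1000 messages, imitating two concurrent consumers.
        Runnable work = () -> {
            for (int i = 0; i < 1000; i++) {
                counter.record();
            }
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println("count:" + counter.total());
    }
}
```

In the handler, handleMessage would call record() in place of count++ and print the returned value.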
==================================================
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jaxws="http://cxf.apache.org/jaxws"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schem...-beans-3.0.xsd
http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">
    <!-- pick up rabbit broker configuration -->
    <context:component-scan base-package="com.test">
    </context:component-scan>
    <context:property-placeholder location="file:///var/test/amqp.properties"/>
    <bean id="messageListenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="queueName" value="mdb.testq.queue"/>
        <property name="concurrentConsumers" value="1" />
        <property name="messageListener" ref="messageListenerAdapter" />
        <property name="channelTransacted" value="true" />
        <property name="transactionManager" ref="txnManager" />
    </bean>
    <bean id="messageListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
        <property name="delegate" ref="testHandler" />
        <property name="messageConverter" ref="messageConverter" />
    </bean>
    <bean id="testHandler" class="com.test.SecondQConsumer">
    </bean>
    <bean id="txnManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
</beans>
To make sure messages are rolled back on an exception, I am using channelTransacted = true, with
org.springframework.batch.support.transaction.ResourcelessTransactionManager as the transactionManager.
==================================================
Thanks
Venkat


