RabbitMQ channels not getting released and connections hanging in blocked/blocking state
Good afternoon Dave.
Dave, we are still using spring-amqp 1.0.0M3. We have not had a chance to migrate to the 1.0.0 release.
We use channel transactions for both sending and consuming messages.
For example, DMBTemplate, which extends RabbitTemplate, has channelTransacted set to true, and
the consumer configuration also has channelTransacted set to true. In addition, we inject
org.springframework.batch.support.transaction.ResourcelessTransactionManager as the transactionManager.
In one of the previous threads you suggested not using a transactionManager, since it does not do anything.
The reason for including it is the SecondQConsumer.handleMessage() method, whose
business logic has to split a message and send the resulting messages to another queue.
Without the transactionManager, we observed that if an exception was thrown after the message had been split and
the sends had been issued, the messages were still delivered to the next queue.
With the transactionManager added, the method behaves as a unit of work. That is why the transactionManager is used.
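To illustrate the unit-of-work behavior described above, here is a minimal sketch in plain Java. It does not use spring-amqp or the RabbitMQ client: ToyTransactedChannel and UnitOfWorkDemo are hypothetical names that only mimic the AMQP rule that messages published on a transacted channel become visible on commit and are discarded on rollback.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model; NOT a spring-amqp or RabbitMQ client class.
class ToyTransactedChannel {
    private final List<String> pending = new ArrayList<>();   // published, not yet committed
    private final List<String> delivered = new ArrayList<>(); // visible to the next queue

    void publish(String body) { pending.add(body); }

    // Commit makes every pending message visible.
    void commit() { delivered.addAll(pending); pending.clear(); }

    // Rollback throws the pending messages away.
    void rollback() { pending.clear(); }

    List<String> delivered() { return new ArrayList<>(delivered); }
}

class UnitOfWorkDemo {
    // Splits a comma-separated message and publishes each part.
    // An empty part stands in for "bad data" and aborts the whole unit of work.
    static void splitAndSend(ToyTransactedChannel channel, String message) {
        try {
            for (String part : message.split(",", -1)) {
                if (part.isEmpty()) {
                    throw new IllegalArgumentException("bad part");
                }
                channel.publish(part);
            }
            channel.commit();
        } catch (RuntimeException e) {
            channel.rollback();
        }
    }

    public static void main(String[] args) {
        ToyTransactedChannel channel = new ToyTransactedChannel();
        splitAndSend(channel, "a,b,c"); // all parts valid: committed
        splitAndSend(channel, "d,,e");  // fails after "d": rolled back
        System.out.println(channel.delivered()); // prints [a, b, c]
    }
}
```

In this model the part "d" never reaches the next queue, which is the behavior we see only when the transactionManager is injected.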
Unfortunately, we are running into another problem. One of the developers coded two queues.
When the first queue's handler receives a message, it checks some criteria; if the criteria are not met, it
sleeps for 3 seconds and sends the message to a delayQueue, where it waits another 3-5 seconds before coming back to the first queue.
No business logic was coded to handle a message that has bad data. When the QA team ran an endurance test with this test case
for 4 concurrent users for about 8 hours, we found about 35000 transactional channels hanging and not being released.
We also found about 40 connections, some in blocking and some in blocked state. We used rabbitmqctl list_channels
to list the channels and rabbitmqctl list_connections to list the connections.
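As a rough aid for reading that output, the sketch below counts transactional channels per connection. It assumes the listing was produced with two tab-separated columns, the connection and a true/false transactional flag (for example by asking rabbitmqctl list_channels for the connection and transactional items); that column layout, the class name ChannelCounter, and the sample lines are all assumptions for illustration, not real output from our broker.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ChannelCounter {
    // Counts transactional channels per connection from lines of the form
    // "<connection>\t<true|false>". The column layout is an assumption.
    static Map<String, Integer> transactionalPerConnection(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            String[] cols = line.split("\t");
            if (cols.length != 2) {
                continue; // skip banner lines such as "Listing channels ..."
            }
            if (!"true".equals(cols[1].trim())) {
                continue; // only transactional channels are counted
            }
            counts.merge(cols[0].trim(), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Invented sample lines, not captured from a real broker.
        List<String> sample = Arrays.asList(
                "Listing channels ...",
                "conn-A\ttrue",
                "conn-A\ttrue",
                "conn-B\tfalse",
                "conn-B\ttrue",
                "...done.");
        System.out.println(transactionalPerConnection(sample)); // prints {conn-A=2, conn-B=1}
    }
}
```

A connection that owns thousands of transactional channels would stand out immediately in such a summary.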
When the QA team disabled this problematic test case, we found only 27 channels hanging around and no connections in blocked/blocking state. In other words, they ran only test cases whose queues have channelTransacted set to false and no transactionManager injected into SimpleMessageListenerContainer.
Dave, the incorrect business logic was causing messages to roll from one queue to the other without ever being consumed.
We will correct this.
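One possible shape for that correction, sketched in plain Java under stated assumptions: carry a retry counter with the message and divert it once a limit is reached, so a message with bad data cannot circulate forever. RetryRouter, the header name "x-retry-count", the queue name "errorQueue" and the limit of 3 are hypothetical; only "delayQueue" comes from our setup, and no spring-amqp API is used here.

```java
import java.util.HashMap;
import java.util.Map;

class RetryRouter {
    static final String RETRY_HEADER = "x-retry-count"; // hypothetical header name
    private final int maxRetries;

    RetryRouter(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    // Called when the criteria are not met. Returns the name of the queue the
    // message should go to and increments the retry counter in the headers.
    String routeUnmet(Map<String, Object> headers) {
        int retries = (Integer) headers.getOrDefault(RETRY_HEADER, 0);
        if (retries >= maxRetries) {
            return "errorQueue"; // hypothetical parking queue for bad data
        }
        headers.put(RETRY_HEADER, retries + 1);
        return "delayQueue";
    }

    public static void main(String[] args) {
        RetryRouter router = new RetryRouter(3);
        Map<String, Object> headers = new HashMap<>();
        for (int i = 0; i < 4; i++) {
            // prints delayQueue three times, then errorQueue
            System.out.println(router.routeUnmet(headers));
        }
    }
}
```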
Could you please let me know what could cause channels not to be released and connections to be left in blocked/blocking state?
Do you think using org.springframework.batch.support.transaction.ResourcelessTransactionManager as the transactionManager is causing this problem?
I have attached configurations and classes for reference.
I appreciate and thank you for your feedback.
Thanks
Venkat
=====================================================
@Configuration
public abstract class AbstractMDBRabbitConfiguration extends AbstractRabbitConfiguration {

    protected abstract void configureMDBTemplate(DMBTemplate template);

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        connectionFactory.setChannelCacheSize(100);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public DMBTemplate rabbitTemplate() {
        DMBTemplate template = new DMBTemplate(connectionFactory());
        template.setMessageConverter(messageConverter());
        configureMDBTemplate(template);
        return template;
    }

    @Bean
    public MessageConverter messageConverter() {
        return new SimpleMessageConverter();
    }
}
----------------------------------------------------
@Configuration
public class SecondQConfiguration extends AbstractMDBRabbitConfiguration {

    private static final String MDB_SECQ_EXCHANGE_NAME = "mdb.testq.exchange";
    private static final String MDB_SECQ_NAME = "mdb.testq.queue";
    private static final String MDB_SECQ_ROUTING_KEY = MDB_SECQ_NAME;

    @Override
    protected void configureMDBTemplate(DMBTemplate template) {
        template.setExchange(MDB_SECQ_EXCHANGE_NAME);
        template.setRoutingKey(MDB_SECQ_NAME);
    }

    @Bean
    public DMBTemplate rabbitTemplate() {
        DMBTemplate template = new DMBTemplate(connectionFactory());
        template.setChannelTransacted(true);
        template.setMessageConverter(messageConverter());
        configureMDBTemplate(template);
        return template;
    }

    @Bean
    public Queue mdbSecQ() {
        Queue q = new Queue(MDB_SECQ_NAME);
        q.setAutoDelete(false);
        q.setDurable(true);
        return q;
    }

    @Bean
    public TopicExchange mdbSecQExchange() {
        TopicExchange t = new TopicExchange(MDB_SECQ_EXCHANGE_NAME);
        t.setAutoDelete(false);
        t.setDurable(true);
        return t;
    }

    @Bean
    public Binding mdbSecQBinding() {
        return BindingBuilder.from(mdbSecQ()).to(mdbSecQExchange()).with(MDB_SECQ_ROUTING_KEY);
    }
}
----------------------------------------------------------------------------------------------------
Utility class to get hold of rabbit template to send the message:
=================================================================
public class ApplicationContextUtil {

    private static final AbstractApplicationContext cac3;

    static {
        cac3 = new ClassPathXmlApplicationContext("producer-secondq-bootstrap-config.xml");
        cac3.registerShutdownHook();
    }

    public static DMBTemplate getDMBTemplate() {
        return (DMBTemplate) cac3.getBean("rabbitTemplate");
    }
}
For example, a DMBMessage is put on the queue "queue.one" as follows:
DMBTemplate template = ApplicationContextUtil.getDMBTemplate();
DMBMessage dmbMsg = new DMBMessage();
DMBMessagePayload payload = new DMBMessagePayload();
payload.setMessage("payload");
dmbMsg.setPayload(payload);
template.convertAndSend(dmbMsg);
-----------------------------------------------------------------------------------
Handler that processes messages:
================================
public class SecondQConsumer implements IDMBProcessor {

    private int count = 0;

    public void handleMessage(DMBMessage msg) {
        // consume the message
        DMBMessagePayload pl = msg.getMessagePayload();
        String message = (String) pl.getMessage();
        // Here there is logic to wait for 3 seconds and, if the criteria are not met, put the message back into the queue.
        // In other cases the logic splits the XML message into pieces and sends them to another queue one by one.
        // To make sure this unit of work either completes or is rolled back, we inject
        // org.springframework.batch.support.transaction.ResourcelessTransactionManager
        // into messageListenerContainer as transactionManager.
    }
}
-----------------------------------------------------------------------------------------------
Producer config: producer-secondq-bootstrap-config.xml
======================================================
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jaxws="http://cxf.apache.org/jaxws"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schem...-beans-3.0.xsd
http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">
<!-- pick up rabbit broker configuration -->
<context:component-scan base-package="com.test">
</context:component-scan>
</beans>
---------------------------------------------------------------------------
Consumer configuration: consumer-secondq-bootstrap-config.xml
==============================================================
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schem...-beans-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">
<!-- pick up rabbit broker configuration -->
<context:component-scan base-package="com.test">
</context:component-scan>
    <bean id="messageListenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="queueName" value="mdb.testq.queue"/>
        <property name="concurrentConsumers" value="1" />
        <property name="messageListener" ref="messageListenerAdapter" />
        <property name="channelTransacted" value="true" />
        <property name="transactionManager" ref="txnManager" />
    </bean>
    <bean id="messageListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
        <property name="delegate" ref="testHandler" />
        <property name="messageConverter" ref="messageConverter" />
    </bean>
    <bean id="testHandler" class="com.test.SecondQConsumer">
    </bean>
    <bean id="txnManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
</beans>
---------------------------------------------------------------------------------------------------------------------------