CachingConnectionFactory leaks channels in when synchronized with a transaction?
Hello everyone,
I am switching to using transacted channels for guaranteed delivery and I started observing some weird behavior in my connections channels.
After several publishes, the broker is running out of memory because it seems that channels, when channelTransacted=true AND when on a thread with an ACTIVE transaction that the PlatformTransactionManager knows about (e.g. when the RabbitResourceHolder is synchronized with the current DB transaction).
I've stepped through the code, and realize that the channel.commit() and channel.close() operations are being deferred until after the transaction manager has committed the transaction. This is a good thing.
From ConnectionFactoryUtils.java:141
Code:
public static void releaseResources(RabbitResourceHolder resourceHolder) {
if (resourceHolder == null || resourceHolder.isSynchronizedWithTransaction()) {
return;
}
RabbitUtils.closeChannel(resourceHolder.getChannel());
RabbitUtils.closeConnection(resourceHolder.getConnection());
}
The difficulty occurs when the ResourceHolder afterComplete() fires after the transaction has been completed.
From ResourceHolderSynchronization.java:96:
Code:
if (releaseNecessary) {
releaseResource(this.resourceHolder, this.resourceKey);
}
This properly should close the channel and the connection (releasing the channel to the cachingconnectionfactory, and not closing the connection, due to reuse of the connection for amqp)
The difficulty is that:
ConnectionFactoryUtils.java$RabbitResourceSynchron ization:256
Code:
protected void releaseResource(RabbitResourceHolder resourceHolder, Object resourceKey) {
ConnectionFactoryUtils.releaseResources(resourceHolder);
}
calls releaseResources... and the resourceHolder is still marked as synchronized with the transaction. So this method is a NOP, and the channel stays unclosed. Subsequent publishes therefore do not see existing channels in the CachingConnectionFactory, and every transacted publish within the scope of an existing transaction will result in a new channel until the broker runs out of memory.
Am I missing some configuration somewhere?
My workaround (which is not acceptable for guaranteed delivery) is to use channelTransacted = false
Dependencies:
$ mvn dependency:list | grep amqp
[INFO] com.rabbitmq:amqp-client:jar:2.5.0:compile
[INFO] org.springframework.amqp:spring-amqp:jar:1.0.0.RELEASE:compile
[INFO] org.springframework.amqp:spring-rabbit:jar:1.0.0.RELEASE:compile
[INFO] org.springframework.integration:spring-integration-amqp:jar:2.1.0.M1:compile
Thanks!
--neoha