Messages not ACK'd with AcknowledgeMode.AUTO without setting SimpleMessageListenerCon
I've noticed an issue when writing unit tests that run both Publisher and Consumer threads.
In this scenario (but not if I run the consumer and producer as separate processes), when I set the AcknowledgeMode to AUTO on a SimpleMessageListenerContainer, the broker does not appear to receive an ACK for the received message, and after the test shuts down the message is requeued.
But if I set the scope of the SimpleMessageListenerContainer to prototype, I get the correct behaviour.
This is not done in the samples in the docs <http://static.springsource.org/spring-amqp/reference/html/>, so I'm wondering if this is some kind of Channel thread-safety issue, or if I'm just doing something wrong?
If it is just the case that SimpleMessageListenerContainer should be scoped as prototype, then can this be documented? Also, if using the <rabbit:listener-container> XML configuration, how would I set the scope?
This was with environment:
spring-amqp -1.0.0-RELEASE
RabbitMQ-2.6.0
Java-1.6.0_26
See sample code below:
Code:
package example;
import static org.junit.Assert.*;
import java.util.concurrent.ArrayBlockingQueue;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
/**
 * Publishes one message through the {@link AmqpTemplate} and waits for the
 * listener container to hand it to {@link ExampleMessageHandler}.
 *
 * <p>The collaborators are injected from a context built from
 * {@code ExampleConfiguration} in {@link #setup()} and released again in
 * {@link #tearDown()}.
 */
public class ExampleTest
{
    private static final Logger log = Logger.getLogger(ExampleTest.class);

    @Autowired
    private AmqpTemplate tmpl;

    @Autowired
    private SimpleMessageListenerContainer container;

    @Autowired
    private ExampleMessageHandler msgHandler;

    // Kept as a field (with its concrete, closeable type) so tearDown() can close it.
    private AnnotationConfigApplicationContext context;

    /**
     * Builds the application context and autowires this test instance from it.
     */
    @Before
    public void setup()
    {
        context = new AnnotationConfigApplicationContext(ExampleConfiguration.class);
        context.getAutowireCapableBeanFactory().autowireBean(this);
    }

    /**
     * Stops the listener container and closes the application context so that
     * whatever the test started is not left running once the test has finished.
     * Both references are null-checked because {@link #setup()} may have failed
     * part-way through.
     */
    @After
    public void tearDown()
    {
        try {
            if (container != null) {
                container.stop();
            }
        }
        finally {
            // Close the context even if stopping the container throws.
            if (context != null) {
                context.close();
            }
        }
    }

    /**
     * Starts the consumer, publishes {@code "msg1"} and asserts that the same
     * text is received by the message handler within the 5 second timeout.
     *
     * @throws Exception if waiting for the message is interrupted or publishing fails
     */
    @Test(timeout=5000)
    public void testPublishAndConsume() throws Exception
    {
        // Create and start a Consumer thread.
        container.start();

        // Publish a job.
        String sent = "msg1";
        log.debug("Publishing message - " + sent);
        tmpl.send(new Message(sent.getBytes(), new MessageProperties()));
        log.debug("Message published - " + sent);

        // Wait for job to be received by consumer (blocks until one arrives).
        String received = msgHandler.waitForMessage();
        log.debug("Recieved message = " + String.valueOf(received));
        assertNotNull(received);
        assertEquals(sent, received);
    }
}
/**
 * Message listener that stores the body of every received message in a
 * bounded queue so a test thread can wait for it via {@link #waitForMessage()}.
 */
class ExampleMessageHandler implements MessageListener
{
    private final static Logger log = Logger.getLogger(ExampleMessageHandler.class);

    // Bounded hand-off buffer between the consumer thread and the waiting test thread.
    private final ArrayBlockingQueue<String> receivedStack = new ArrayBlockingQueue<String>(100);

    /**
     * Decodes the message body to a String and appends it to the buffer.
     *
     * <p>Note that {@code put()} blocks while the buffer is full. If the thread
     * is interrupted while waiting, the message is dropped, an error is printed
     * and the thread's interrupt status is restored so that the caller can
     * still observe the interruption.
     *
     * @param msg the received message
     */
    public void onMessage(Message msg)
    {
        String body = new String(msg.getBody());
        log.debug("Received job - " + String.valueOf(body));
        try {
            receivedStack.put(body);
        }
        catch (InterruptedException e) {
            System.err.println("Thread interrupted before passed job could be stored in stack.");
            // Catching InterruptedException clears the flag; set it again rather than swallow it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Blocks until a message body is available and returns it.
     *
     * @return the oldest buffered message body
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public String waitForMessage() throws InterruptedException
    {
        // take() blocks until a message is in the buffer.
        return receivedStack.take();
    }
}
/**
 * Java-based Spring configuration for the example: connection factory, admin,
 * template, queue/exchange/binding declarations, the message handler and the
 * listener container that feeds it.
 */
@Configuration
class ExampleConfiguration
{
    private String host = "msrabbitmqqa.morningstar.com";
    private int port = 5672;
    private String username = "test";
    private String password = "test";
    private String vhost = "test";
    private String queueName = "test-queue";
    private String exchangeName = "test-exchange";

    /** Connection factory pointing at the configured host, port, credentials and virtual host. */
    @Bean
    public ConnectionFactory connectionFactory()
    {
        CachingConnectionFactory factory = new CachingConnectionFactory(host);
        factory.setVirtualHost(vhost);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setPort(port);
        return factory;
    }

    /** Admin bean built on the shared connection factory. */
    @Bean
    public AmqpAdmin amqpAdmin()
    {
        ConnectionFactory factory = connectionFactory();
        return new RabbitAdmin(factory);
    }

    /** Template that sends to the example exchange with an empty routing key. */
    @Bean
    public AmqpTemplate amqpTemplate()
    {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setExchange(exchangeName);
        rabbitTemplate.setRoutingKey("");
        rabbitTemplate.setQueue(this.queueName);
        return rabbitTemplate;
    }

    /** The queue the listener container consumes from. */
    @Bean
    public Queue queue()
    {
        Queue declared = new Queue(this.queueName, true, false, false);
        return declared;
    }

    /** The direct exchange the template publishes to. */
    @Bean
    public Exchange exchange()
    {
        Exchange declared = new DirectExchange(exchangeName, true, false);
        return declared;
    }

    /** Binds the queue to the direct exchange using the empty routing key. */
    @Bean
    public Binding binding()
    {
        Queue boundQueue = queue();
        DirectExchange direct = (DirectExchange) exchange();
        return BindingBuilder.bind(boundQueue).to(direct).with("");
    }

    /** The handler that receives messages from the listener container. */
    @Bean ExampleMessageHandler messageListener()
    {
        ExampleMessageHandler handler = new ExampleMessageHandler();
        return handler;
    }

    // NOTE: With the prototype scope declared below, the example is reported to
    // work. Without the @Scope line (i.e. default scope), the broker reportedly
    // received no ack from the container even though the mode is set to AUTO.
    @Scope(value = "prototype")
    @Bean
    public SimpleMessageListenerContainer listenerContainer()
    {
        SimpleMessageListenerContainer listener = new SimpleMessageListenerContainer();
        listener.setConnectionFactory(connectionFactory());
        listener.setAcknowledgeMode(AcknowledgeMode.AUTO);
        listener.setQueueNames(this.queueName);
        listener.setMessageListener(messageListener());
        return listener;
    }
}
thanks
Mike