Thread: Listen on multiple queues in Consumer

  1. #1
    Join Date
    Feb 2011
    Posts
    12

    Hi,

    I have a Windows application (written in Java) that will run 24/7.
    This application will act as a consumer for a RabbitMQ server.

    Different producers (other applications) will publish messages to a topic exchange using different routing keys. My application (the consumer) has to create several queues, each bound with a binding key that matches the producers' routing keys, and listen for messages from those queues indefinitely.

    My question is: can the MessageListener.onMessage() method be invoked separately for each of these different queues created in the consumer, so that they listen for messages indefinitely?

    If yes, what is the approach to do this?

  2. #2
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,844

    Can you please clarify what you mean by "so that they listen for messages indefinitely"?

    Thanks,
    Mark

  3. #3
    Join Date
    Feb 2011
    Posts
    12

    Thanks Mark for looking into this.

    What I meant by "so that they listen for messages indefinitely" was that, once we create a queue, the consumer has to listen all the time for incoming messages on that particular queue.
    The following code snippet from a non-Spring RabbitMQ consumer uses "while (true)" to listen for messages all the time. Can we have the same functionality in Spring AMQP as well?

    With the plain (non-Spring) RabbitMQ client, the consumer code looks something like this:

    QueueingConsumer consumer = new QueueingConsumer(chan);
    chan.basicConsume(queueName, false, consumer);
    while (true) {
        QueueingConsumer.Delivery delivery;
        try {
            delivery = consumer.nextDelivery();
        } catch (InterruptedException ie) {
            continue;
        }
    }

  4. #4
    Join Date
    Jun 2005
    Posts
    4,232

    That loop is basically the core of SimpleMessageListenerContainer. Did you try using that? It allows you to listen to multiple queues with one or more listeners. There are many integration tests and a couple of samples in Spring AMQP.
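
To illustrate the idea in the post above (a container owns the receive loop and hands each message to a listener), here is a minimal sketch in plain Java. It is not Spring AMQP code and does not reflect how SimpleMessageListenerContainer manages its consumers or threads; the class MiniListenerContainer, the in-memory BlockingQueue stand-ins for broker queues, and the 100 ms poll interval are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for illustration only; this is not Spring AMQP code.
final class MiniListenerContainer {

    private final List<Thread> workers = new ArrayList<>();
    private volatile boolean running;

    // Starts one polling thread per queue; every message is passed to the same listener.
    void start(List<BlockingQueue<String>> queues, Consumer<String> listener) {
        running = true;
        for (BlockingQueue<String> queue : queues) {
            Thread worker = new Thread(() -> {
                // The equivalent of the hand-written while (true) loop, owned by the container.
                while (running) {
                    try {
                        String message = queue.poll(100, TimeUnit.MILLISECONDS);
                        if (message != null) {
                            listener.accept(message);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            });
            worker.setDaemon(true);
            worker.start();
            workers.add(worker);
        }
    }

    // Signals the polling threads to finish and waits for them to exit.
    void stop() throws InterruptedException {
        running = false;
        for (Thread worker : workers) {
            worker.join();
        }
        workers.clear();
    }
}
```

The application code only supplies the listener; it never writes the loop itself, which is the same division of labour the Spring AMQP container offers.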

  5. #5
    Join Date
    Feb 2011
    Posts
    12

    Thanks Dave for your response.

    I did try using SimpleMessageListenerContainer in the application code, as shown below.

    But it seems that "new Binding()" is not working with the routing key pattern specified in the consumer code.
    When the producer sends a message to the exchange 'Exchange1' with a queueName, the message is always received by the onMessage() listener, irrespective of the routing key.

    Ideally, the message should be received only if the routing key specified by the producer when publishing matches the routing key pattern of the binding.

    Can you please help me resolve this issue?

    Consumer Code:
    ===========

    public class Consumer {
        public static void main(String[] args) {
            AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ConsumerConfiguration.class);
            RabbitAdmin amqpAdmin = (RabbitAdmin) context.getBean("amqpAdmin");

            Binding binding = new Binding(new Queue("TestQueue1"), new TopicExchange("Exchange1"), "TEST.ROUTING.#");
            amqpAdmin.declareBinding(binding);

            Binding binding1 = new Binding(new Queue("TestQueue2"), new TopicExchange("Exchange1"), "ROUTING.*");
            amqpAdmin.declareBinding(binding1);

            ConnectionFactory connectionFactory = (ConnectionFactory) context.getBean("connectionFactory");
            List<String> queueList = new ArrayList<String>();
            queueList.add("TestQueue1");
            queueList.add("TestQueue2");

            Queue[] queues = new Queue[queueList.size()];
            for (int q = 0; q < queueList.size(); q++) {
                queues[q] = new Queue(queueList.get(q));
            }

            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueues(queues);
            ConsumerLookup lookup = new ConsumerLookup();
            container.setMessageListener(lookup);
            container.start();
        }
    }


    ===========================

    public class ConsumerLookup implements MessageListener {

        @Override
        public void onMessage(Message msg) {
            System.out.println("RECEIVED TEXT:" + new String(msg.getBody()));
        }
    }


    =============================================

    @Configuration
    public class ConsumerConfiguration extends AbstractRabbitConfiguration {

        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            return connectionFactory;
        }

        @Override
        public RabbitTemplate rabbitTemplate() {
            RabbitTemplate template = new RabbitTemplate(connectionFactory());
            return template;
        }
    }

    ===================================
    Last edited by ponsu; Mar 28th, 2011 at 07:30 AM.

  6. #6
    Join Date
    Jun 2005
    Posts
    4,232

    The only problem I see there is that you don't declare the queues and exchanges. So I suspect that maybe you have some old durable bindings still declared. What is the result of running `rabbitmqctl list_bindings`?
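
As a reference for the matching behaviour expected in this thread, the sketch below shows how topic exchange binding patterns compare against routing keys: words are separated by dots, "*" matches exactly one word, and "#" matches zero or more words. It is plain Java with no RabbitMQ or Spring AMQP dependency; the class name TopicMatcher is hypothetical, and the broker's own matching code is not shown here.

```java
// Hypothetical helper for illustration; not part of RabbitMQ or Spring AMQP.
final class TopicMatcher {

    private TopicMatcher() {
    }

    // Returns true if routingKey matches the binding pattern under topic-exchange rules:
    // '.' separates words, '*' matches exactly one word, '#' matches zero or more words.
    static boolean matches(String pattern, String routingKey) {
        String[] patternWords = pattern.split("\\.", -1);
        String[] keyWords = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return matches(patternWords, 0, keyWords, 0);
    }

    private static boolean matches(String[] p, int pi, String[] k, int ki) {
        if (pi == p.length) {
            // Pattern exhausted: it is a match only if the key is exhausted too.
            return ki == k.length;
        }
        String word = p[pi];
        if (word.equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point.
            for (int next = ki; next <= k.length; next++) {
                if (matches(p, pi + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (ki == k.length) {
            // A literal word or '*' needs a key word to consume.
            return false;
        }
        return (word.equals("*") || word.equals(k[ki])) && matches(p, pi + 1, k, ki + 1);
    }
}
```

With the patterns from the consumer code, TopicMatcher.matches("TEST.ROUTING.#", "TEST.ROUTING.a.b") and TopicMatcher.matches("ROUTING.*", "ROUTING.a") hold, while "ROUTING.*" does not match "ROUTING.a.b". A queue receives a message if any one of its bindings matches, so a leftover binding with a broad pattern such as "#" would deliver every message regardless of the newer patterns, which is why checking the bindings that already exist on the broker is a useful first step.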
