Results 1 to 10 of 10

Thread: RabbitMQ and TopicExchange with Spring

  1. #1

    Default RabbitMQ and TopicExchange with Spring

    Hi,
    as the title says I have got a question regarding TopicExchange via Spring. My Consumer and ProducerConfiguration all are based upon the class ServiceAMQPConfiguration:

    Code:
    @Configuration
    public class ServiceAMQPConfiguration {
    	
    	protected final String vhost = "vhost";
    	protected final String vhostUser = "user";
    	protected final String vhostPassword = "passwd";
    	
    	private int port = 5672;
    	
    	protected final String queueName = "queue";
    	protected final String topicExchange = "queue.command";
    
    	@Bean
    	public ConnectionFactory connectionFactory() {
    		CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
    		connectionFactory.setVirtualHost(vhost);
    		connectionFactory.setPort(port);
    		connectionFactory.setUsername(vhostUser);
    		connectionFactory.setPassword(vhostPassword);
    		return connectionFactory;
    	}
    	
    	@Bean
    	public AmqpAdmin amqpAdmin() {
    		return new RabbitAdmin(connectionFactory());
    	}
    
    	@Bean
    	public RabbitTemplate rabbitTemplate() {
    		RabbitTemplate template = new RabbitTemplate(connectionFactory());
    		template.setQueue(this.queueName);
    		template.setExchange(topicExchange);
    		template.setRoutingKey(this.queueName);
    		return template;
    	}
    
    	@Bean
    	public Queue serviceQueue() {
    		Queue queue = new Queue(this.queueName);
    		return queue;
    	}		
    	
    }
    My Consumer Configuration looks as follows:

    Code:
    @Configuration
    public class ConsumerConfiguration extends ServiceAMQPConfiguration {
    	
    		
    	@Bean
    	public SimpleMessageListenerContainer listenerContainer() {
    		SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    		
    		container.setConnectionFactory(connectionFactory());
    		container.setQueueNames(this.queueName);
    		container.setMessageListener(messageListenerAdapter());
    		
    		return container;
    	}
    	
    	@Bean 
    	public MessageListenerAdapter messageListenerAdapter() {
    		return new MessageListenerAdapter(new ServiceBroker());		
    	}
    
    	
    }
    My producer configuration as:

    Code:
            @Bean
    	public ScheduledProducer scheduledProducer() {
    		return new ScheduledProducer();
    	}
    
    	@Bean
    	public BeanPostProcessor postProcessor() {
    		return new ScheduledAnnotationBeanPostProcessor();
    	}
    
    
    	static class ScheduledProducer {
    
    		@Autowired
    		private volatile RabbitTemplate rabbitTemplate;
    		protected final String queueName = "queue";
    		protected final String topicExchange = "queue.command";
    
    		
    		@Scheduled(fixedRate = 3000)
    		public void sendMessage() {
    			rabbitTemplate.convertAndSend(topicExchange, queueName, "This is a test message");
    		}
    	}
    So my questions are the following:

    - There are actually no messages send, why?
    - Do I have a basic misunderstanding regarding the setup?

    Could someone give me a hint, based on this code structure?

  2. #2
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,854

    Default

    Do you have a binding for that Queue to that TopicExchange already configured on the broker? If not, I think that's your missing piece. Also, if you are just mapping the literal name of the Queue (i.e. no patterns), then you might be okay with a DirectExchange. Is there a specific reason you are using a TopicExchange?

  3. #3

    Default

    Hi Mark,
    thanks for your quick reply. No atm I don't have any binding. I need to put the Binding into the Consumer, right? And queue.command should be in future queue.command.*. How do I define the Binding properly and wire it into my code with the usage of spring?

  4. #4
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,854

    Default

    Check out this Reference Manual section: http://static.springsource.org/sprin...-configuration

    That shows both XML namespace and @Bean configuration options with the latter using BindingBuilder.

    Hope that helps.
    -Mark

  5. #5

    Default

    Hi Mark,
    okay I changed my producer and consumer configuration as follows:

    Consumer:
    Code:
    @Value("${service.command.pattern}")
        private String commandRoutingKey;
    		
    	@Bean
    	public SimpleMessageListenerContainer listenerContainer() {
    		SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    		
    		container.setConnectionFactory(connectionFactory());
    		container.setQueueNames(this.queueName);
    		container.setMessageListener(messageListenerAdapter());
    		
    		return container;
    	}
    	
    	@Bean 
    	public MessageListenerAdapter messageListenerAdapter() {
    		return new MessageListenerAdapter(new ServiceBroker());		
    	}
    	
    	@Bean
        public Queue commandQueue() {
            return amqpAdmin().declareQueue();
        }
    
        @Bean
        public Binding marketDataBinding() {
            return BindingBuilder.bind(
                    commandQueue()).to(commandExchange()).with(commandRoutingKey);
        }
    Producer:
    Code:
    @Bean
    	public ScheduledProducer scheduledProducer() {
    		return new ScheduledProducer();
    	}
    
    	@Bean
    	public BeanPostProcessor postProcessor() {
    		return new ScheduledAnnotationBeanPostProcessor();
    	}
    
    
    	static class ScheduledProducer {
    
    		@Autowired
    		private volatile RabbitTemplate rabbitTemplate;
    		protected final String queueName = "service";
    		protected final String topicExchange = "service.command";
    
    		
    		@Scheduled(fixedRate = 3000)
    		public void sendMessage() {
    			rabbitTemplate.convertAndSend(topicExchange, queueName, "This is a test message");
    		}
    	}
    But still no messages... :-(

    AbstractConfiguration looks like this:

    Code:
    protected final String vhost = "";
    	protected final String vhostUser = "guest";
    	protected final String vhostPassword = "guest";
    	
    	private int port = 5672;
    	
    	protected final String queueName = "service";
    	protected final String topicExchange = "service.command";
    
    	@Bean
    	public ConnectionFactory connectionFactory() {
    		CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
    		//connectionFactory.setVirtualHost(vhost);
    		connectionFactory.setPort(port);
    		connectionFactory.setUsername(vhostUser);
    		connectionFactory.setPassword(vhostPassword);
    		return connectionFactory;
    	}
    	
    	@Bean
    	public AmqpAdmin amqpAdmin() {
    		return new RabbitAdmin(connectionFactory());
    	}
    	
    
    	@Bean
    	public RabbitTemplate rabbitTemplate() {
    		RabbitTemplate template = new RabbitTemplate(connectionFactory());
    		template.setQueue(this.queueName);
    		template.setExchange(this.topicExchange);
    		template.setRoutingKey(this.queueName);
    		return template;
    	}
    
    	@Bean
    	public Queue serviceQueue() {
    		Queue queue = new Queue(this.queueName);
    		return queue;
    	}	
    	
    	@Bean
    	public TopicExchange commandExchange() {
    		return new TopicExchange(topicExchange, true, false);
    	}
    Could you help me out with that? I don't know what I am actually doing wrong... :-(

    Thanks Johannes

  6. #6
    Join Date
    Jun 2005
    Posts
    4,241

    Default

    Are you sure that ${service.command.pattern} is being replaced with something that matches the routing key (log it in your config method maybe)? Are you sure you are actually sending messages (e.g. put a log statement in your producer)?

  7. #7

    Default

    Hi Dave,
    yes I think so on both cases.

    Question one: ${service.command.pattern} is loaded from the config file. To prove it I replaced and hardcoded it in the producer with:
    private String commandRoutingKey = "service.command.*";

    Also I changed the producer here:
    Code:
    static class ScheduledProducer {
    
    		@Autowired
    		private volatile RabbitTemplate rabbitTemplate;
    		protected final String queueName = "service";
    		protected final String topicExchange = "service.command.POWER";
    
    		
    		@Scheduled(fixedRate = 3000)
    		public void sendMessage() {
    			rabbitTemplate.convertAndSend(topicExchange, queueName, "This is a test message");
    		}
    	}
    Line important: protected final String topicExchange = "service.command.POWER";

    When I set a breakpoint in rabbitTemplate.convertAndSend(...); I can see the method being executed. I am totally out of ideas. :-(

  8. #8
    Join Date
    Jun 2005
    Posts
    4,241

    Default

    I think you are almost there, but your bindings and routing keys do not match. It's hard to keep track because we are changing things in every post, and I don't think I've seen a complete end-to-end example yet.

    Correct me if I'm wrong, you have:

    Code:
    topic exchange name = service.command.POWER (changed from last post)
    queue name = <auto-generated and irrelevant>
    binding key (between the two): service.command.*
    routing key (confusingly called "queueName" in the code) = service
    Then you send a message to the exchange with a with the routing key=service (queueName). This routing key does not match the binding, so the message is discarded. Possibly you meant to use a routing key service.command.POWER? The exchange name is irrelevant as long as the binding and the producer use the same value, just as the queue name is irrelevant as long as the consumer and binding agree.

  9. #9

    Default

    Hi,
    okay I played around a little bit, and went along the more complex code sample provided in the Spring & RabbitMQ downloadables...My configuration regarding the queues look like this:

    Code:
    In the AbstractConfiguration:
    protected final String QUEUE_NAME = "service";
    protected final String ROUTING_KEY_NAME = QUEUE_NAME;
    protected final String EXCHANGE_NAME = "service.command";
    
    In the ProducerConfiguration I added:
    private String ROUTING_KEY = "service.command.*";
    Doing a list_bindings on the RabbitMQ server results in:

    Code:
    Listing bindings ...
            exchange        650dfac8-c928-4f14-9f0b-e475822babdf    queue   650dfac8-c928-4f14-9f0b-e475822babdf    []
            exchange        service queue   service []
    service.command exchange        650dfac8-c928-4f14-9f0b-e475822babdf    queue   service.command.*   []
    ...done.
    This is not what I expected, any idea? Please help me, it is driving me nuts! :-(
    Last edited by johanneshiemer; Oct 20th, 2011 at 03:30 AM.

  10. #10

    Default

    Okay, solved got it running.

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •