Page 1 of 2 12 LastLast
Results 1 to 10 of 17

Thread: Messages not Being Sent?

  1. #1

    Default Messages not Being Sent?

    Hi All,

    I'm fairly certain that I'm doing something simple - wrongly! I am also violating one of my cardinal rules about coding late into a Friday night.

    In any event, I have this demo app that establishes a producer thread and a consumer thread.

    The main thread init code creates a topic exchange and a queue like this:

    Code:
    		ConnectionFactory connectionFactory = getConnectionFactory();
    		rabbitTemplate = new RabbitTemplate(connectionFactory);                  // create Spring's implementation of the AmqpTemplate interface
    
    		rabbitAdmin = new RabbitAdmin(rabbitTemplate.getConnectionFactory());    // get a RabbitAdmin providing required connectionFactory ctor arg
    		
    		Exchange topicExchange = new TopicExchange("pbTopicExchange", false, false);    // non-durable, and no auto-delete
    		rabbitAdmin.declareExchange(topicExchange);
    		
    		Queue queue = new Queue("pbJobMgmtRqst", false, false, false);           // non-durable, non-exclusive, no auto-delete
    		rabbitAdmin.declareQueue(queue);                                         // NB: the RabbitAdmin object links the exchange and the queue
    The producer's run method looks like this:

    Code:
    public void run()
    	{
    		System.out.println("ENTERED PRODUCER THREAD");
    
    		ConnectionFactory connectionFactory = getConnectionFactory();
    		rabbitTemplate = new RabbitTemplate(connectionFactory);              
    
    		rabbitAdmin = new RabbitAdmin(rabbitTemplate.getConnectionFactory());    // get a RabbitAdmin providing required connectionFactory ctor arg
    		
    		Queue queue = new Queue("pbJobMgmtRqst", false, false, false);           // non-durable, non-exclusive, no auto-delete
    		rabbitAdmin.declareQueue(queue);                                         // NB: the RabbitAdmin object links the exchange and the queue
    		
    		String strMsg = "This is a test...";
    		
    		byte[] msgBody = null;
    		
    		try
    		{
    			msgBody = strMsg.getBytes("UTF-8");
    		}
    		catch(Exception e)
    		{
    			System.out.println("CAUGHT EXCEPTION");
    		}
    		
    		MessageProperties msgProperties = new MessageProperties();
    		Message msg = new Message(msgBody, msgProperties);
    		
    		System.out.println("SENDING MSG to exchange " + exchangeName);
    		
    		for ( int n=0; n < 100; n++)
    		{
    			rabbitTemplate.send(exchangeName, "mgmt.clientID.1234", msg);
    		}
    	}
    After running this (let's forget about consumer thread for the moment), I see no evidence of any messages queued against queue "pbJobMgmtRqst." That is, the RabbitMQ admin plugin shows 0 Ready, 0 Unacknowledged, 0 Total.

    The consumer has not run.

    The admin console also shows the following under Queues -> Bindings:

    Incoming to pbJobMgmtRqst
    From: (AMQP default)
    Routing key: pbJobMgmtRqst

    I found this odd because I thought that with statement

    Code:
    rabbitTemplate.send(exchangeName, "mgmt.clientID.1234", msg);
    I had specified an Exchange name of "pbTopicExchange." This is set in the producer thread's constructor.

    So before I get to the consumer issues (another post to this thread), some questions:

    a. shouldn't I be seeing in the admin console some evidence of the 100 messages I sent ?
    b. why does the admin console show "AMQP default" as the exchange?

    On a side note: I am finding discrepancies between the online documentation at

    http://static.springsource.org/sprin...1.0.x/apidocs/

    and the Spring AMQP - Reference Documentation 1.0.0.RELEASE PDF. For example, the PDF shows

    Code:
    Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");
    But the online docs give no instance of BindBuilder.bind returning a Binding. Instead, .bind returns something called a BindingBuilder.DestinationConfigurer. In fact, my Eclipse IDE changed my use of Binding to GenericArgumentsConfigurer.

    The consumer thread is the next issue but, again, I will save that for later.

    I would be grateful for any help on this.

    Thanks.

    -Paul

  2. #2
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,853

    Default

    Paul,

    The following would send a message to the Exchange named by the value in 'exchangeName' with a routing key of 'mgmt.clientID.1234':

    Code:
    rabbitTemplate.send(exchangeName, "mgmt.clientID.1234", msg);
    But from what you mentioned above, it sounds like you want to send to your 'pbJobMgmtRqst' queue, and that is only bound to the default no-name Exchange.

    So to send to it would require something like this:

    Code:
    rabbitTemplate.send("pbJobMgmtRqst", msg);
    That 2-arg version of the send method expects a 'routing-key' and the Message instance. When a Queue is bound only to the default exchange, you must send to that Exchange with the Queue's name as routing-key if you want the message to be sent to that Queue.

    Hope that helps.
    -Mark

  3. #3

    Default

    Hi Mark,

    Thank you for your reply.

    Yes, I do want to send the message to queue "pbJobMgmtRqst." So, I think that my question boils down to why isn't this queue bound to exchange "pbTopicExchange?"

    I'm a bit confused here. My producer does this:

    Code:
    rabbitTemplate.send(exchangeName, "mgmt.clientID.1234", msg);
    with variable exchangeName set to "pbTopicExchange." So why isn't the message sent to that exchange?

    From a different perspective: my understanding is that the act of binding devolves on the consumer only. That is, my "main" thread creates the exchange and the queue. My producer idempotently declares the same queue and, in sending to exchange "pbTopicExchange," specifies the aforementioned routing key. But only the consumer needs to actually bind this queue to the exchange - right?

    I feel like I am missing something very obvious....

    Thanks for your help.

    -Paul

  4. #4
    Join Date
    Jun 2005
    Posts
    4,241

    Default

    You are free to bind queues to exchanges wherever you like, but until they are bound you can't expect messages sent to the exchange to show up on the queue. That seems to be the gap here. Your producer is undoubtedly sending the messages successfully but the broker has no route for them so they will be discarded.

  5. #5

    Default

    Dave, thanks.

    I think that your analysis is dispositive.

    Does this mean that the binding entity, the consumer in my case, must be started first?

    I have been under the impression that order didn't matter in this regard, i.e., producer could start first and its messages would be held at the exchange.

    If this is not so, how does one handle a scenario in which the consumer bounces? While it's down messages sent by the producer will simply be lost.

    Perhaps tomorrow afternoon I will post here the difficulties I am having with my consumer. As noted, I am having some difficulty squaring the online DOCS against the spring AMQP 1.0.0 reference PDF. At least in part because of these difficulties, I haven't gotten the consumers bind code to work.

    This is my first experience with a topic exchange. Fanout was easy!

    Thanks for your help.

    -paul*

  6. #6
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,853

    Default

    Paul, I'd recommend a quick look at these tutorials, especially #5: http://www.rabbitmq.com/getstarted.html

    That should clarify how Queues bind to a Topic Exchange and the role of the routing key. Then, on the Spring AMQP side - OR directly in the broker config or admin UI, you would just bind your Queue to the TopicExchange with a pattern that determines which messages should be routed to that Queue. At that point, sending with RabbitTemplate is a matter of passing the exchangeName, the routingKey, and the Message.

    Hope that helps get you on the right track.
    -Mark

  7. #7

    Default

    Thanks Mark.

    I have indeed looked at that tutorial and I think I have an
    OK understanding of topic exchange, binding key and routing
    key.

    My difficulty or, rather, a difficulty concerns the binding code
    itself. I think I'm doin' it wrong. I will post it tmrw when. I
    get back from Boston.

    While I have your attention (and Dave's), which i appreciate,
    can you look at my earlier post about sharing an AmqpTemplate?

    Thanks.

    -paul

  8. #8

    Default

    Hi again,

    As promised (or threatened!) here is the consumer code. This is the constructor for my class AmqpDemoConsumer. I know that there is something wrong here.

    Code:
    public AmqpDemoConsumer(String qName, String bindKey, int concurrConsumers) throws Exception 
    {
      queueName = qName; 
      bindingKey = bindKey; 
      Queue queue = new Queue(queueName, false, false, false); 
      Exchange topicExchange = new TopicExchange("pbTopicExchange", false, false); // non-durable, and no auto-delete 
    
    //Binding b = new Binding(someQueue, someTopicExchange, "foo.*"); 
      
      GenericArgumentsConfigurer b = BindingBuilder.bind(queue).to(topicExchange).with(bindingKey); 
      numberConcurrentConsumers = concurrConsumers; 
      ConsumerSimpleMessageListenerContainer container =  new ConsumerSimpleMessageListenerContainer(); 
      container.setConnectionFactory(getConnectionFactory());     // connect to broker node 
      container.setQueueNames(queueName);                             // set name of Q whence receive messages 
      container.setConcurrentConsumers(numberConcurrentConsumers); 
      container.setMessageListener(new MessageListenerAdapter(new ConsumerHandler(), new SimpleMessageConverter())); 
      Queue[] qArray = new Queue[1]; 
      qArray[0] = queue;
      container.setQueues(qArray);
      container.startConsumers();
    }
    When I tried to use the BindingBuilder.bind call, I thought (based on the Spring AMQP 1.0.0 reference PDF) that I would get back a Binding object. When, per that documentation I first coded:

    Code:
    Binding b = BindingBuilder.bind(queue).to(topicExchange).with(bindingKey);
    my Eclipse IDE complained "type mismatch: cannot convert from BindingBuilder.GenericArgumentsConfigurer to Binding"

    This hearkens back to my question about the seeming inconsistencies between the reference PDF and the online Spring AMQP docs. Page 7 of that document shows this example:

    Code:
    Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");
    What am I doing wrong? Put otherwise, can someone show me how, using Spring AMQP, I should establish the binding to the topic exchange?

    The next issue is that, because I couldn't establish a Binding object, I never bound the pbJobMgmtRqst to the topic exchange. Based on what Mark and Dave have said, I assume that this failure is why no messages ever show up: the broker sees no outbound mapping for the messages my producer is sending - so he discards them. Is this analysis correct?

    Finally, a propos of this broker behavior, there remains the question that I asked Dave a few days ago; to wit, if the broker behavior is to discard messages for which he has no outbound mapping (i.e., no one having bound the queue to the exchange with a binding key that matches routing key), then doesn't this mean the binding entity (consumer in my case) must always be up before the producer starts?

    If so, this seems problematic to me, especially in so robust a system as Rabbit. If producing and consuming entities are significantly decoupled, to the point where they need know nothing about each other, then don't we in fact have a re-coupling here; i.e., why should producer have to think about whether or not the consumer is up??

    Thanks for your help.

    -Paul

  9. #9
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,139

    Default

    The problem is the compiler doesn't know what the Exchange type is...

    Code:
    Exchange topicExchange = new TopicExchange(...
    You need to declare topicExchange as a TopicExchange for the fluent API to work properly - take a look at the various to() methods on the DestinationConfigurer (which is returned by the bind() method) - the to() return type depends on the parameter type.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  10. #10
    Join Date
    Jun 2005
    Posts
    4,241

    Default

    Quote Originally Posted by PMBell;404960if the broker behavior is to discard messages for which he has no outbound mapping (i.e., no one having bound the queue to the exchange with a binding key that matches routing key), then doesn't this mean the binding entity (consumer in my case) must always be up [B
    before[/B] the producer starts?
    That's the AMQP protocol - it's that way by design whether we like it or not. Many users of AMQP actually don't care much about consumers missing messages. For those that do, what it means in practice is that if you can't rely on your consumers starting before your producers, then you have to create the bindings somewhere other than your consumer. Remember bindings can be durable, so once you have them set up you don't have to have this problem ever again with the same broker instance.

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •