Thread: Messages not Being Sent?

  1. #11

    Dave and Gary,

    Thanks again. The news about durable bindings is very good. And Gary's suggestion re defining a TopicExchange rather than a mere Exchange did the trick.
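    For readers wondering why the exchange type matters: a topic exchange compares each routing key (such as mgmt.clientID.1234) with the binding key word by word, where * stands for exactly one word and # for zero or more words. The following is a minimal plain-Java sketch of that matching rule, not the broker's implementation. The class and method names are made up, and the binding keys in main are hypothetical, since the thread never shows the value of bindingKey.

```java
class TopicMatchSketch {
    // True if the routing key satisfies the binding key under topic rules.
    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            return j == words.length;          // both exhausted together
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words; try every split point.
            for (int k = j; k <= words.length; k++) {
                if (match(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false;                      // pattern still expects a word
        }
        // '*' matches exactly one word; anything else must match literally.
        boolean wordOk = pattern[i].equals("*") || pattern[i].equals(words[j]);
        return wordOk && match(pattern, i + 1, words, j + 1);
    }

    public static void main(String[] args) {
        String routingKey = "mgmt.clientID.1234";
        System.out.println(matches("mgmt.clientID.*", routingKey));
        System.out.println(matches("mgmt.#", routingKey));
        System.out.println(matches("mgmt.*", routingKey));
    }
}
```

    A direct exchange, by contrast, only delivers when the routing key equals the binding key exactly, which corresponds to the literal-word branch above.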

    So I am very close to getting this demo to work.

    However, when I run the consumer (now a distinct Java application rather than a thread created by the AmqpDemo class), I am getting this error (actually, I am getting thousands of them):

    Failed to invoke target method 'handleMessage' with argument type = [class org.springframework.amqp.core.Message], value = [{(Body:'This is a test, ...'; ID:null; Content:text/plain; Headers:{}; Exchange:pbTopicExchange; RoutingKey:mgmt.clientID.1234; Reply:null; DeliveryMode:PERSISTENT; DeliveryTag:701)}

    I think this error ultimately has something to do with the type of message I am sending or, rather, with what each end believes the message type to be. Here is the producing code:

    Code:
    String strMsg = "This is a test, ...";

    byte[] msgBody = null;

    try
    {
        msgBody = strMsg.getBytes("UTF-8");
    }
    catch (Exception e)
    {
        // Report the cause; msgBody is left null if encoding fails.
        System.out.println("CAUGHT EXCEPTION: " + e);
    }

    MessageProperties msgProperties = new MessageProperties();
    msgProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
    Message msg = new Message(msgBody, msgProperties);
    rabbitTemplate.send(exchangeName, "mgmt.clientID.1234", msg);
    And here is how I set up for receiving messages in the consumer Java application:

    Code:
    Queue queue = new Queue(queueName, false, false, false);
    TopicExchange topicExchange = new TopicExchange("pbTopicExchange", false, false);    // non-durable, and no auto-delete

    Binding b = BindingBuilder.bind(queue).to(topicExchange).with(bindingKey);
    rabbitAdmin.declareBinding(b);

    numberConcurrentConsumers = 1;
    ConsumerSimpleMessageListenerContainer container = new ConsumerSimpleMessageListenerContainer();
    container.setConnectionFactory(getConnectionFactory());                  // connect to broker node
    container.setQueueNames(queueName);                                      // name of the queue to receive messages from
    container.setConcurrentConsumers(numberConcurrentConsumers);
    //container.setMessageListener(new MessageListenerAdapter(new ConsumerHandler(), new SimpleMessageConverter()));
    container.setMessageListener(new MessageListenerAdapter(new ConsumerHandler(), null));
    container.startConsumers();
    Please note the commented-out line that calls setMessageListener. I was experimenting with a SimpleMessageConverter and, in the next line, tried setting the converter to null. In each case I continue to get the error, but the symptom varies somewhat; specifically, the text of the error message is a bit different (if I recall correctly).

    The ConsumerSimpleMessageListenerContainer is simply this:

    Code:
    public class ConsumerSimpleMessageListenerContainer extends SimpleMessageListenerContainer
    {
    	public void startConsumers() throws Exception
    	{
    		super.doStart();
    	}
    }
    And the handler itself is simply this:

    Code:
    public class ConsumerHandler  
    {
    	public void handleMessage(String text) throws Exception
    	{
    			System.out.println("Received--------------------------: " + text);
    	}
    }
    Again, I suspect this is a simple mistake.

    Can someone point out where my misunderstanding is?

    Thanks.

    -Paul

  2. #12
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,022

    Well, explicitly setting the adapter's message converter to null won't work - that will try to find a method handleMessage(Message m) (i.e. perform no conversion).
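    To make that failure concrete, here is a minimal plain-Java sketch of the lookup. It is not Spring AMQP's code: RawMessage and ToyListenerAdapter are hypothetical stand-ins for Message and MessageListenerAdapter, and the real adapter's method resolution is more involved. The only point it illustrates is that, with no converter, the argument stays a message object, so a handler that declares handleMessage(String) is never matched.

```java
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

class DispatchSketch {
    // Hypothetical stand-in for org.springframework.amqp.core.Message.
    static final class RawMessage {
        final byte[] body;
        RawMessage(byte[] body) { this.body = body; }
    }

    // Toy adapter: converts if a converter is set, then looks for a
    // one-argument handleMessage method that accepts the result.
    static final class ToyListenerAdapter {
        private final Object delegate;
        private final Function<RawMessage, Object> converter; // may be null

        ToyListenerAdapter(Object delegate, Function<RawMessage, Object> converter) {
            this.delegate = delegate;
            this.converter = converter;
        }

        String dispatch(RawMessage message) {
            // With no converter the argument is still the raw message.
            Object argument = (converter == null) ? message : converter.apply(message);
            for (Method m : delegate.getClass().getDeclaredMethods()) {
                if (m.getName().equals("handleMessage")
                        && m.getParameterCount() == 1
                        && m.getParameterTypes()[0].isInstance(argument)) {
                    try {
                        m.setAccessible(true);
                        m.invoke(delegate, argument);
                        return "invoked handleMessage(" + m.getParameterTypes()[0].getSimpleName() + ")";
                    } catch (ReflectiveOperationException e) {
                        return "invocation failed: " + e;
                    }
                }
            }
            return "no handleMessage accepts " + argument.getClass().getSimpleName();
        }
    }

    // Same shape as the ConsumerHandler in the question.
    static final class StringHandler {
        String last;
        public void handleMessage(String text) { last = text; }
    }

    public static void main(String[] args) {
        RawMessage msg = new RawMessage("This is a test, ...".getBytes(StandardCharsets.UTF_8));
        Function<RawMessage, Object> toText = m -> new String(m.body, StandardCharsets.UTF_8);
        System.out.println(new ToyListenerAdapter(new StringHandler(), null).dispatch(msg));
        System.out.println(new ToyListenerAdapter(new StringHandler(), toText).dispatch(msg));
    }
}
```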

    The SimpleMessageConverter should work - can you share the exact text of the message you are getting in that configuration?

    Have you looked at using Spring Integration? It provides a much higher-level abstraction on top of all this. There's a sample here: https://github.com/SpringSource/spri...ter/basic/amqp.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  3. #13

    Gary (and others),

    Your post arrived just before I was going to post news of some progress.

    I decided to try one of the (slightly) higher-level APIs: rather than calling .send() with a Message, I tried .convertAndSend as follows:

    Code:
    byte[] theMsg = { 0x61, 0x62, 0x63 };
    rabbitTemplate.convertAndSend("pbTopicExchange", "mgmt.clientID.1234", theMsg);
    I also restored in the consumer's "set up" code the instruction:

    Code:
    container.setMessageListener(new MessageListenerAdapter(new ConsumerHandler(), new SimpleMessageConverter()));
    These changes worked and moved me to a happier place. I can now produce and consume both String and byte[] objects.
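    A rough plain-Java sketch of why the payload type and the handler signature go together. It is a toy model loosely based on how SimpleMessageConverter is generally described (a String goes out as text/plain and comes back as a String; a byte[] goes out as application/octet-stream and comes back as a byte[]). Envelope, toEnvelope and fromEnvelope are hypothetical names, and the real converter handles more cases, such as serialized objects.

```java
import java.nio.charset.StandardCharsets;

class ConverterSketch {
    // Hypothetical stand-in for a message: body bytes plus a content type.
    static final class Envelope {
        final byte[] body;
        final String contentType;
        Envelope(byte[] body, String contentType) {
            this.body = body;
            this.contentType = contentType;
        }
    }

    // Outbound: the payload's Java type decides the content type.
    static Envelope toEnvelope(Object payload) {
        if (payload instanceof byte[]) {
            return new Envelope((byte[]) payload, "application/octet-stream");
        }
        if (payload instanceof String) {
            byte[] bytes = ((String) payload).getBytes(StandardCharsets.UTF_8);
            return new Envelope(bytes, "text/plain");
        }
        throw new IllegalArgumentException("unsupported payload: " + payload.getClass());
    }

    // Inbound: the content type decides which Java type the handler receives.
    static Object fromEnvelope(Envelope envelope) {
        if (envelope.contentType != null && envelope.contentType.startsWith("text")) {
            return new String(envelope.body, StandardCharsets.UTF_8);
        }
        return envelope.body; // anything else is passed through as raw bytes
    }

    public static void main(String[] args) {
        Object text = fromEnvelope(toEnvelope("abc"));
        Object bytes = fromEnvelope(toEnvelope(new byte[] { 0x61, 0x62, 0x63 }));
        System.out.println(text.getClass().getSimpleName());
        System.out.println(bytes.getClass().getSimpleName());
    }
}
```

    Under this model, sending a byte[] means the consumer's handler needs a byte[] parameter, and sending a String means it needs a String parameter.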

    But I remain interested in understanding what I did wrong when trying to send a "raw" Message. I note that in getting String and byte[] to work, I needed to change the handleMessage signature accordingly. In the case of a raw Message, is it a matter of changing this signature to

    Code:
    public void handleMessage(Message msg)
    ?

    Also, I suspect that in handling such messages I must somehow inform Spring AMQP that I don't need the services of a MessageConverter. Is this correct?

    Oh, in re Spring Integration: I am keenly interested in its capabilities. Also, I have noticed with interest that a few heavy hitters from Spring Integration weigh in frequently about AMQP. Learning something about Spring Integration is on my "to do" list. I foresee its use in a project I am working on.

    Thanks for your help.

    -Paul

  4. #14

    Well, again, I'd need to see the actual error you got when using a SimpleMessageConverter and handleMessage(String s).

    With no message converter, you would need the parameter type to be Message, as you found. Your test case is very similar to one of the message listener adapter unit tests, MessageListenerAdapterTests.testDefaultListenerMethod(), which works just fine with a plain text message, a SimpleMessageConverter and a handleMessage(String data) method. So I am intrigued as to why you saw an error with that combination.

    "...I must somehow inform Spring AMQP that I don't need the services of a MessageConverter. Is this correct?"
    You always need a MessageConverter if your listener takes anything other than a Message as its parameter. If you want to get the whole message, you can simply implement MessageListener and do away with the adapter altogether. Even if you use an adapter, if the listener implements MessageListener (or ChannelAwareMessageListener), it is invoked without conversion.
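    The rule can be sketched in plain Java as follows. This is an illustration only, not the adapter's source: RawMessage, RawListener and TextHandler are hypothetical stand-ins for Message, MessageListener and a POJO handler, and the sketch uses interfaces where the real adapter uses reflection.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

class BypassSketch {
    // Hypothetical stand-in for Message.
    static final class RawMessage {
        final byte[] body;
        RawMessage(byte[] body) { this.body = body; }
    }

    // Hypothetical stand-in for MessageListener: takes the whole message.
    interface RawListener { void onMessage(RawMessage message); }

    // Hypothetical POJO-style handler: takes a converted payload.
    interface TextHandler { void handleMessage(String text); }

    // Returns which path was taken, to make the decision visible.
    static String deliver(Object delegate, Function<RawMessage, String> converter, RawMessage message) {
        if (delegate instanceof RawListener) {
            // A listener that wants the whole message is invoked without conversion.
            ((RawListener) delegate).onMessage(message);
            return "raw";
        }
        if (delegate instanceof TextHandler && converter != null) {
            // Anything else needs a converter to produce its parameter.
            ((TextHandler) delegate).handleMessage(converter.apply(message));
            return "converted";
        }
        throw new IllegalStateException("no way to invoke " + delegate.getClass().getName());
    }

    public static void main(String[] args) {
        RawMessage msg = new RawMessage("abc".getBytes(StandardCharsets.UTF_8));
        RawListener raw = m -> System.out.println("raw body length " + m.body.length);
        TextHandler text = t -> System.out.println("text " + t);
        System.out.println(deliver(raw, null, msg));
        System.out.println(deliver(text, m -> new String(m.body, StandardCharsets.UTF_8), msg));
    }
}
```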

    HTH
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  5. #15

    Dave and Gary,

    By way of follow-up....

    It was relatively easy to get the "raw" message approach working. Per Gary's instruction, I set the container's MessageListener to an instance of a class (ConsumerListener) that implements MessageListener, and did away with the adapter:

    Code:
    		
    ConsumerSimpleMessageListenerContainer container = new ConsumerSimpleMessageListenerContainer();
    container.setConnectionFactory(getConnectionFactory());   
    container.setQueueNames(queueName);                                      		
    container.setMessageListener(new ConsumerListener());
    The ConsumerListener does simply this:
    Code:
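    public void onMessage(Message msg)
    {
        // Decode explicitly as UTF-8 (java.nio.charset.StandardCharsets) rather than the platform default.
        byte[] b = msg.getBody();
        System.out.println("RECEIVED MSG=" + new String(b, StandardCharsets.UTF_8));
    }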
    So, via this approach I am able to handle (should the need arise) "raw" messages. And using an adapter and message converter, I can handle messages in a not-so-raw (cooked?) manner.

    I have one question about the order of component (consumer, producer) startup and the notion of a "durable binding" that Dave raised. Specifically, am I right that a "durable binding" comes to be when one binds a durable queue to a durable exchange? Put otherwise, is it true that there is no Binding constructor or setter that marks a binding "durable"?

    Thank you both for your help. This has been an enjoyable and instructive forum experience.

    Cordially,

    Paul

  6. #16

    Correct; once a durable queue is bound to an exchange, it remains that way until the binding is removed.
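    As a toy illustration of that idea, the plain-Java model below treats a binding as surviving a broker restart only when both the queue and the exchange it connects are durable, which is the commonly documented RabbitMQ behaviour. ToyBroker and its methods are hypothetical and model nothing beyond that one rule; it is not how a real broker stores its state.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class DurabilitySketch {
    static final class ToyBroker {
        private final Map<String, Boolean> exchanges = new HashMap<>(); // name -> durable?
        private final Map<String, Boolean> queues = new HashMap<>();    // name -> durable?
        private final Set<List<String>> bindings = new HashSet<>();     // [exchange, queue]

        void declareExchange(String name, boolean durable) { exchanges.put(name, durable); }

        void declareQueue(String name, boolean durable) { queues.put(name, durable); }

        // Note: there is no "durable" flag on the binding itself.
        void bind(String exchange, String queue) {
            if (!exchanges.containsKey(exchange) || !queues.containsKey(queue)) {
                throw new IllegalStateException("declare both ends before binding");
            }
            bindings.add(Arrays.asList(exchange, queue));
        }

        boolean isBound(String exchange, String queue) {
            return bindings.contains(Arrays.asList(exchange, queue));
        }

        // Simulated restart: non-durable entities vanish, and so does any
        // binding that has lost either of its ends.
        void restart() {
            exchanges.values().removeIf(durable -> !durable);
            queues.values().removeIf(durable -> !durable);
            bindings.removeIf(b -> !exchanges.containsKey(b.get(0)) || !queues.containsKey(b.get(1)));
        }
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.declareExchange("durableExchange", true);
        broker.declareQueue("durableQueue", true);
        broker.declareQueue("transientQueue", false);
        broker.bind("durableExchange", "durableQueue");
        broker.bind("durableExchange", "transientQueue");
        broker.restart();
        System.out.println(broker.isBound("durableExchange", "durableQueue"));
        System.out.println(broker.isBound("durableExchange", "transientQueue"));
    }
}
```

    In the consumer code earlier in the thread both the Queue and the TopicExchange are created with durable set to false, so under this model their binding would need to be declared again after a broker restart.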

    Just to close out the discussion - it is generally advisable to only expose your code to the underlying infrastructure if you really need to. This makes your code

    a. portable
    b. more easily testable

    So, generally, we would advise using the adapter and making your code a simple POJO. Even if you need stuff from the message properties, it is often better to do this in a custom MessageConverter. The method doesn't even have to be 'handleMessage' (that's just the default)...

    Code:
    public void receiveOrder(Order order);
    That way, to test your code, you only need to make an Order object (you don't have to synthesize, or mock, a Message object), and your business code has no direct dependencies on the Message object.

    Further, you could then take that same object and drop it into, say, a JMS-based system, or use it as a POJO in a Spring Integration flow, with no changes needed.
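    A minimal sketch of that separation, under stated assumptions: Order, OrderService and the "sku,quantity" text format are all invented for illustration, and toOrder stands in for the job a custom MessageConverter would do. The business class can be exercised with a plain Order, and nothing in it refers to a messaging type.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class PojoListenerSketch {
    // Hypothetical domain object.
    static final class Order {
        final String sku;
        final int quantity;
        Order(String sku, int quantity) {
            this.sku = sku;
            this.quantity = quantity;
        }
    }

    // Business code: a POJO with no dependency on any messaging class.
    static final class OrderService {
        private final List<Order> received = new ArrayList<>();

        public void receiveOrder(Order order) { received.add(order); }

        int totalQuantity() {
            int total = 0;
            for (Order o : received) {
                total += o.quantity;
            }
            return total;
        }
    }

    // Conversion lives outside the business code. The "sku,quantity" body
    // format is made up for this example.
    static Order toOrder(byte[] body) {
        String[] parts = new String(body, StandardCharsets.UTF_8).split(",", 2);
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected sku,quantity");
        }
        return new Order(parts[0].trim(), Integer.parseInt(parts[1].trim()));
    }

    public static void main(String[] args) {
        OrderService service = new OrderService();
        // Direct test: no message object needed.
        service.receiveOrder(new Order("widget", 2));
        // Same service fed from a converted message body.
        service.receiveOrder(toOrder("gadget, 3".getBytes(StandardCharsets.UTF_8)));
        System.out.println(service.totalQuantity());
    }
}
```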
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  7. #17

    Understood, Gary.

    Thanks again.

    -Paul
