Weird behavior of messageListener();
johanneshiemer
Jan 9th, 2012, 03:01 AM
Hi,
I have a question regarding some weird behavior of the messageListener. According to the reference manual, the message listener is asynchronous. I defined two different queues and routing keys, whose properties look as follows:
amqp.schedule.routingKeyName=service.schedule
amqp.schedule.routingKey=service.schedule
amqp.schedule.queue=service.schedule
amqp.schedule.prefetchCount=10
amqp.compose.routingKeyName=service.compose
amqp.compose.routingKey=service.compose
amqp.compose.queue=service.compose
amqp.compose.prefetchCount=2
For each of them I defined a separate configuration looking like this:
@Value("${amqp.compose.routingKeyName}")
private String ROUTING_KEY_NAME;

@Value("${amqp.compose.routingKey}")
private String ROUTING_KEY;

@Value("${amqp.compose.queue}")
private String QUEUE;

@Value("${amqp.compose.prefetchCount}")
private int prefetchCount;

@Override
public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
    rabbitTemplate.setRoutingKey(ROUTING_KEY_NAME);
}

@Bean
public SimpleMessageListenerContainer compositionListenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
    container.setQueues(compositionQueue());
    container.setMessageListener(compositionMessageListenerAdapter());
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    container.setPrefetchCount(prefetchCount);
    container.setErrorHandler(errorHandler());
    return container;
}

@Bean
public MessageListenerAdapter compositionMessageListenerAdapter() {
    return new MessageListenerAdapter(amqpListener, jsonMessageConverter());
}
The problem is that although compositionMessageListenerAdapter delegates to amqpListener, which implements ChannelAwareMessageListener and should be asynchronous, it does not seem to respect the prefetchCount values from the properties file.
Perhaps I am misunderstanding something, but I expected that with prefetchCount set to 10, for example, onMessage(..) would be executed ten times in parallel. Is this not the case?
Any help appreciated!
Mark Fisher
Jan 9th, 2012, 06:30 AM
In the excerpt you posted, the 'amqp.compose.prefetchCount' property has a value of 2, not 10.
johanneshiemer
Jan 9th, 2012, 06:32 AM
Hi Mark,
Yes, and I think I found the solution to my problem: I had mixed up concurrentConsumers with prefetchCount. What I was originally trying to achieve was 2 composer consumers and 10 schedule consumers.
Thanks for your help. :-)
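To make the distinction concrete: in SimpleMessageListenerContainer, concurrentConsumers determines how many consumer threads call onMessage() in parallel, while prefetchCount only limits how many unacknowledged messages the broker hands to each consumer in advance. Below is a simplified plain-Java simulation of that idea. It is not Spring AMQP or RabbitMQ code; the class PrefetchVsConsumersDemo, the in-memory "broker" queue and the run(...) helper are hypothetical names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class PrefetchVsConsumersDemo {

    /** Outcome of one simulated run. */
    static final class Result {
        final int processed;          // messages handled in total
        final int maxConcurrentCalls; // highest number of simultaneous "onMessage" calls observed

        Result(int processed, int maxConcurrentCalls) {
            this.processed = processed;
            this.maxConcurrentCalls = maxConcurrentCalls;
        }
    }

    /** Simulates `consumers` threads, each buffering at most `prefetch` messages at a time. */
    static Result run(int messages, int consumers, int prefetch) {
        // Hypothetical stand-in for the broker queue.
        ConcurrentLinkedQueue<Integer> broker = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < messages; i++) {
            broker.add(i);
        }
        AtomicInteger active = new AtomicInteger();
        AtomicInteger maxActive = new AtomicInteger();
        AtomicInteger processed = new AtomicInteger();

        List<Thread> threads = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            Thread consumer = new Thread(() -> {
                while (true) {
                    // The "broker" hands this consumer at most `prefetch` messages in advance.
                    List<Integer> prefetched = new ArrayList<>();
                    Integer next;
                    while (prefetched.size() < prefetch && (next = broker.poll()) != null) {
                        prefetched.add(next);
                    }
                    if (prefetched.isEmpty()) {
                        return; // nothing left to consume
                    }
                    // A single consumer still handles its buffered messages one at a time.
                    for (int i = 0; i < prefetched.size(); i++) {
                        int now = active.incrementAndGet();
                        maxActive.accumulateAndGet(now, Math::max);
                        pause();
                        active.decrementAndGet();
                        processed.incrementAndGet();
                    }
                }
            });
            threads.add(consumer);
            consumer.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new Result(processed.get(), maxActive.get());
    }

    /** Stands in for the work done inside onMessage(). */
    private static void pause() {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        Result r = run(20, 2, 10);
        System.out.println("processed=" + r.processed
                + ", parallel calls stayed within 2 consumers: " + (r.maxConcurrentCalls <= 2));
    }
}
```

In this model, raising prefetch from 2 to 10 changes only how many messages each consumer buffers; the number of parallel onMessage() calls is bounded by the number of consumers.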
johanneshiemer
Jan 9th, 2012, 07:48 AM
Hi Mark,
Coming back to this, I have an issue with my two consumers: essentially, both consumers are receiving the same message. According to http://www.rabbitmq.com/tutorials/tutorial-two-java.html this should not happen by default. In my version the output looks like this:
Sample (with 2 consumers)
5972ccd9-2cbe-42d0-9545-ee1b2fb0be34
executing COMPOSE: TIMEOUT 10 seconds
5972ccd9-2cbe-42d0-9545-ee1b2fb0be34
executing COMPOSE: TIMEOUT 10 seconds
73e0ae06-23ad-456b-ab75-8acc05eff378
executing COMPOSE: TIMEOUT 10 seconds
73e0ae06-23ad-456b-ab75-8acc05eff378
executing COMPOSE: TIMEOUT 10 seconds
48efb80e-c55b-4bde-b8f7-4c052667c111
executing COMPOSE: TIMEOUT 10 seconds
45cceb20-2286-43b4-ad1a-d37723fe1354
executing COMPOSE: TIMEOUT 10 seconds
Only in the third iteration do the two consumers print different message IDs for the first time.
Any idea why this happens?
Configuration of SimpleMessageListenerContainer looks like this:
prefetchCount = 2;
concurrentConsumers = 2;
@Bean
public SimpleMessageListenerContainer compositionListenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
    container.setQueues(compositionQueue());
    container.setMessageListener(compositionMessageListenerAdapter());
    container.setAcknowledgeMode(AcknowledgeMode.AUTO);
    container.setPrefetchCount(prefetchCount);
    container.setErrorHandler(errorHandler());
    container.setConcurrentConsumers(concurrentConsumers);
    return container;
}
johanneshiemer
Jan 9th, 2012, 12:21 PM
Hi,
I kept working on this and found something quite interesting. The behavior is completely correct if I remove the following piece of code from my onMessage() method:
restClient.prepare(host, userName, password, useSSL);
restClient.connect();
configurationType = configuration.get(job.getParentId());
Without it, there are no duplicate messages across the consumers (each message is identified by its UUID):
e83147e2-e641-4346-9165-efee015f73b3 executing COMPOSE Mon Jan 09 19:19:29 CET 2012
124e8c95-71b9-4adb-ae20-ba27a9dece8c executing COMPOSE Mon Jan 09 19:19:29 CET 2012
46810668-57ee-4d3e-babf-b68ca5c1d2c0 executing COMPOSE Mon Jan 09 19:19:29 CET 2012
f8b6d0e1-d0b7-4bf6-99df-d90454fc6c06 executing COMPOSE Mon Jan 09 19:19:39 CET 2012
0dc14720-e9cf-447f-b9f3-836a0b6d3adb executing COMPOSE Mon Jan 09 19:19:39 CET 2012
4ce08229-e408-4f6f-8dac-e3a5078e8631 executing COMPOSE Mon Jan 09 19:19:39 CET 2012
Adding the code back produces the duplicates again:
834f06dc-7028-475d-a482-e9ac1631123a executing COMPOSE Mon Jan 09 19:17:15 CET 2012
834f06dc-7028-475d-a482-e9ac1631123a executing COMPOSE Mon Jan 09 19:17:15 CET 2012
834f06dc-7028-475d-a482-e9ac1631123a executing COMPOSE Mon Jan 09 19:17:15 CET 2012
34e0cd91-d652-4db6-b1f2-e6279888658d executing COMPOSE Mon Jan 09 19:17:27 CET 2012
34e0cd91-d652-4db6-b1f2-e6279888658d executing COMPOSE Mon Jan 09 19:17:27 CET 2012
34e0cd91-d652-4db6-b1f2-e6279888658d executing COMPOSE Mon Jan 09 19:17:27 CET 2012
Honestly, I do not know why. The restClient is injected into my ChannelAwareMessageListener implementation like this:
@Autowired
private IRestClient restClient;
Please help me, this is driving me nuts..!
johanneshiemer
Jan 9th, 2012, 01:18 PM
Hi,
Me again. I am beginning to think that this is a bug. The issue is easy to reproduce reliably.
Could someone from the team please take a look at it?
Dave Syer
Jan 10th, 2012, 02:27 AM
It sounds like your message listener is probably failing and causing the messages to be rolled back. That wouldn't be a bug. But there isn't enough information about your client to really see what is happening. Can you post the implementation of your amqpListener, and check the logs from your consumer to see if it is failing?
johanneshiemer
Jan 10th, 2012, 02:41 AM
Hi Dave,
thanks a lot for your response. First of all here is my implementation of the amqpListener:
@Component
public class ComposeListener implements IComposeListener {

    private Exception serviceException;
    private Job job;

    @Autowired(required = true)
    private IAMQPListenerHelper listenerHelper;

    @Autowired(required = true)
    private volatile RabbitTemplate rabbitTemplate;

    @Value("${amqp.status.routingKeyName}")
    protected String STATUS_RK;

    @Value("${amqp.baseExchange}")
    protected String EXCHANGE_NAME;

    @Autowired
    private IRestClient restClient;

    @Value("${host.address}")
    private String host;

    @Value("${host.username}")
    private String userName;

    @Value("${host.password}")
    private String password;

    @Value("${host.useSSL}")
    private boolean useSSL;

    /**
     * @return the serviceException
     */
    public Exception getServiceException() {
        return serviceException;
    }

    /**
     * @param serviceException the serviceException to set
     */
    public void setServiceException(Exception serviceException) {
        this.serviceException = serviceException;
    }

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        /**
         * TODO Check JOB for its validity
         */
        try {
            job = listenerHelper.convertMessage(message);
            if (!job.getCommand().equals(Type.ACKNOWLEDGE.toString())) {
                restClient.prepare(host, userName, password, useSSL);
                restClient.connect();
            }
        } catch (Exception ex) {
            setServiceException(ex);
        }
        if (job.getCommand().equals(Type.COMPOSE.toString())) {
            System.out.println(job.getId() + " executing COMPOSE " + new Date().toString());
            // try {
            //     compose.execute(platformType, configurationType);
            // } catch (Exception ex) {
            //     setServiceException(ex);
            // }
            Random rand = new Random();
            //Thread.sleep(rand.nextInt(10000));
            Thread.sleep(10000);
        }
    }
}
I attached the output as log.txt, as it would otherwise clutter the whole thread.
Thanks a lot for your help!
UPDATE:
Another bit of information: it seems that the behavior is not related to the restClient. If I replace the restClient calls with Thread.sleep(10000); the same output with duplicate messages occurs as well:
af646cfc-c879-4757-a6dc-ddf6f18230a8 executing COMPOSE Tue Jan 10 10:20:02 CET 2012
af646cfc-c879-4757-a6dc-ddf6f18230a8 executing COMPOSE Tue Jan 10 10:20:02 CET 2012
af646cfc-c879-4757-a6dc-ddf6f18230a8 executing COMPOSE Tue Jan 10 10:20:02 CET 2012
johanneshiemer
Jan 10th, 2012, 06:13 AM
Hi,
As I have not had a reply from Dave yet, I kept digging. One interesting thing I found is that each consumer receives a separate message, but the error appears when the message is converted from JSON to an object:
{"id":"6e9b12ab-a405-41cd-85eb-22dfb507f7da","command":"COMPOSE","parentId":"3c9ace8c-3fdf-4e11-bf1c-00dc02dcc364","parentEntity":"CONFIGURATION","startDate":null,"endDate":null,"status":0,"statusMessage":null,"user":null,"parent":null}
{"id":"13092f44-8947-4e2a-8725-6780ae360f0f","command":"COMPOSE","parentId":"3c9ace8c-3fdf-4e11-bf1c-00dc02dcc364","parentEntity":"CONFIGURATION","startDate":null,"endDate":null,"status":0,"statusMessage":null,"user":null,"parent":null}
{"id":"f269d809-0dda-4219-9827-b80538bfff39","command":"COMPOSE","parentId":"3c9ace8c-3fdf-4e11-bf1c-00dc02dcc364","parentEntity":"CONFIGURATION","startDate":null,"endDate":null,"status":0,"statusMessage":null,"user":null,"parent":null}
6e9b12ab-a405-41cd-85eb-22dfb507f7da executing COMPOSE Tue Jan 10 13:10:53 CET 2012
6e9b12ab-a405-41cd-85eb-22dfb507f7da executing COMPOSE Tue Jan 10 13:10:53 CET 2012
6e9b12ab-a405-41cd-85eb-22dfb507f7da executing COMPOSE Tue Jan 10 13:10:53 CET 2012
As you can see, three different messages arrive, but the ID of the first message is printed for all three...
The stub to test this looks like this:
@Override
public void onMessage(Message message, Channel channel) throws Exception {
    /**
     * TODO Check JOB for its validity
     */
    try {
        String value = new String(message.getBody());
        System.out.println(value);
        JsonMessageConverter jasonMsgConverter = new JsonMessageConverter();
        job = (Job)jasonMsgConverter.fromMessage(message);
Dave/Mark, am I doing something wrong here?
Mark Fisher
Jan 10th, 2012, 06:50 AM
Have you tried running just these two lines of code in isolation, passing in a Message instance with the same content as in the app?
JsonMessageConverter jasonMsgConverter = new JsonMessageConverter();
job = (Job)jasonMsgConverter.fromMessage(message);
johanneshiemer
Jan 10th, 2012, 06:52 AM
Hi Mark,
Yes, and interestingly, if you look at my ComposeListener above you can see the second field: private Job job;. If I move that variable into the onMessage(Message message...) method, the messages are correct. So the issue seems to be related to the Job field, but I have no idea why. Do you have a clue?
Below is the output with a local variable in the method:
{"id":"af646cfc-c879-4757-a6dc-ddf6f18230a8","command":"COMPOSE","parentId":"3c9ace8c-3fdf-4e11-bf1c-00dc02dcc364","parentEntity":"CONFIGURATION","startDate":null,"endDate":null,"status":0,"statusMessage":null,"user":null,"parent":null}
{"id":"bd8cc583-acbf-4075-8bdc-1fd94d2a50a3","command":"COMPOSE","parentId":"3c9ace8c-3fdf-4e11-bf1c-00dc02dcc364","parentEntity":"CONFIGURATION","startDate":null,"endDate":null,"status":0,"statusMessage":null,"user":null,"parent":null}
{"id":"13092f44-8947-4e2a-8725-6780ae360f0f","command":"COMPOSE","parentId":"3c9ace8c-3fdf-4e11-bf1c-00dc02dcc364","parentEntity":"CONFIGURATION","startDate":null,"endDate":null,"status":0,"statusMessage":null,"user":null,"parent":null}
af646cfc-c879-4757-a6dc-ddf6f18230a8 executing COMPOSE Tue Jan 10 13:53:38 CET 2012
13092f44-8947-4e2a-8725-6780ae360f0f executing COMPOSE Tue Jan 10 13:53:38 CET 2012
bd8cc583-acbf-4075-8bdc-1fd94d2a50a3 executing COMPOSE Tue Jan 10 13:53:38 CET 2012
Mark Fisher
Jan 10th, 2012, 06:59 AM
Yes, that's exactly what the problem is; since you are using an instance var there to store state passed in with each message, the code is not thread safe.
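The effect can be reproduced without Spring or RabbitMQ. The following is a minimal plain-Java sketch under stated assumptions: SharedListenerStateDemo, FieldListener, LocalListener and deliver(...) are hypothetical names invented here, a String stands in for the Job, and a CyclicBarrier stands in for the slow work (the Thread.sleep or REST call) so that every consumer thread has stored its message before any of them reads it back. The field is declared volatile only to make the outcome deterministic; it also shows that the problem is sharing one field, not visibility.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SharedListenerStateDemo {

    /** Hypothetical stand-in for a message listener; returns the id it ends up processing. */
    interface Listener {
        String onMessage(String body) throws Exception;
    }

    /** Mirrors the ComposeListener above: per-message state kept in an instance field. */
    static final class FieldListener implements Listener {
        private final CyclicBarrier allDelivered;
        private volatile String job; // a single slot shared by every consumer thread

        FieldListener(int consumers) {
            this.allDelivered = new CyclicBarrier(consumers);
        }

        @Override
        public String onMessage(String body) throws Exception {
            job = body;                              // each thread overwrites the shared slot
            allDelivered.await(5, TimeUnit.SECONDS); // stands in for the slow work
            return job;                              // every thread reads the last value written
        }
    }

    /** Same logic, but the per-message state is a local variable. */
    static final class LocalListener implements Listener {
        private final CyclicBarrier allDelivered;

        LocalListener(int consumers) {
            this.allDelivered = new CyclicBarrier(consumers);
        }

        @Override
        public String onMessage(String body) throws Exception {
            String job = body;                       // private to this call and this thread
            allDelivered.await(5, TimeUnit.SECONDS);
            return job;
        }
    }

    /** Delivers each body on its own thread to the one shared listener instance. */
    static Set<String> deliver(Listener listener, List<String> bodies) {
        ExecutorService consumers = Executors.newFixedThreadPool(bodies.size());
        try {
            List<Callable<String>> deliveries = new ArrayList<>();
            for (String body : bodies) {
                deliveries.add(() -> listener.onMessage(body));
            }
            Set<String> seen = new HashSet<>();
            for (Future<String> result : consumers.invokeAll(deliveries)) {
                seen.add(result.get());
            }
            return seen;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            consumers.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("job-1", "job-2", "job-3");
        // prints "instance field: 1 distinct id(s)"
        System.out.println("instance field: " + deliver(new FieldListener(3), ids).size() + " distinct id(s)");
        // prints "local variable: 3 distinct id(s)"
        System.out.println("local variable: " + deliver(new LocalListener(3), ids).size() + " distinct id(s)");
    }
}
```

With the instance field, all three threads report the same id, which matches the duplicated UUIDs in the output above; with the local variable, each thread reports its own message.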
Dave Syer
Jan 10th, 2012, 07:49 AM
ClassCastException? The logs from the client should tell you (I think it might be DEBUG level, not sure).