How to stop a MessageChannel



Thomas Ziem
Jun 13th, 2008, 10:49 AM
Hello,

I've been playing with Spring Integration (SI) for a while, and I like it very much.
But I have one question:

The MessageChannel interface provides the methods receive() and receive(long timeout). Both block until a message is available (the second only until the timeout expires). When a consumer thread calls the first method, or the second with a long timeout, how can that thread be interrupted when the MessageBus in which the channel is registered is stopped?

Thanks,
Thomas

iwein
Jun 13th, 2008, 03:49 PM
/**
* Receive the first available message from this channel. If the channel
* contains no messages, this method will block.
*
* @return the first available message or <code>null</code> if the
* receiving thread is interrupted.
*/

I would expect the thread to be interrupted automatically when you stop the MessageBus. Doesn't that happen?

Thomas Ziem
Jun 14th, 2008, 07:44 AM
... no, I have checked this:


public void testStopMessageChannel() throws Exception {
    MessageBus bus = new MessageBus();
    final MessageChannel channel = new SimpleChannel();
    bus.registerChannel("myChannel", channel);
    bus.start();
    final CountDownLatch latch = new CountDownLatch(1);
    // External consumer: blocks in receive() on a thread the bus does not manage
    Runnable consumer = new Runnable() {
        public void run() {
            channel.receive();
            latch.countDown();
        }
    };
    Thread consumerThread = new Thread(consumer);
    consumerThread.start();
    Thread.sleep(500); // give the consumer time to block in receive()
    bus.stop();
    // If stop() had interrupted the consumer, the latch would reach zero here
    latch.await(1500, TimeUnit.MILLISECONDS);
    Assert.assertEquals("message channel not stopped", 0, latch.getCount());
}

- Thomas

mbogoevici
Jun 14th, 2008, 10:42 AM
Thomas,

The threads managed by the MessageBus (for dispatching messages) will be interrupted. But if you consume messages from external threads, i.e. directly from the channel, the MessageBus knows nothing about those threads and therefore can't interrupt them.

By the way, MessageChannels can be used without a MessageBus at all, which I think explains better why an external consumer is completely decoupled from the MessageBus.

In this case, a shorter wait time backed by a retry mechanism guarantees that the consumer will not block indefinitely.
In fact, since the whole process is under your control, if you know when the message bus is stopped you can interrupt the consumers yourself.
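
For illustration, the short-wait-plus-retry idea might look roughly like the following. This is only a minimal sketch and not Spring Integration code: a java.util.concurrent.BlockingQueue stands in for the channel, an AtomicBoolean stands in for "the bus has been stopped", and the names PollingConsumer and stopsWhenFlagged are made up for the example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Stand-ins (assumptions): the BlockingQueue plays the role of the channel,
// the AtomicBoolean plays the role of "the bus has been stopped".
class PollingConsumer implements Runnable {
    private final BlockingQueue<String> channel;
    private final AtomicBoolean stopped;
    private volatile String received; // first message seen, or null

    PollingConsumer(BlockingQueue<String> channel, AtomicBoolean stopped) {
        this.channel = channel;
        this.stopped = stopped;
    }

    public void run() {
        try {
            while (!stopped.get()) {
                // Short, bounded wait instead of an unbounded receive()
                String message = channel.poll(100, TimeUnit.MILLISECONDS);
                if (message != null) {
                    received = message;
                    return;
                }
                // Timed out: loop around and re-check the stop flag
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
    }

    String getReceived() {
        return received;
    }

    // Hypothetical demo helper: starts a consumer on an empty queue, sets the
    // stop flag, and reports whether the consumer thread terminated.
    static boolean stopsWhenFlagged() {
        BlockingQueue<String> channel = new LinkedBlockingQueue<String>();
        AtomicBoolean stopped = new AtomicBoolean(false);
        Thread consumer = new Thread(new PollingConsumer(channel, stopped));
        consumer.start();
        stopped.set(true);
        try {
            consumer.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !consumer.isAlive();
    }
}
```

The trade-off is latency versus overhead: the consumer notices the stop at most one poll interval late, and a shorter interval means more wake-ups while idle.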

Please let me know if this clarifies things,
Marius

mbogoevici
Jun 15th, 2008, 10:02 AM
Thomas,

This might be of some help to you, as it addresses the issue of being notified when the MessageBus stops:

http://jira.springframework.org/browse/INT-256

Marius

Thomas Ziem
Jun 16th, 2008, 03:14 AM
Hi Marius,

Thanks for your reply.
I understand that the MessageBus doesn't know about external threads. My solution will be to use a short timeout and then check whether the MessageBus has already been stopped.
Another solution could be a cancelable MessageChannel implementation. Combined with a listener approach, the listener could cancel all MessageChannels when the MessageBus is stopped. This presumes that you can ask the MessageBus (ChannelRegistry) for all registered channels.

Or is there another solution? My consumer thread is an Ajax request thread in a web environment. The Ajax client sends a request to the web server, where the thread blocks until a message is available in the MessageChannel. The MessageBus is stopped from another thread.
I could also register a MessageHandler for the specific channel, but then I would have to queue the messages again. For the MessageBus to stop it, the MessageHandler would have to implement the Lifecycle interface; the MessageBus would then stop all registered MessageHandlers. But I don't like the double queuing of messages.
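
To make the cancelable-channel idea above a bit more concrete, here is a rough sketch of what such a channel could look like. It is not based on the actual MessageChannel interface: CancelableChannel, cancel() and the demo helper are hypothetical names, and the channel is modelled as a plain lock-and-condition queue.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical cancelable channel; not the Spring Integration MessageChannel API.
class CancelableChannel<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition changed = lock.newCondition();
    private final Queue<T> messages = new ArrayDeque<T>();
    private boolean cancelled; // guarded by lock

    void send(T message) {
        lock.lock();
        try {
            messages.add(message);
            changed.signal(); // wake one waiting receiver
        } finally {
            lock.unlock();
        }
    }

    // Blocks until a message arrives or the channel is cancelled.
    // Returns null if cancelled with nothing queued, or if interrupted.
    T receive() {
        lock.lock();
        try {
            while (messages.isEmpty() && !cancelled) {
                changed.await();
            }
            return messages.poll();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        } finally {
            lock.unlock();
        }
    }

    // Releases every receiver currently blocked in receive().
    void cancel() {
        lock.lock();
        try {
            cancelled = true;
            changed.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Hypothetical demo helper: a receiver blocks on an empty channel and
    // should return null shortly after cancel() is called.
    static boolean cancelUnblocksReceiver() {
        final CancelableChannel<String> channel = new CancelableChannel<String>();
        final AtomicReference<String> result = new AtomicReference<String>("unset");
        Thread consumer = new Thread(new Runnable() {
            public void run() {
                result.set(channel.receive());
            }
        });
        consumer.start();
        try {
            Thread.sleep(100); // let the consumer block first
            channel.cancel();
            consumer.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !consumer.isAlive() && result.get() == null;
    }
}
```

A listener that is told about the bus stopping would then only need to call cancel() on each channel it knows about; no thread bookkeeping is required on the consumer side.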

Regards,
Thomas

mbogoevici
Jun 19th, 2008, 11:30 PM
Hi Thomas,

What currently happens is that the MessageBus stops the subscriptions to the channels. The channels themselves cannot be stopped, as they are passive elements (to understand this better, think of the model as a blocking queue). Stopping a subscription effectively means that messages will no longer be passed on to endpoints (and from there to reply channels). Now, if you have external components listening to the channels, they will block until a message is available there.

What you can do in this case is simply register a MessageBusListener (newly added) to be notified whenever the message bus is stopped.
You can then interrupt your client threads from the MessageBusListener itself (which will cause the blocked receive() to return; according to the Javadoc quoted earlier, it returns null when the receiving thread is interrupted).
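
As a rough illustration of the bookkeeping this involves, the sketch below keeps track of the threads currently blocked on a channel and interrupts them from a stop callback. It deliberately does not use the real MessageBusListener API: onBusStopped() is a hypothetical hook you would call from your listener, a BlockingQueue stands in for the channel, and ConsumerRegistry is a made-up name.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper; the BlockingQueue is a stand-in for the channel.
class ConsumerRegistry {
    private final Set<Thread> waiting = ConcurrentHashMap.newKeySet();
    private volatile boolean stopped;

    // Hypothetical hook: call this from the "bus stopped" notification.
    void onBusStopped() {
        stopped = true;
        for (Thread t : waiting) {
            t.interrupt();
        }
    }

    // Registers the calling thread, blocks on the channel, and unregisters.
    // Returns null if the bus was stopped or the thread was interrupted.
    <T> T receive(BlockingQueue<T> channel) {
        Thread self = Thread.currentThread();
        waiting.add(self); // register BEFORE blocking
        try {
            if (stopped) {
                return null; // stop happened before we registered
            }
            return channel.take();
        } catch (InterruptedException e) {
            if (!stopped) {
                // Interrupt came from somewhere else: keep the status visible
                Thread.currentThread().interrupt();
            }
            return null;
        } finally {
            waiting.remove(self);
        }
    }

    // Hypothetical demo helper: a consumer blocks on an empty queue and
    // should be released once onBusStopped() is called.
    static boolean stopReleasesBlockedConsumer() {
        final ConsumerRegistry registry = new ConsumerRegistry();
        final BlockingQueue<String> channel = new LinkedBlockingQueue<String>();
        Thread consumer = new Thread(new Runnable() {
            public void run() {
                registry.receive(channel);
            }
        });
        consumer.start();
        try {
            Thread.sleep(100); // let the consumer block first
            registry.onBusStopped();
            consumer.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !consumer.isAlive();
    }
}
```

One caveat with this sketch: if the stop arrives just after take() has returned but before the thread unregisters, the thread keeps its message and is left with its interrupt flag set, so callers may want to clear or check that flag.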

Hope that helps,
Marius

Thomas Ziem
Jun 20th, 2008, 02:08 AM
Hi Marius,

To interrupt the clients, I have to register them (the threads) with the MessageBusListener before calling receive(). Am I right?

- Thomas