I'm dealing with a situation where messages in a given context must be consumed in order, but I have many contexts. The workload is such that it must be distributed across many hosts. I had thought to implement one JMS queue for each context and have a single listener for each context on each host in the cluster. I figured a message for a given queue would be delivered to a single host, which wouldn't acknowledge the message until it had finished processing it, which would prevent any other messages from that queue from being delivered. I'm testing with openMQ purely because it was already on my laptop but will eventually be running against sonic.
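To make the intended design concrete, here is a minimal in-process sketch of the idea, not JMS code. Each context gets its own single-threaded lane, standing in for one queue with exactly one active consumer, while different contexts run in parallel. The class name `PerContextLanes` and the context names are hypothetical, chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical in-process analogy: one single-threaded lane per context,
// standing in for one queue that has exactly one active consumer.
public class PerContextLanes {
    private final Map<String, ExecutorService> lanes = new ConcurrentHashMap<>();
    private final Map<String, List<String>> processed = new ConcurrentHashMap<>();

    // Messages for the same context land on the same single thread,
    // so they are handled strictly in submission order.
    public void submit(String context, String payload) {
        ExecutorService lane =
            lanes.computeIfAbsent(context, c -> Executors.newSingleThreadExecutor());
        List<String> log = processed.computeIfAbsent(
            context, c -> Collections.synchronizedList(new ArrayList<String>()));
        lane.execute(() -> log.add(payload));
    }

    // Stops accepting work, waits for every lane to finish, and returns
    // the order in which each context's messages were handled.
    public Map<String, List<String>> drain() {
        for (ExecutorService lane : lanes.values()) {
            lane.shutdown();
        }
        try {
            for (ExecutorService lane : lanes.values()) {
                if (!lane.awaitTermination(5, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("lane did not drain in time");
                }
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return processed;
    }

    // Interleaves messages for two contexts and returns the per-context order.
    public static Map<String, List<String>> demo() {
        PerContextLanes lanes = new PerContextLanes();
        lanes.submit("ctxA", "a1");
        lanes.submit("ctxB", "b1");
        lanes.submit("ctxA", "a2");
        lanes.submit("ctxB", "b2");
        lanes.submit("ctxA", "a3");
        return lanes.drain();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Within each context the order is preserved, and the two contexts do not block each other. That is the property I am trying to get from the broker across hosts.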
I've implemented listeners in every way I can think of and run into problems with each one.
At least with openMQ, if x consumers show up on a single queue, then the first x messages will be immediately delivered. If any are rolled back, then they will be re-delivered. This allows messages to be consumed out of order. If I configure the server-side to only allow a single consumer per-queue, then the first host that starts up becomes the consumer on every queue and all subsequent hosts just wind up blocked until the first host dies, then the second host takes over. So either my messages get consumed out of order or my workload doesn't get distributed across the cluster at all (I realize that a single queue would be serialized if my method worked, but with many queues, I should still have some work being performed on all hosts if I get random or round robin assignment to consumers).
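The out-of-order case can be reproduced with a small deterministic model. This is a simplified simulation, not broker code. It assumes that consumers finish in lock-step rounds and that a rolled-back message goes back to the head of the queue. Real brokers may differ on both points. The class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified model of one queue shared by several concurrent consumers.
public class RedeliveryReordering {

    // Returns the order in which messages 1..messages complete when the
    // message numbered failOnFirstDelivery is rolled back exactly once.
    public static List<Integer> completionOrder(int messages, int consumers,
                                                int failOnFirstDelivery) {
        Deque<Integer> queue = new ArrayDeque<>();
        for (int i = 1; i <= messages; i++) {
            queue.addLast(i);
        }
        List<Integer> completed = new ArrayList<>();
        boolean alreadyFailed = false;

        while (!queue.isEmpty()) {
            // The broker hands one message to each waiting consumer.
            List<Integer> inFlight = new ArrayList<>();
            while (inFlight.size() < consumers && !queue.isEmpty()) {
                inFlight.add(queue.pollFirst());
            }
            // Each consumer either commits or rolls back its message.
            List<Integer> rolledBack = new ArrayList<>();
            for (int message : inFlight) {
                if (message == failOnFirstDelivery && !alreadyFailed) {
                    alreadyFailed = true;
                    rolledBack.add(message);
                } else {
                    completed.add(message);
                }
            }
            // Assumption: rolled-back messages return to the head of the queue.
            for (int i = rolledBack.size() - 1; i >= 0; i--) {
                queue.addFirst(rolledBack.get(i));
            }
        }
        return completed;
    }

    public static void main(String[] args) {
        System.out.println(completionOrder(4, 2, 1)); // prints [2, 1, 3, 4]
        System.out.println(completionOrder(4, 1, 1)); // prints [1, 2, 3, 4]
    }
}
```

With two consumers, message 2 completes while message 1 is being rolled back, so 2 is processed before 1. With a single consumer the order is preserved, which is why I want exactly one active consumer per queue.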
This proved to be the case whether I use spring-integration via message-driven-channel-adapter sent to a service-activator or if I just use spring-jms and DefaultMessageListenerContainer and MessageListener.
I then thought that my problem is using message-driven consumers where I don't have any control over how the jms client connections are utilized. So I then switched to using inbound-channel-adapter with a poller configured with very short receive-timeout and short delay-interval. I thought that would cause no single consumer to remain attached to the queue as a consumer for very long, allowing consumers from other hosts an opportunity to switch in and do some of the work. Instead, I ran into a completely different problem:
No messages are ever acknowledged, regardless of whether an exception is being thrown by the service activator method. With messages never being acknowledged, I never get past the first message on the queue, but it doesn't look as though any context switching between the various consumers is happening, even though my poller is frequently timing out on the receive. This problem feels like a bug in spring-integration - messages should be getting acknowledged, but they are not. I have verified that I am not throwing an exception that I am unaware of by placing a try/catch around my entire service activator method.
My configuration looks like this:
Code:
<jmsi:inbound-channel-adapter id="inPoll1" destination="jmsWorker1" channel="worker1Input" acknowledge="transacted">
    <int:poller max-messages-per-poll="1" receive-timeout="1" fixed-delay="50"/>
</jmsi:inbound-channel-adapter>
<jmsi:inbound-channel-adapter id="inPoll2" destination="jmsWorker2" channel="worker2Input" acknowledge="transacted">
    <int:poller max-messages-per-poll="1" receive-timeout="1" fixed-delay="50"/>
</jmsi:inbound-channel-adapter>
<jmsi:inbound-channel-adapter id="inPoll3" destination="jmsWorker3" channel="worker3Input" acknowledge="transacted">
    <int:poller max-messages-per-poll="1" receive-timeout="1" fixed-delay="50"/>
</jmsi:inbound-channel-adapter>

<int:channel id="worker1Input"/>
<int:channel id="worker2Input"/>
<int:channel id="worker3Input"/>

<!-- find workerBean via annotation -->
<context:component-scan base-package="my.package.name"/>

<!-- assign workerBean to be listener on input channels -->
<int:service-activator input-channel="worker1Input" ref="workerBean"/>
<int:service-activator input-channel="worker2Input" ref="workerBean"/>
<int:service-activator input-channel="worker3Input" ref="workerBean"/>

And the ServiceActivator bean looks like this:
Code:
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;

@MessageEndpoint
public class WorkerBean {

    @ServiceActivator
    public void upperCase(String text) {
        try {
            System.out.println("JMS Work: " + text);
            Thread.sleep(3000); // simulate slow processing
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
        /* roll back at random (roughly one call in four) */
        if (Math.random() * 4 < 1) {
            System.out.println("rolling back");
            throw new RuntimeException("roll back");
        }
        System.out.println("done");
    }
}

Incidentally, if I change my @ServiceActivator method to take a Message as a parameter instead of the payload string, then I never receive messages at all. This prevents me from being able to use the message.acknowledge() method to try to acknowledge them manually.
Can anyone out there comment on solving my core problem - consuming messages in order despite having multiple listeners on each queue? If not, can anyone comment on using acknowledge="transacted" with service-activator? If I remove that attribute entirely, messages do get acknowledged, though they get acknowledged whether I throw an exception or not, which defeats my purpose. Adding the attribute results in no messages being acknowledged at all, even when no exception is thrown. That's got to be a bug, I think.
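For clarity, this is the behavior I expect from acknowledge="transacted", written as a plain in-process model rather than Spring or JMS code. A message is removed from the queue only when the handler returns normally. If the handler throws, the message stays at the head and is retried on the next poll. The names `TransactedPollModel` and `pollOnce` are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// In-process model of the commit/rollback semantics expected from a
// transacted poll with a single consumer. Not Spring or JMS code.
public class TransactedPollModel {

    // Performs one poll. Returns true if a message was handled and committed,
    // false if the queue was empty or the handler threw (rollback).
    public static boolean pollOnce(Deque<String> queue, Consumer<String> handler) {
        String head = queue.peekFirst();
        if (head == null) {
            return false;
        }
        try {
            handler.accept(head);
            queue.pollFirst(); // commit: the message leaves the queue
            return true;
        } catch (RuntimeException ex) {
            return false;      // rollback: the message stays at the head
        }
    }

    // Processes three messages; the handler fails once on its second call.
    public static List<String> demo() {
        Deque<String> queue = new ArrayDeque<>(Arrays.asList("m1", "m2", "m3"));
        List<String> done = new ArrayList<>();
        int[] calls = {0};
        Consumer<String> handler = text -> {
            calls[0]++;
            if (calls[0] == 2) {
                throw new RuntimeException("roll back");
            }
            done.add(text);
        };
        while (!queue.isEmpty()) {
            pollOnce(queue, handler);
        }
        return done;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [m1, m2, m3]
    }
}
```

The rolled-back message m2 is retried before m3 is touched, so the order survives the failure. What I actually observe is different: with the attribute set, nothing is ever committed.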
I'm using Spring-Integration 2.2.0.RELEASE

