-
Jan 20th, 2008, 12:38 PM
#1
Spring 2.5.1, JMS 1.1 and distributed jobs implementation
Hi,
Imagine a standard JMS model with the following components.
-- 1 Producer (Spring based)
-- 1 Broker (ActiveMQ) hosting 1 queue
-- 5 Consumers (Spring based) listening to that one queue
Now an example usage scenario is that the producer sends 1000 messages on the queue which are consumed concurrently in a random order by the five consumers listening on that queue. So far for these requirements everything makes sense and works fine.
However, now imagine that certain actions must run before production starts and certain others after the consumers have finished consuming all 1000 messages. My question is how these event points can be exposed and how pre-production and post-consumption hooks can be attached to them.
In other words the concept of a single job persists from before production begins to after all consumption is complete for the messages that were produced in a single batch. Order of message processing is not important. However I do need to know when consumption of all messages in a job is complete.
So the question is how do I know when a total of 1000 messages have been processed by one or more consumers and thereafter execute one or more post completion hooks? One example of a post completion task would be to mark that batch as complete in the database.
Obviously one very simple solution is to use a job table with each row representing a distinct job and mark progress from both producer and consumers. However due to randomised order of processing of the messages there is no way of knowing where the last message from a batch will be processed.
Is there a better solution using perhaps JmsTemplate features such as acknowledgements or message groups or dynamic destinations or Lingo async callbacks or some other way? I'd rather not reinvent the wheel if I can reuse. How do you tackle this kind of problem? Kindly elaborate on an example usage of your proposed solution.
Much appreciate your help.
Thanks.
Last edited by Narada; Jan 20th, 2008 at 12:49 PM.
-
Jan 22nd, 2008, 07:05 PM
#2
Using the JMS Message Group ID you could create a set of message groups as follows:
Format is JobID - Consumer #
JMSXGroupID: 1234-1
JMSXGroupID: 1234-2
JMSXGroupID: 1234-3
JMSXGroupID: 1234-4
JMSXGroupID: 1234-5
Then send the messages to the groups in round-robin fashion, e.g.
Msg 1 goes to JMSXGroupID: 1234-1, Msg 2 goes to JMSXGroupID: 1234-2, etc. up to Consumer #5, then repeat.
That means that each message in a group will be processed in order by the consumer of the group. But order is not guaranteed across all consumers, so you still need to correlate the groups. So the producer should add a couple of properties to tell the consumers the msg seq #, the total message count and the consumer group size.
So you would have:
Msg [996/1000, JMSXGroupID: 1234-1, ConsumerGroupSize: 5]
Msg [997/1000, JMSXGroupID: 1234-2, ConsumerGroupSize: 5]
Msg [998/1000, JMSXGroupID: 1234-3, ConsumerGroupSize: 5]
Msg [999/1000, JMSXGroupID: 1234-4, ConsumerGroupSize: 5]
Msg [1000/1000, JMSXGroupID: 1234-5, ConsumerGroupSize: 5]
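A minimal sketch of the producer-side bookkeeping described above, in plain Java so it runs without a broker. The class name, the helper names and the `MsgSeq` / `TotalMsgs` property names are my own placeholders, not anything JMS or Spring defines; only `JMSXGroupID` is the standard property name. In a real producer you would copy these values onto each message with `setStringProperty` / `setIntProperty` before sending.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: works out which group each message of a job belongs to. */
class GroupAssignment {

    /** Round-robin group for a 1-based sequence number, e.g. 1 -> "1234-1", 6 -> "1234-1". */
    static String groupIdFor(String jobId, int seq, int groupSize) {
        int consumerNo = ((seq - 1) % groupSize) + 1;
        return jobId + "-" + consumerNo;
    }

    /** The properties a producer would set on the message for this sequence number. */
    static Map<String, Object> propertiesFor(String jobId, int seq, int total, int groupSize) {
        Map<String, Object> props = new LinkedHashMap<String, Object>();
        props.put("JMSXGroupID", groupIdFor(jobId, seq, groupSize));
        props.put("MsgSeq", Integer.valueOf(seq));             // placeholder property name
        props.put("TotalMsgs", Integer.valueOf(total));        // placeholder property name
        props.put("ConsumerGroupSize", Integer.valueOf(groupSize));
        return props;
    }

    public static void main(String[] args) {
        // Prints the properties for the last five messages of a 1000-message job.
        for (int seq = 996; seq <= 1000; seq++) {
            System.out.println(propertiesFor("1234", seq, 1000, 5));
        }
    }
}
```

Keeping the assignment in one small function means the consumers can rely on the same rule when working out which message is the last one for their group.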
Each consumer would inspect the message sequence number, the total message count and the Consumer Group Size. Then using those figures, each consumer would know if it had processed the last of the group of messages sent to it.
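// Assumes the total message count is a multiple of ConsumerGroupSize.
int consumerNo = Integer.parseInt(JMSXGroupID.substring(JMSXGroupID.lastIndexOf('-') + 1));
if (msgSeq == (totalMsgs - consumerGroupSize) + consumerNo) {
    completeGroup(JMSXGroupID);
}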
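Here is a slightly more general sketch of that check as standalone Java. It assumes the round-robin assignment described earlier (Msg 1 to group 1, Msg 2 to group 2, and so on) and parses the consumer number from the text after the last hyphen of the group ID. Unlike the one-line comparison, it also handles a total that is not a multiple of the group size. The class and method names are hypothetical.

```java
/** Hypothetical helper: decides whether a message is the last one for its group. */
class GroupCompletion {

    /** Consumer number parsed from a group ID such as "1234-3". */
    static int consumerNo(String groupId) {
        return Integer.parseInt(groupId.substring(groupId.lastIndexOf('-') + 1));
    }

    /**
     * Highest 1-based sequence number that round-robin assignment sends to this consumer.
     * A result below 1 means the job is so small that this group receives no messages.
     */
    static int lastSeqFor(int consumerNo, int total, int groupSize) {
        // Consumer that receives the very last message of the job.
        int lastConsumer = ((total - 1) % groupSize) + 1;
        // How many messages we have to step back from the end to reach this consumer.
        int back = (lastConsumer - consumerNo + groupSize) % groupSize;
        return total - back;
    }

    static boolean isLastOfGroup(int seq, int total, int groupSize, String groupId) {
        return seq == lastSeqFor(consumerNo(groupId), total, groupSize);
    }

    public static void main(String[] args) {
        // Message 996 of 1000 in group "1234-1" with five groups.
        System.out.println(isLastOfGroup(996, 1000, 5, "1234-1"));
    }
}
```

When the total is an exact multiple of the group size, `lastSeqFor` gives the same value as `(total - groupSize) + consumerNo`.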
When a consumer realizes it has completed processing of its group, it would send a notification message to a job status queue. A single consumer on that queue would record the completion of each group. When it receives the last outstanding notification for the job (regardless of the order in which they arrive), it would either perform the post-job action directly or send a message to another queue to drive the action.
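The job status consumer could keep its bookkeeping along these lines. This is only a sketch under my own assumptions: the class and method names are made up, state is held in memory (a real system would more likely use the job table in the database), and group IDs follow the JobID-Consumer # format shown earlier.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical tracker used by the single consumer on the job status queue. */
class JobStatusTracker {

    // Job ID -> group IDs that have already reported completion.
    private final Map<String, Set<String>> doneByJob = new HashMap<String, Set<String>>();

    /**
     * Records one group-completion notification, in whatever order they arrive.
     * Returns true only when this notification completes the job, so the caller
     * can run the post-job hook (or send a message that drives it).
     */
    synchronized boolean recordGroupComplete(String groupId, int groupSize) {
        String jobId = groupId.substring(0, groupId.lastIndexOf('-'));
        Set<String> done = doneByJob.get(jobId);
        if (done == null) {
            done = new HashSet<String>();
            doneByJob.put(jobId, done);
        }
        // A redelivered (duplicate) notification adds nothing and never fires the hook.
        boolean added = done.add(groupId);
        return added && done.size() == groupSize;
    }

    public static void main(String[] args) {
        JobStatusTracker tracker = new JobStatusTracker();
        String[] arrivals = {"1234-3", "1234-1", "1234-5", "1234-2", "1234-4"};
        for (String groupId : arrivals) {
            if (tracker.recordGroupComplete(groupId, 5)) {
                System.out.println("Job complete after " + groupId);
            }
        }
    }
}
```

Note that completed jobs are never evicted from the map here, so a long-running process would need some cleanup policy.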
Anyways, something along these lines should work. You can check out the following URLs for more info:
http://activemq.apache.org/message-groups.html
http://www.ibm.com/developerworks/we...02_currie.html
Last edited by jwalsh; Jan 22nd, 2008 at 11:12 PM.
-
Jan 23rd, 2008, 03:54 PM
#3
JWalsh. This is simply fantastic. Thank you so much for your input.
It is really quite clever and powerful and yet so simple. I guess I ruled out message groups because I felt that they might impair performance considerably, being limited to a single thread. However, having N groups gives you N threads, so that is certainly a route to greater concurrency.
Now it remains to decide on how far to take this idea. The greater the number of groups the greater the concurrency but the greater the number of group completion messages you have to reconcile over a longer period of time.
Related to this is another thing that I am not sure about: how do message groups relate not only to the number of consumers but also to the number of concurrent connections on each consumer? Does a single message group essentially limit processing to a single connection on a single consumer, which effectively limits it to one thread? If so, how granular a runtime configuration is required to utilise my five consumers, each of which runs five concurrent connections?
Thanks again. Superb feedback! :-)
-
Feb 9th, 2009, 10:48 AM
#4
This is simply fantastic...superb feedback..
-
Jun 3rd, 2009, 12:32 PM
#5
Spring DefaultMessageListenerContainer not recognizing JMSXGroupID
Hi All,
I really like the approach mentioned above. However, it looks like the DefaultMessageListenerContainer is ignoring the JMSXGroupID property and sending all the messages to the same consumer/listener. Is there any setting I am missing?
Here is how I have configured my listener container:
<bean id="jmsContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsFactory" />
<property name="destination" ref="delivery_queue" />
<property name="sessionTransacted" value="true" />
<property name="sessionAcknowledgeMode">
<util:constant
static-field="javax.jms.Session.CLIENT_ACKNOWLEDGE" />
</property>
<!-- If broker is down, retry after 1 minute (recoveryInterval is in milliseconds) -->
<property name="recoveryInterval" value="60000" />
<property name="messageListener" ref="messageListener" />
</bean>
I am sending messages to the queue "delivery_queue" via the ActiveMQ admin console and have set the "Message Group" and the "Message Group sequence" attributes as described above.
Thanks,