PDA

View Full Version : Any Advice on a Synchronous JMS Design Approach



enortham
Jan 11th, 2006, 11:06 PM
I have a requirement where I need to take several JMS messages and combine them into one merged xml message that is sent off to another app server. I was thinking of writing a class that starts a thread on the init-method and stops it on the destory-method. When the thread is running it will call receive on the jmsTemplate until it batches enough messages or reaches a certain timeout. I also will make sure everything happens in a transaction so that should the server be shut down the messages will be resent.

Is this a good aproach? Are there any other built in mechanisms for handling synchronous messaging? Any feedback is appreciated.

Eric

jstrachan
Jan 12th, 2006, 06:38 PM
Firstly a few gotchas with JmsTemplate...
http://www.activemq.org/JmsTemplate+Gotchas

Are the multiple messages you wish to receive all from the same JMS provider - if so you don't need to use XA transactions, you can just use regular JMS transactions.

Secondly are all the messages from the same consumer or are they from multiple consumers?

Either way it may well be simpler to write this yourself using straight forward JMS using a transacted session...

Connection conn = factory.createConnection();
Session session = conn.createSession(true, 0);

// create the producer for the result
MessageProducer producer = session.createProducer(someDestination);

// create however many consumers you need
MessageConsumer consumer1 = session.createConsumer(destination, selector);
...

// the loop
void doStuff() {
try {
Message m1 = consumer1.receive();
...
Message mN = consumerN.receive();

Message response = ...; // combine them here
producer.send(response);
session.commit();
}
catch (Exception e) {
session.rollback();
}
}

enortham
Jan 12th, 2006, 07:01 PM
Thanks. I took into account the gotchas and went ahead and started implementing it. I was mainly hoping to verify that I wasn't going to go off and implement something when there is already something available. I'm not a big fan of having to create and manage a thread but it doesn't seem like there is another prebuilt option.

As for the implementation I tried to use the sample code and xml snippets for spring to implement the solution and am now able to send and receive messages but for some reason I can't cause Spring to rollback the transaction. Here's a snippet of my application xml.



<bean id="eventListener" parent="jmsAbstractServiceProxy">
<property name="transactionAttributes">
<props>
<prop key="process*">PROPAGATION_REQUIRED</prop>
</props>
</property>
<property name="target">
<bean class="EventListener" init-method="init" destroy-method="destroy">
<property name="jmsTemplate">
<ref bean="jmsReceiveTemplate"/>
</property>
</bean>
</property>
</bean>

<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionM anager">
<property name="connectionFactory" ref="jmsFactory" />
</bean>

<bean id="jmsAbstractServiceProxy" class="org.springframework.transaction.interceptor.Transa ctionProxyFactoryBean"
abstract="true">
<property name="transactionManager">
<ref bean="jmsTransactionManager"/>
</property>
</bean>

<bean id="jmsFactory" class="org.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory">
<bean class="org.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://localhost:61616</value>
</property>
</bean>
</property>
</bean>
<bean id="jmsReceiveTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<ref local="jmsFactory"/>
</property>
<property name="sessionTransacted">
<value>true</value>
</property>
</bean>




And the java code. Notice where I threw the IllegalArgumentException just to test the rollback. I assume that I should see the log messages over and over again as the message is resent.




import javax.jms.JMSException;
import javax.jms.TextMessage;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jms.core.JmsTemplate;

public class EventListener implements Runnable
{
/**
* The number of milliseconds to wait before logging an infor message on shutdown
*/
private static final long STOP_WAIT_TIMEOUT = 1000;

/**
* The default number of milliseconds to wait before timing out on a receive message
*/
private static final long DEFAULT_RECEIVE_TIMEOUT = 1000;

/**
* Template used to send messages
*/
private JmsTemplate jmsTemplate;

/**
* True if the listener should be running running
*/
private boolean running;

/**
* True if the listener thread has not stopped
*/
private boolean stopped;

/**
* The maximum time to wait receiving all the messages
*/
private long maximumBatchWaitTime = 5000L;

/**
* The batch size
*/
private int batchSize = 5;

/**
* The amount of time to wait before reattempting to send messages
*/
private long postJmsErrorWaitTimeout = 5000L;

/**
* Log used for logging
*/
private Log log = LogFactory.getLog(getClass());


/**
* Sets the JmsTemplate used to send messages
*
* @param jmsTemplate the JmsTemplate used to send messages
*/
public void setJmsTemplate(JmsTemplate jmsTemplate)
{
this.jmsTemplate = jmsTemplate;
jmsTemplate.setReceiveTimeout(DEFAULT_RECEIVE_TIME OUT);
}

/**
* Starts the listener thread
*/
public void init()
{
if (log.isDebugEnabled())
{
log.debug("Preparing to startup EventListener thread");
}
running = true;

new Thread(this).start();
}

/**
* Stops the listener thread
*/
public void destroy()
{
if (log.isDebugEnabled())
{
log.debug("Preparing to shutdown EventListener thread");
}
synchronized (this)
{
running = false;
while (!stopped)
{
if (log.isDebugEnabled())
{
log.debug("Waiting for EventListener to shutdown");
}
try
{
wait(STOP_WAIT_TIMEOUT);
}
catch (InterruptedException e)
{
log.warn("Thread interrupted on shutdown", e);
}
}
}
if (log.isDebugEnabled())
{
log.debug("EventListener thread stopped");
}

}

/**
* Processes listener messages asynchronously
*/
public void run()
{
if (log.isDebugEnabled())
{
log.debug("EventListener thread started");
}

while (running)
{
try
{
processMessages();
}
catch (JMSException e)
{
log.warn("JMS error processing messages; waiting [" + postJmsErrorWaitTimeout
+ "] before reattempting", e);
try
{
Thread.sleep(postJmsErrorWaitTimeout);
}
catch (InterruptedException e2)
{
log.warn("EventListener interrupted", e2);
}
}
catch (Throwable t)
{
log.error("Error in EventListener thread", t);
}
}

synchronized (this)
{
stopped = true;
notify();
}
}

/**
* Processes a batch of messages
* @throws JMSException if there is a JMS related error
*/
public void processMessages() throws JMSException
{
long startTime = System.currentTimeMillis();
int messageCount = 0;

while (running && (startTime > (System.currentTimeMillis() - maximumBatchWaitTime))
&& (messageCount < batchSize))
{
TextMessage message = (TextMessage) jmsTemplate.receive("test");
if (message != null)
{
messageCount++;
if (log.isDebugEnabled())
{
log.debug("Received Message [" + message.getText() + "]");
}
throw new IllegalArgumentException("Just a test");
}

}
}
}



Thanks for the help.


Eric

jstrachan
Jan 13th, 2006, 08:33 AM
BTW I don't get why you need to run this in a separate thread?

BTW2 its gonna be more efficient if you do multiple receive() calls from the same session; AFAIK each receive() call will create a new session/consumer (though depending on your JMS provider/J2EE container it might be clever enough to figure out you really should be using the same session if you are inside a transaction)

enortham
Jan 13th, 2006, 08:50 AM
The reason I believe it needs to be in a seperate thread is because when the application context for spring starts up it will hang on the receive and not start the application fully up since it never willl exit the while loop.

I assumed based on the JMS Gotcha link that the pollableConnectionFactory would avoid the new session/consumer being created on each receive call. One thing I just noticed is that in the documentation it states



A JMS provider which pools Connection, Session and MessageProducer instances so it can be used with tools like Spring's JmsTemplate. NOTE this implementation is only intended for use when sending messages.


If that's the case should I be using the session directly and not use the JmsTemplate?

jstrachan
Jan 13th, 2006, 09:27 PM
Yes, I recommend just using a JMS session directly for this use case - its simple & you'd have full control over the transaction & construction of session/consumer etc.

I totally understand your issue on the use of the thread now :)

One other option; create a Session and consumer, then register a MessageListener; then the JMS provider will invoke you asynchronously & you can then do the session.commit() / rollback() yourself inside the MessageListener when you've had enough messages etc