Hello,
I have a strange problem using Spring JMS with ActiveMQ: onMessage delivers a message object with the wrong content, as if it were mixed up with the previously delivered message.
My example is quite simple: just a ping-pong-like program with multiple balls. Each ball has an id and an exchange count. When the listener pool receives a message (a ball), it performs 3 steps:
- check, for this ball id, that the exchange count is the one we are waiting for
- increment the expected count by 2 and save it locally
- send a new ball message with this id and the exchange count + 1
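To make the expected counter sequence explicit, the three steps above can be sketched without JMS or Spring. This is only a single-threaded illustration with in-memory queues; the class name PingPongSketch and the method play are made up for this sketch and are not part of the attached project. It follows one ball: listener B waits for 1, listener A waits for 2, and each side moves its expected value forward by 2 while sending the received value + 1 to the other side.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Single-threaded, in-memory sketch of the ping-pong counter protocol.
// No JMS or Spring involved; all names here are illustrative only.
final class PingPongSketch {

    // Delivers `rounds` messages for one ball and returns the counters received, in order.
    static List<Integer> play(int rounds) {
        // Index 0 is listener A (waits for 2), index 1 is listener B (waits for 1).
        int[] expected = {2, 1};
        List<Queue<Integer>> queues = new ArrayList<>();
        queues.add(new ArrayDeque<>()); // queue A
        queues.add(new ArrayDeque<>()); // queue B
        queues.get(1).add(1);           // init message: counter 1 goes to queue B

        List<Integer> received = new ArrayList<>();
        int side = 1; // B receives first
        for (int i = 0; i < rounds; i++) {
            int t = queues.get(side).remove();
            // Step 1: check that the counter is the one this side is waiting for.
            if (t != expected[side]) {
                throw new IllegalStateException("received " + t + ", waited " + expected[side]);
            }
            // Step 2: the next counter this side will see is two exchanges later.
            expected[side] = t + 2;
            // Step 3: send the ball back with the exchange count + 1.
            queues.get(1 - side).add(t + 1);
            received.add(t);
            side = 1 - side;
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(play(6)); // prints [1, 2, 3, 4, 5, 6]
    }
}
```

With a single consumer per side the check never fails, which is the behaviour I expect from the real test as well.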
Here is the processing part done by the listener:

Code:
public int treatMessage (final SimpleMessage simpleMsg) {
    int out = 0;
    int t = simpleMsg.getCounter ();
    int numM = simpleMsg.getNumMsg ();
    String str;
    try {
        // check if the number received is the one we expected
        if (t != JMSListener.nbMsg [this.listener_id] [numM]) {
            // bad number
            out = -2;
            str = "ER for id:" + this.listener_id + "/numMsg:" + numM
                + " JMS:" + simpleMsg.getRealMsg ().getJMSMessageID ()
                + " receive:" + t
                + " / waited:" + JMSListener.nbMsg [this.listener_id] [numM]
                + " / next:";
            if (t < JMSListener.nbMsg [this.listener_id] [numM]) {
                // seems we have already done this one
                JMSListener.logger.error (str + "nop ... replay");
                out = -1;
            } else {
                // strange, but we sync our number counter and will continue ping pong
                JMSListener.nbMsg [this.listener_id] [numM] = t + 2;
                JMSListener.logger.error (str + JMSListener.nbMsg [this.listener_id] [numM]);
            }
        } else {
            // all ok, will continue ping pong
            JMSListener.nbMsg [this.listener_id] [numM] = t + 2;
            JMSListener.logger.trace ("OK for id:" + this.listener_id + "/numMsg:" + numM
                + " JMS:" + simpleMsg.getRealMsg ().getJMSMessageID ()
                + " receive:" + t
                + " / next:" + JMSListener.nbMsg [this.listener_id] [numM]);
        }
        // fake work
        long jCnt = (long) (1000.0 * Math.random ());
        for (long j = 0; j < jCnt; ++j) {
            Math.random ();
        }
        if (out != -1) {
            // don't send ping pong if we have detected replay
            this.sender.send (new SimpleMessage (numM, t + 1));
        }
    } catch (Throwable e) {
        JMSListener.logger.error ("ER for id:" + this.listener_id + "/numMsg:" + numM + " get exception: ", e);
        out = -3;
    }
    return out;
}

All the beans are in the same file. Here are the beans used by the listener:

Code:
<bean id="listenerContainerA" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="concurrentConsumers" value="50" />
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="queueA" />
    <property name="messageListener" ref="messageListenerA" />
    <property name="sessionTransacted" value="true" />
    <property name="sessionAcknowledgeModeName" value="AUTO_ACKNOWLEDGE" />
</bean>
<bean id="listenerContainerB" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="concurrentConsumers" value="50" />
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="queueB" />
    <property name="messageListener" ref="messageListenerB" />
    <property name="sessionTransacted" value="true" />
    <property name="sessionAcknowledgeModeName" value="AUTO_ACKNOWLEDGE" />
</bean>
<bean id="queueSenderA" class="org.spring_zip.JMSSender" autowire="byName" lazy-init="true">
    <property name="jmsTemplate" ref="jmsTemplate" />
    <property name="toQueue" ref="queueB" />
    <property name="replyQueue" ref="queueA" />
</bean>
<bean id="queueSenderB" class="org.spring_zip.JMSSender" autowire="byName" lazy-init="true">
    <property name="jmsTemplate" ref="jmsTemplate" />
    <property name="toQueue" ref="queueA" />
    <property name="replyQueue" ref="queueB" />
</bean>
<bean id="messageListenerA" class="org.spring_zip.JMSListener" autowire="byName">
    <property name="id" value="0" />
    <property name="sender" ref="queueSenderA" />
</bean>
<bean id="messageListenerB" class="org.spring_zip.JMSListener" autowire="byName">
    <property name="id" value="1" />
    <property name="sender" ref="queueSenderB" />
</bean>

And here is the function used to run the test:

Code:
try {
    // set default counter value
    for (int i = 0; i < JMSListener.NB_ID; ++i) {
        JMSListener.nbMsg [this.messageListenerA.getId ()] [i] = 2;
        JMSListener.nbMsg [this.messageListenerB.getId ()] [i] = 1;
    }
    // send init message
    for (int i = 0; i < JMSListener.NB_ID; ++i) {
        this.queueSenderA.send (new SimpleMessage (i, 1));
    }
    // start the listeners
    this.messageListenerA.start ();
    this.messageListenerB.start ();
    // wait until first replay OR 100 iterations
    int step = 0;
    while (this.messageListenerA.getNbReplay () == 0 && this.messageListenerB.getNbReplay () == 0 && step < 100) {
        Thread.sleep (1000);
        ++step;
    }
    // stop listeners
    this.messageListenerA.stop ();
    this.messageListenerB.stop ();
    // check that the numbers of messages exchanged are consistent
    for (int i = 0; i < JMSListener.NB_ID; ++i) {
        int actualA = JMSListener.nbMsg [this.messageListenerA.getId ()] [i];
        int actualB = JMSListener.nbMsg [this.messageListenerB.getId ()] [i];
        this.logger.debug ("CHECKING " + i + " A:" + actualA);
        this.logger.debug ("CHECKING " + i + " B:" + actualB);
        Assert.assertTrue ("BAD EXCHANGE ! message were lost!", Math.abs (actualA - actualB) <= 2);
    }
    // display listener statistics
    this.logger.debug ("Listener A:" + this.messageListenerA);
    this.logger.debug ("Listener B:" + this.messageListenerB);
    this.logger.debug ("overall time:" + (System.currentTimeMillis () - this.startDate));
    Thread.sleep (5000);
    Assert.assertTrue ("REPLAY OCCURED !", this.messageListenerA.getNbReplay () == 0 && this.messageListenerB.getNbReplay () == 0);
} catch (Throwable e) {
    this.logger.error ("Test get this exception: ", e);
    Assert.fail ();
}

It works well for a while, then fails because the content of the incoming message (the ball) is the same as that of the previous message; the real content of the message seems to be lost. When I search my logs for the incoming JMS message ID to find out when it was sent and what it contained, I see that the content sent was correct but the version received by the listener is wrong.
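The log cross-check described above (look up the received JMS message ID and compare the received content with what was sent) could also be automated. Below is a minimal sketch, assuming sender and listeners run in the same JVM; SentLedger, recordSent and check are hypothetical names that do not exist in the attached project. It also assumes the message ID is read from the message after the send has completed, since the provider only assigns it at that point.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not in the attached project): remembers what was sent
// under each message ID so the receiving side can compare payloads.
final class SentLedger {

    // Thread-safe because many listener threads may check concurrently.
    private final Map<String, String> sentById = new ConcurrentHashMap<>();

    // Call after a send, once the provider has assigned the message ID.
    void recordSent(String messageId, int numMsg, int counter) {
        sentById.put(messageId, numMsg + ":" + counter);
    }

    // Returns a description of the mismatch, or empty when the payload matches.
    Optional<String> check(String messageId, int numMsg, int counter) {
        String received = numMsg + ":" + counter;
        String sent = sentById.get(messageId);
        if (sent == null) {
            return Optional.of("no send recorded for " + messageId);
        }
        if (sent.equals(received)) {
            return Optional.empty();
        }
        return Optional.of(messageId + " sent " + sent + " but received " + received);
    }

    public static void main(String[] args) {
        SentLedger ledger = new SentLedger();
        ledger.recordSent("ID:example-1", 7, 12);
        // Same id, same payload: nothing to report.
        System.out.println(ledger.check("ID:example-1", 7, 12).orElse("match"));
        // Same id, different counter: this is the situation I see in my logs.
        System.out.println(ledger.check("ID:example-1", 7, 10).orElse("match"));
    }
}
```

In the listener, check would be called at the top of treatMessage with the values from getJMSMessageID (), getNumMsg () and getCounter (). This would not work as-is for the variant with one pool per JVM, where the logs remain the only way to compare.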
We ran the same test without Spring and it was OK. We also ran this test with one Spring pool per JVM (2 programs launched separately) and it failed too. I attach a zip file with the simplest version of this test (as a Maven project).
Do you have any ideas?
Thanks!

