Results 1 to 1 of 1

Thread: onmessage mix message content

  1. #1
    Join Date
    Jan 2008
    Posts
    1

    Unhappy onmessage mix message content

    Hello,
    I have a strange problem using spring/JMS with ActiveMQ. The onMessage function delivers a wrong message object content, like a mixup with the previous delivered message.

    My example is quite simple: just a ping-pong like program with multiple ball. Each ball has an id and the count of exchange. When listener pool receive a message (a ball), it does 3 steps:
    - check, for this ball-id, if the count of exchange is what we are waiting for
    - increment this count by 2, save it locally
    - send a new ball message with this id and the exchange count +1

    Here is treatment part of the process done by the listener:
    Code:
       public int treatMessage (final SimpleMessage simpleMsg)
       {
          int out = 0;
          int t = simpleMsg.getCounter ();
          int numM = simpleMsg.getNumMsg ();
          String str;
    
          try {
             // check if the number received is what we are expected
             if (t != JMSListener.nbMsg [this.listener_id] [numM]) { // bad number
                out = -2;
                str = "ER for id:" + this.listener_id + "/numMsg:" + numM + " JMS:"
                         + simpleMsg.getRealMsg ().getJMSMessageID () + " receive:" + t + " / waited:"
                         + JMSListener.nbMsg [this.listener_id] [numM] + " / next:";
    
                if (t < JMSListener.nbMsg [this.listener_id] [numM]) { // seems we have already done this one
                   JMSListener.logger.error (str + "nop ... replay");
                   out = -1;
                }
                else { // strange, but we sync our number counter and will continue ping pong
                   JMSListener.nbMsg [this.listener_id] [numM] = t + 2;
                   JMSListener.logger.error (str + JMSListener.nbMsg [this.listener_id] [numM]);
                }
             }
             else { // all ok, will continue ping pong
                JMSListener.nbMsg [this.listener_id] [numM] = t + 2;
                JMSListener.logger.trace ("OK for id:" + this.listener_id + "/numMsg:" + numM + " JMS:"
                         + simpleMsg.getRealMsg ().getJMSMessageID () + " receive:" + t + " / next:"
                         + JMSListener.nbMsg [this.listener_id] [numM]);
             }
    
             // fake work
             long jCnt = (long) (1000.0 * Math.random ());
             for (long j = 0; j < jCnt; ++j) {
                Math.random ();
             }
    
             if (out != -1) { // don't send ping pong if we have detected replay
                this.sender.send (new SimpleMessage (numM, t + 1));
             }
          }
          catch (Throwable e) {
             JMSListener.logger.error ("ER for id:" + this.listener_id + "/numMsg:" + numM + " get exception: ", e);
             out = -3;
          }
    
          return out;
       }
    All the beans are in the same file. Here the bean used by the listener:
    Code:
    	<bean id="listenerContainerA" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    		<property name="concurrentConsumers" value="50" />
    		<property name="connectionFactory" ref="connectionFactory" />
    		<property name="destination" ref="queueA" />
    		<property name="messageListener" ref="messageListenerA" />
    		<property name="sessionTransacted" value="true" />
    		<property name="sessionAcknowledgeModeName" value="AUTO_ACKNOWLEDGE" />
    	</bean>
    
    	<bean id="listenerContainerB" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    		<property name="concurrentConsumers" value="50" />
    		<property name="connectionFactory" ref="connectionFactory" />
    		<property name="destination" ref="queueB" />
    		<property name="messageListener" ref="messageListenerB" />
    		<property name="sessionTransacted" value="true" />
    		<property name="sessionAcknowledgeModeName" value="AUTO_ACKNOWLEDGE" />
    	</bean>
    
    	<bean id="queueSenderA" class="org.spring_zip.JMSSender" autowire="byName" lazy-init="true">
    		<property name="jmsTemplate" ref="jmsTemplate" />
    		<property name="toQueue" ref="queueB" />
    		<property name="replyQueue" ref="queueA" />
    	</bean>
    
    	<bean id="queueSenderB" class="org.spring_zip.JMSSender" autowire="byName" lazy-init="true">
    		<property name="jmsTemplate" ref="jmsTemplate" />
    		<property name="toQueue" ref="queueA" />
    		<property name="replyQueue" ref="queueB" />
    	</bean>
    
    	<bean id="messageListenerA" class="org.spring_zip.JMSListener" autowire="byName">
    		<property name="id" value="0" />
    		<property name="sender" ref="queueSenderA" />
    	</bean>
    
    	<bean id="messageListenerB" class="org.spring_zip.JMSListener" autowire="byName">
    		<property name="id" value="1" />
    		<property name="sender" ref="queueSenderB" />
    	</bean>
    And here is the function use to do the test:
    Code:
          try {
             // set default counter value
             for (int i = 0; i < JMSListener.NB_ID; ++i) {
                JMSListener.nbMsg [this.messageListenerA.getId ()] [i] = 2;
                JMSListener.nbMsg [this.messageListenerB.getId ()] [i] = 1;
             }
    
             // send init message
             for (int i = 0; i < JMSListener.NB_ID; ++i) {
                this.queueSenderA.send (new SimpleMessage (i, 1));
             }
    
             // start the listeners
             this.messageListenerA.start ();
             this.messageListenerB.start ();
    
             // wait until first replay OR 100 iteration
             int step = 0;
             while (this.messageListenerA.getNbReplay () == 0 && this.messageListenerB.getNbReplay () == 0 && step < 100) {
                Thread.sleep (1000);
                ++step;
             }
    
             // stop listener
             this.messageListenerA.stop ();
             this.messageListenerB.stop ();
    
             // check if number of message exchanged are quite OK
             for (int i = 0; i < JMSListener.NB_ID; ++i) {
                int actualA = JMSListener.nbMsg [this.messageListenerA.getId ()] [i];
                int actualB = JMSListener.nbMsg [this.messageListenerB.getId ()] [i];
                this.logger.debug ("CHECKING " + i + " A:" + actualA);
                this.logger.debug ("CHECKING " + i + " B:" + actualB);
                Assert.assertTrue ("BAD EXCHANGE ! message were lost!", Math.abs (actualA - actualB) <= 2);
             }
    
             // display listener statistics
             this.logger.debug ("Listener A:" + this.messageListenerA);
             this.logger.debug ("Listener B:" + this.messageListenerB);
             this.logger.debug ("overall time:" + (System.currentTimeMillis () - this.startDate));
             Thread.sleep (5000);
    
             Assert.assertTrue ("REPLAY OCCURED !", this.messageListenerA.getNbReplay () == 0
                      && this.messageListenerB.getNbReplay () == 0);
          }
          catch (Throwable e) {
             this.logger.error ("Test get this exception: ", e);
             Assert.fail ();
          }
    It works good during a while then fails because the incoming message content (the ball) is the same than the previous message (the real content of this message seem to be lost). If I look for the incoming JMS message ID in my logs to know when it was sent and what was its content, I see that the sent message content was good but the pulled version of this message it is a fake one.

    We tried with the same test without Spring and was OK. We done this test with one Spring pool per JVM (2 prog launched separatly) and it failed too. I attach a zip file with the most simple version of this test (as a Maven project).

    Do you have any ideas ?

    Thanks !
    Attached Files Attached Files
    Last edited by bdm; Mar 4th, 2008 at 10:07 AM.

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •