I'm seeing an issue with concurrent consumption of JMS messages, and I've written a simple test that demonstrates what is occurring. It seems that messages are NOT being handed to consumer threads as those threads become free; instead, they are assigned based on the order in which the threads were originally used.
Simple example (with concurrentConsumers=3, 5 concurrent requests are sent initially):
1st consumer/thread starts and sleeps for 10 seconds, completes.
2nd consumer/thread starts and sleeps for 5 seconds, completes.
3rd consumer/thread starts and sleeps for 3 seconds, completes.
4th REQUEST waits for the 1st consumer/thread to complete before being processed
5th REQUEST waits for the 2nd consumer/thread to complete
I would expect that the 4th Request would be handled when the 3rd request completes (3rd request completes before 1st and 2nd) and that the 5th Request would be handled when the 2nd Request completes.
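To make the difference concrete, here is a toy model in plain Java. It involves no JMS, ActiveMQ, or Spring code; it only computes when each request would start under the two assignment strategies. The class and method names are made up for illustration, and the durations of the 4th and 5th requests (4 seconds each) are assumed, since the example above does not specify them.

```java
import java.util.Arrays;
import java.util.PriorityQueue;

public class DispatchModel {

    // Observed behavior (as modeled here): request i is bound up front to
    // consumer i % consumers, regardless of which consumer frees up first.
    static long[] startTimesPreAssigned(long[] durations, int consumers) {
        long[] freeAt = new long[consumers];      // time each consumer becomes free
        long[] start = new long[durations.length];
        for (int i = 0; i < durations.length; i++) {
            int c = i % consumers;
            start[i] = freeAt[c];
            freeAt[c] += durations[i];
        }
        return start;
    }

    // Expected behavior: the next request goes to whichever consumer
    // becomes free first.
    static long[] startTimesFirstFree(long[] durations, int consumers) {
        PriorityQueue<Long> freeAt = new PriorityQueue<>();
        for (int c = 0; c < consumers; c++) {
            freeAt.add(0L);
        }
        long[] start = new long[durations.length];
        for (int i = 0; i < durations.length; i++) {
            long t = freeAt.poll();               // earliest-free consumer
            start[i] = t;
            freeAt.add(t + durations[i]);
        }
        return start;
    }

    public static void main(String[] args) {
        // Seconds; the first three values come from the example above,
        // the last two are assumed.
        long[] durations = {10, 5, 3, 4, 4};
        System.out.println(Arrays.toString(startTimesPreAssigned(durations, 3)));
        // prints [0, 0, 0, 10, 5]
        System.out.println(Arrays.toString(startTimesFirstFree(durations, 3)));
        // prints [0, 0, 0, 3, 5]
    }
}
```

In the first result the 4th request does not start until second 10, when the 1st consumer finishes, which is what I am seeing. In the second it starts at second 3, when the 3rd consumer finishes, which is what I would expect.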
I'm using ActiveMQ 5.4.1, Spring 3.0.4. Following is a log showing the behavior with 10 consumers (and 13 initially queued messages), followed by my configuration.
This seems pretty basic, am I missing something? This is certainly not the default behavior I would expect.
--- --- ---
08:45:03,864 | INFO | ad-1 | uniworks.core.ThreadTester | ThreadTester: Sending Requests
08:45:03,904 | INFO | er-8 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 15000
08:45:03,919 | INFO | er-7 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 12000
08:45:03,949 | INFO | er-3 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 11000
08:45:03,968 | INFO | er-2 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 10000
08:45:03,990 | INFO | er-4 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 9000
08:45:04,017 | INFO | r-10 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 8000
08:45:04,038 | INFO | er-1 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 7000
08:45:04,056 | INFO | er-5 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 6000
08:45:04,084 | INFO | er-6 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 5000
08:45:04,106 | INFO | er-9 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 4000
08:45:04,160 | INFO | ad-1 | uniworks.core.ThreadTester | ThreadTester: All Requests Sent
08:45:08,107 | INFO | er-9 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 4000
08:45:09,085 | INFO | er-6 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 5000
08:45:10,056 | INFO | er-5 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 6000
08:45:11,038 | INFO | er-1 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 7000
08:45:12,017 | INFO | r-10 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 8000
08:45:12,990 | INFO | er-4 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 9000
08:45:13,968 | INFO | er-2 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 10000
08:45:14,949 | INFO | er-3 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 11000
08:45:14,954 | INFO | er-3 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 1000
08:45:15,919 | INFO | er-7 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 12000
08:45:15,924 | INFO | er-7 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 2000
08:45:15,954 | INFO | er-3 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 1000
08:45:17,924 | INFO | er-7 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 2000
08:45:18,904 | INFO | er-8 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 15000
08:45:18,908 | INFO | er-8 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 3000
08:45:21,909 | INFO | er-8 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 3000
--- --- ---
<jms:listener-container
container-type="default"
connection-factory="pooledConnectionFactory"
acknowledge="auto"
concurrency="10-10">
<jms:listener destination="springThreadTestService" ref="threadTestServiceImpl" method="onMessage" />
</jms:listener-container>
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${jms.url}"/>
</bean>
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="maxConnections" value="25"/>
<property name="maximumActive" value="500"/>
<property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>

