Thread: JMS listener-container concurrentConsumers Issue

  1. #1
    Join Date
    Apr 2009
    Posts
    7

    JMS listener-container concurrentConsumers Issue

    I'm seeing an issue with concurrent consumption of JMS messages, and I've written a simple test that demonstrates it. It seems that queued messages are NOT being handed to consumer threads as those threads become free; instead, they are being assigned based on the order in which the threads were originally used.

    Simple example (with concurrentConsumers=3, 5 concurrent requests are sent initially):

    1st consumer/thread starts and sleeps for 10 seconds, completes.
    2nd consumer/thread starts and sleeps for 5 seconds, completes.
    3rd consumer/thread starts and sleeps for 3 seconds, completes.
    4th REQUEST waits for the 1st consumer/thread to complete before being processed
    5th REQUEST waits for the 2nd consumer/thread to complete

    I would expect that the 4th Request would be handled when the 3rd request completes (3rd request completes before 1st and 2nd) and that the 5th Request would be handled when the 2nd Request completes.

    I'm using ActiveMQ 5.4.1, Spring 3.0.4. Following is a log showing the behavior with 10 consumers (and 13 initially queued messages), followed by my configuration.

    This seems pretty basic. Am I missing something? This is certainly not the default behavior I would expect.

    --- --- ---

    08:45:03,864 | INFO | ad-1 | uniworks.core.ThreadTester | ThreadTester: Sending Requests
    08:45:03,904 | INFO | er-8 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 15000
    08:45:03,919 | INFO | er-7 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 12000
    08:45:03,949 | INFO | er-3 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 11000
    08:45:03,968 | INFO | er-2 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 10000
    08:45:03,990 | INFO | er-4 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 9000
    08:45:04,017 | INFO | r-10 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 8000
    08:45:04,038 | INFO | er-1 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 7000
    08:45:04,056 | INFO | er-5 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 6000
    08:45:04,084 | INFO | er-6 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 5000
    08:45:04,106 | INFO | er-9 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 4000
    08:45:04,160 | INFO | ad-1 | uniworks.core.ThreadTester | ThreadTester: All Requests Sent
    08:45:08,107 | INFO | er-9 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 4000
    08:45:09,085 | INFO | er-6 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 5000
    08:45:10,056 | INFO | er-5 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 6000
    08:45:11,038 | INFO | er-1 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 7000
    08:45:12,017 | INFO | r-10 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 8000
    08:45:12,990 | INFO | er-4 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 9000
    08:45:13,968 | INFO | er-2 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 10000
    08:45:14,949 | INFO | er-3 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 11000
    08:45:14,954 | INFO | er-3 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 1000
    08:45:15,919 | INFO | er-7 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 12000
    08:45:15,924 | INFO | er-7 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 2000
    08:45:15,954 | INFO | er-3 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 1000
    08:45:17,924 | INFO | er-7 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 2000
    08:45:18,904 | INFO | er-8 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 15000
    08:45:18,908 | INFO | er-8 | uniworks.core.ThreadTestServiceImpl | Starting Thread: duration of pause = 3000
    08:45:21,909 | INFO | er-8 | uniworks.core.ThreadTestServiceImpl | Ending Thread: duration of pause = 3000

    --- --- ---

    <jms:listener-container
        container-type="default"
        connection-factory="pooledConnectionFactory"
        acknowledge="auto"
        concurrency="10-10">
        <jms:listener destination="springThreadTestService" ref="threadTestServiceImpl" method="onMessage" />
    </jms:listener-container>

    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${jms.url}"/>
    </bean>

    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="maxConnections" value="25"/>
        <property name="maximumActive" value="500"/>
        <property name="connectionFactory" ref="jmsConnectionFactory"/>
    </bean>

  2. #2
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,840

    I think you *might* be seeing prefetch behavior, i.e. the 4th request was actually prefetched by that first consumer. Can you run this same example with a prefetch of 0?
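
    To make the prefetch idea concrete, here is a minimal sketch of the two dispatch styles. It is a simplified model, not ActiveMQ's actual dispatcher: the class and method names are hypothetical, the round-robin hand-off is an assumption, and the 4-second durations for the 4th and 5th requests are made up (the original post does not give them). The first three durations are the 10, 5 and 3 seconds from the example above.

```java
import java.util.Arrays;

/** Simplified model of queue dispatch; NOT ActiveMQ's real dispatcher. */
final class PrefetchDispatchSketch {

    /**
     * Eager, prefetch-style dispatch: every queued message is handed to a
     * consumer round-robin up front, so message i waits behind whatever
     * consumer (i mod consumers) is still working on.
     * Returns the time (in seconds) at which each message starts.
     */
    static int[] startTimesWithPrefetch(int consumers, int[] durations) {
        int[] busyUntil = new int[consumers];
        int[] starts = new int[durations.length];
        for (int i = 0; i < durations.length; i++) {
            int c = i % consumers;          // fixed assignment, decided up front
            starts[i] = busyUntil[c];
            busyUntil[c] += durations[i];
        }
        return starts;
    }

    /**
     * On-demand dispatch (the prefetch=0 case): each message goes to
     * whichever consumer becomes free first.
     */
    static int[] startTimesOnDemand(int consumers, int[] durations) {
        int[] busyUntil = new int[consumers];
        int[] starts = new int[durations.length];
        for (int i = 0; i < durations.length; i++) {
            int c = 0;
            for (int k = 1; k < consumers; k++) {
                if (busyUntil[k] < busyUntil[c]) {
                    c = k;                  // earliest-free consumer wins
                }
            }
            starts[i] = busyUntil[c];
            busyUntil[c] += durations[i];
        }
        return starts;
    }

    public static void main(String[] args) {
        // 10, 5, 3 come from the example; the two trailing 4s are assumed.
        int[] durations = {10, 5, 3, 4, 4};
        System.out.println(Arrays.toString(startTimesWithPrefetch(3, durations))); // [0, 0, 0, 10, 5]
        System.out.println(Arrays.toString(startTimesOnDemand(3, durations)));     // [0, 0, 0, 3, 5]
    }
}
```

    In this model the eager variant reproduces what you reported (4th request starts at 10s, 5th at 5s), while the on-demand variant gives what you expected (4th at 3s, 5th at 5s).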

  3. #3
    Join Date
    Apr 2009
    Posts
    7

    I tried setting prefetch="0", but that was not allowed: it threw a "maxMessagesPerTask can't be 0" exception. I also tried setting prefetch="1", which didn't affect how it executed.

    Prefetch seemed like a likely candidate. Could the cause be that the container is not configured with a TransactionManager?

    Any other ideas or areas to look at?

  4. #4
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,840

    Sorry, I was actually thinking about the ActiveMQ prefetch setting - not the value on the MessageListener container.

  5. #5
    Join Date
    Apr 2009
    Posts
    7

    Solved

    Thanks! Setting the prefetch to 0 on the ActiveMQ/JMS broker URI did fix the issue. It also made some of the confusing startup sequence go away.

    tcp://localhost:61616?jms.prefetchPolicy.all=0
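
    For anyone building the broker URL in code rather than typing it into a properties file, a small sketch of appending that option follows. The class and method names are hypothetical, and it assumes a plain transport URL like the one above (a composite URL such as failover:(...) would need more careful handling). The resulting string is what the ${jms.url} placeholder in the configuration from the first post would resolve to.

```java
/** Hypothetical helper: appends the prefetch option to a plain broker URL. */
final class BrokerUrlSketch {

    /**
     * Returns brokerUrl with jms.prefetchPolicy.all=<prefetch> appended,
     * using '?' if the URL has no query string yet and '&' otherwise.
     */
    static String withPrefetch(String brokerUrl, int prefetch) {
        if (prefetch < 0) {
            throw new IllegalArgumentException("prefetch must be >= 0");
        }
        String separator = brokerUrl.contains("?") ? "&" : "?";
        return brokerUrl + separator + "jms.prefetchPolicy.all=" + prefetch;
    }

    public static void main(String[] args) {
        // prints tcp://localhost:61616?jms.prefetchPolicy.all=0
        System.out.println(withPrefetch("tcp://localhost:61616", 0));
    }
}
```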

  6. #6
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,840

    Glad to hear it! That's a common source of confusion. Of course, you might want to consider different prefetch values now that you understand what's happening when prefetch is enabled.
