Hi Guys,
I have an ActiveMQ JMS topic and 3 subscribers set up and visible in JMX, but messages are never consumed from the topic.
Below I've extracted the parts of the configuration that I think are important, where I've tried to set up an ActiveMQ topic with 3 listeners consuming messages from it (the full XML config is at the end of the post).
I believe I can see the listeners and the messages enqueued on the topic, but the messages are never consumed.
The classes configured as listeners all implement the MessageListener interface, and I've also tried adding one as an MDP (message-driven POJO). I can see the classes being initialized; that code is also included at the end.
Here are the elements from the config XML that set up the listeners:
Code:
<jms:listener-container
        container-type="default"
        connection-factory="jmsConsumerConnectionFactory"
        acknowledge="auto"
        destination-type="topic">
    <jms:listener destination="smarter.jms.topic" ref="asyncFCPolicyHandler" subscription="subscription"/>
    <jms:listener destination="smarter.jms.topic" ref="asyncTPOPolicyHandler" />
</jms:listener-container>

<bean id="mdpMessageListener" class="ie.smarter.jms.AsyncFCPolicyHandler" />
I've used a little tool (JMXTerm) to inspect JMX, and I can see that the topic is set up and has 3 subscribers, but messages are never taken off it: EnqueueCount is 100 while DispatchCount and DequeueCount stay at 0.
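As an aside on reading those counters, here is a minimal sketch that flags a destination where messages are enqueued but never dispatched. It assumes the `Name = value;` format that JMXTerm's `get *` prints. The class and method names (`TopicCounters`, `parse`, `enqueuedButNeverDispatched`) are made up for illustration and are not part of ActiveMQ or JMXTerm.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TopicCounters {

    // Parses "Name = value;" pairs as printed by JMXTerm's "get *".
    // Non-numeric attributes (names, booleans, lists, decimals) are skipped.
    public static Map<String, Long> parse(String output) {
        Map<String, Long> counters = new LinkedHashMap<>();
        for (String entry : output.split(";")) {
            String[] pair = entry.trim().split("\\s*=\\s*", 2);
            if (pair.length != 2) {
                continue;
            }
            try {
                counters.put(pair[0], Long.parseLong(pair[1]));
            } catch (NumberFormatException e) {
                // Not a whole-number attribute; ignore it in this sketch.
            }
        }
        return counters;
    }

    // True when the broker accepted messages but handed none to any consumer.
    public static boolean enqueuedButNeverDispatched(Map<String, Long> counters) {
        long enqueued = counters.getOrDefault("EnqueueCount", 0L);
        long dispatched = counters.getOrDefault("DispatchCount", 0L);
        return enqueued > 0 && dispatched == 0;
    }

    public static void main(String[] args) {
        // Values copied from the JMXTerm output in this post.
        String sample = "ConsumerCount = 3; EnqueueCount = 100; QueueSize = 100; "
                + "DispatchCount = 0; DequeueCount = 0;";
        Map<String, Long> counters = parse(sample);
        System.out.println("Stalled: " + enqueuedButNeverDispatched(counters));
    }
}
```

The check deliberately compares DispatchCount against zero instead of subtracting it from EnqueueCount, since I'm not certain the two counters are incremented on the same basis for a topic with several subscribers.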
Any help on where I've gone wrong is appreciated! I'm sure it's something dumb I'm doing.
Here's the output from JMX using JMXTerm (I reordered the 'get *' output for easier reading):
Code:
Oisin-Kims-MacBook-Air:SpringJMS oisin$ java -jar jmxterm-1.0-alpha-4-uber.jar
Welcome to JMX terminal. Type "help" for available commands.
$>jvms
9712 ( ) - org.eclipse.jdt.internal.junit.runner.RemoteTestRunner -version 3 -port 54604 -testLoaderClass org.eclipse.jdt.internal.junit4.runner.JUnit4TestLoader -loaderpluginname org.eclipse.jdt.junit4.runtime -classNames ie.smarter.jms.JmsMessageListenerTest
9693 ( ) - jmxterm-1.0-alpha-4-uber.jar
$>open 9712
#Connection to 9712 is opened
$>domain org.apache.activemq
#domain is set to org.apache.activemq
$>bean org.apache.activemq:BrokerName=localhost,Destination=smarter.jms.topic,Type=Topic
$>get *
#mbean = org.apache.activemq:BrokerName=localhost,Destination=smarter.jms.topic,Type=Topic:
Name = smarter.jms.topic;
ConsumerCount = 3;
EnqueueCount = 100;
QueueSize = 100;
Subscriptions = [
    org.apache.activemq:BrokerName=localhost,Type=Subscription,persistentMode=Non-Durable,destinationType=Topic,destinationName=smarter.jms.topic,clientId=ID_Oisin-Kims-MacBook-Air.local-54612-1319298591637-3_1,consumerId=ID_Oisin-Kims-MacBook-Air.local-54612-1319298591637-2_1_3_1,
    org.apache.activemq:BrokerName=localhost,Type=Subscription,persistentMode=Non-Durable,destinationType=Topic,destinationName=smarter.jms.topic,clientId=ID_Oisin-Kims-MacBook-Air.local-54612-1319298591637-3_1,consumerId=ID_Oisin-Kims-MacBook-Air.local-54612-1319298591637-2_1_2_1,
    org.apache.activemq:BrokerName=localhost,Type=Subscription,persistentMode=Non-Durable,destinationType=Topic,destinationName=smarter.jms.topic,clientId=ID_Oisin-Kims-MacBook-Air.local-54612-1319298591637-3_1,consumerId=ID_Oisin-Kims-MacBook-Air.local-54612-1319298591637-2_1_1_1 ];
ProducerFlowControl = true;
MemoryPercentUsage = 0;
MemoryLimit = 67108864;
MaxProducersToAudit = 1024;
MaxAuditDepth = 2048;
MaxPageSize = 200;
UseCache = true;
DispatchCount = 0;
DequeueCount = 0;
InFlightCount = 0;
ExpiredCount = 0;
ProducerCount = 0;
MemoryUsagePortion = 1.0;
MaxEnqueueTime = 0;
MinEnqueueTime = 0;
AverageEnqueueTime = 0.0;

Here's my full XML config:
Code:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schem...ring-beans.xsd
                           http://www.springframework.org/schema/context http://www.springframework.org/schem...ng-context.xsd
                           http://www.springframework.org/schema/jms http://www.springframework.org/schem...spring-jms.xsd
                           http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.5.0.xsd">

    <context:component-scan base-package="ie.smarter.jms" />

    <!-- Embedded ActiveMQ Broker -->
    <amq:broker id="broker" useJmx="true" persistent="false">
        <amq:transportConnectors>
            <amq:transportConnector uri="tcp://localhost:0" />
        </amq:transportConnectors>
    </amq:broker>

    <!-- ActiveMQ Destination -->
    <amq:topic id="topic.destination" physicalName="smarter.jms.topic" name="smarter.jms.topic"/>

    <!-- JMS ConnectionFactory to use, configuring the embedded broker using XML -->
    <amq:connectionFactory id="jmsFactory" brokerURL="vm://localhost" />

    <!-- JMS Producer Configuration -->
    <bean id="jmsProducerConnectionFactory"
          class="org.springframework.jms.connection.SingleConnectionFactory"
          depends-on="broker"
          p:targetConnectionFactory-ref="jmsFactory" />

    <bean id="jmsProducerTemplate"
          class="org.springframework.jms.core.JmsTemplate"
          p:connectionFactory-ref="jmsProducerConnectionFactory"
          p:defaultDestination-ref="topic.destination"
          p:pubSubDomain="true" />

    <!-- JMS Consumer Configuration -->
    <bean id="jmsConsumerConnectionFactory"
          class="org.springframework.jms.connection.SingleConnectionFactory"
          depends-on="broker"
          p:targetConnectionFactory-ref="jmsFactory" />

    <jms:listener-container
            container-type="default"
            connection-factory="jmsConsumerConnectionFactory"
            acknowledge="auto"
            destination-type="topic">
        <jms:listener destination="smarter.jms.topic" ref="asyncFCPolicyHandler" subscription="subscription"/>
        <jms:listener destination="smarter.jms.topic" ref="asyncTPOPolicyHandler" />
    </jms:listener-container>

    <!-- Adding a 3rd Message Driven POJO (MDP) listener -->
    <bean id="mdpMessageListener" class="ie.smarter.jms.AsyncFCPolicyHandler" />

    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="jmsConsumerConnectionFactory"/>
        <property name="destination" ref="topic.destination"/>
        <property name="messageListener" ref="mdpMessageListener" />
        <property name="pubSubDomain" value="true" />
    </bean>

    <!-- Counter for consumer to increment and test to verify count -->
    <bean id="counter" class="java.util.concurrent.atomic.AtomicInteger" />

</beans>
Here's a sample class that is used as a listener and that I also tried as an MDP:
Code:
package ie.smarter.jms;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Consumes messages from a JMS topic.
 *
 * @author Oisin Kim
 */
@Component
public class AsyncFCPolicyHandler implements MessageListener {

    private static final Logger logger = LoggerFactory.getLogger(AsyncFCPolicyHandler.class);

    @Autowired
    private AtomicInteger counter = null;

    static {
        logger.info("AsyncFCPolicyHandler initalised");
    }

    /**
     * Implementation of <code>MessageListener</code>.
     */
    public void onMessage(Message message) {
        try {
            long policyId = message.getLongProperty(JmsMessageProducer.POLICY_ID);
            if (logger.isInfoEnabled())
                logger.info("(1) Handling FC request to processed message for Policy [" + policyId + "]");
            // this increments the AtomicInteger
            counter.incrementAndGet();
        } catch (JMSException e) {
            logger.error(e.getMessage(), e);
        }
    }
}
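Since the listeners are invoked asynchronously on the container's threads, a test that verifies the shared counter has to wait for it rather than read it straight after sending. Below is a minimal sketch of such a wait, using only the AtomicInteger from the config above. `CounterAwait` and `awaitCount` are hypothetical names, and the expected count and timeout are placeholders rather than values from my actual test.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CounterAwait {

    // Polls the counter until it reaches `expected` or the timeout elapses.
    // Returns true if the expected count was reached in time.
    public static boolean awaitCount(AtomicInteger counter, int expected, long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (counter.get() < expected) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out before enough messages were handled
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger counter = new AtomicInteger();
        // Stand-in for the listener threads: increments once per "message".
        Thread listener = new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                counter.incrementAndGet();
            }
        });
        listener.start();
        System.out.println("Reached expected count: " + awaitCount(counter, 100, 2000));
        listener.join();
    }
}
```

In a test, the stand-in thread would be replaced by the real listeners, with the counter bean passed to `awaitCount` after the producer has sent its messages.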


