
Thread: Using ActiveMq Topics Instead of Queues

  1. #1
    Join Date
    Sep 2008
    Location
    Chiva - Valencia - Spain
    Posts
    33

    Default Using ActiveMq Topics Instead of Queues

    Hi all,

    I'm trying to set up an integration between two ActiveMQ queues using JMS. Topics are published on both of them for possible subscribers. This is my app-context configuration:

    Code:
    		<!-- Channel Definition -->
    		
    		<!-- This channel supports all the error messages from other channels -->		
    		<integration:channel id="errorChannel"></integration:channel>
    		
    		
    		<!-- Correlator channels -->		 
    		<integration:channel id="channel1"></integration:channel>
    		<!-- End correlator channels -->
    		
    		<!-- End Channel Definition -->
    		
    		<!-- Channel adapter definitions -->		
    		
    		<spring-jms:inbound-channel-adapter id="adapter1" channel="channel1"
    			connection-factory="connectionFact" destination="topicOut">
    		</spring-jms:inbound-channel-adapter>
    
    		<spring-jms:outbound-channel-adapter id="adapter2" channel="channel1"	connection-factory="connectionFact" destination="topicIn">
    		</spring-jms:outbound-channel-adapter>
    	
    		<!-- End channel adapter definitions -->
    		
    		<!-- ActiveMq Configuration -->
    		 
    		<bean id="connectionFact" class="org.apache.activemq.pool.PooledConnectionFactory" 
    			destroy-method="stop">
    			<property name="connectionFactory">
    				<bean class="org.apache.activemq.ActiveMQConnectionFactory">
    					<property name="brokerURL">
    					<value>tcp://localhost:61616</value>
    					</property>
    				</bean>
    			</property>
    		</bean>
    			
    		<!-- Topics -->    	    	
        	<bean id="topicOut" class="org.apache.activemq.command.ActiveMQTopic">
            	<property name="physicalName" value="${topic.event.out}"/>
        	</bean>
        	    	
        	<bean id="topicIn" class="org.apache.activemq.command.ActiveMQTopic">
            	<property name="physicalName" value="${topic.event.in}"/>
        	</bean>				
    		<!-- End Topics -->
    And I'm getting this error:

    Code:
    ERROR org.springframework.integration.channel.MessagePublishingErrorHandler  - failure occurred in messaging task with message: {message description}
    org.springframework.integration.message.MessageDeliveryException: Dispatcher has no subscribers.
    	at org.springframework.integration.dispatcher.SimpleDispatcher.dispatch(SimpleDispatcher.java:39)
    	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:56)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:116)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:94)
    	at org.springframework.integration.channel.MessageChannelTemplate.doSend(MessageChannelTemplate.java:222)
    	at org.springframework.integration.channel.MessageChannelTemplate.send(MessageChannelTemplate.java:179)
    	at org.springframework.integration.endpoint.SourcePollingChannelAdapter.doPoll(SourcePollingChannelAdapter.java:78)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.innerPoll(AbstractPollingEndpoint.java:233)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.poll(AbstractPollingEndpoint.java:217)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:210)
    	at org.springframework.integration.scheduling.SimpleTaskScheduler$ErrorHandlingRunnableWrapper.run(SimpleTaskScheduler.java:307)
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
    	at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
    	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
    	at org.springframework.integration.scheduling.SimpleTaskScheduler$TriggeredTask.run(SimpleTaskScheduler.java:256)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    	at java.lang.Thread.run(Thread.java:619)
    Can anyone help me?

    Thanks A Lot!!!!

    NEStabur
    Last edited by nestabur; Nov 21st, 2008 at 02:38 AM.

  2. #2
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,840

    Default

    It looks like you need to add a handler subscribed to the 'errorChannel' or else add a <queue/> sub-element so that it is not a dispatching channel (that is what throws the exception - dispatcher has no subscribers). Also, note that a default 'errorChannel' (with a queue) is created behind the scenes automatically, so you can leave that out unless you have some custom configuration requirements.
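
    A minimal sketch of the second option (the `<queue/>` sub-element), assuming the `integration` namespace prefix used in the configuration from the first post. The capacity value is an arbitrary placeholder, not a recommendation, and whatever reads from a queue-backed channel has to poll it rather than subscribe to it.

```xml
<!-- Queue-backed channel: messages are buffered instead of being handed to a
     dispatcher, so "Dispatcher has no subscribers" is not thrown on send. -->
<integration:channel id="errorChannel">
    <!-- capacity is an assumed, illustrative value -->
    <integration:queue capacity="25"/>
</integration:channel>
```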

  3. #3
    Join Date
    Sep 2008
    Location
    Chiva - Valencia - Spain
    Posts
    33

    Default

    Thanks Mark, it works. But now I don't receive anything from the output of the channel...

    The code is the same ...

  4. #4
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,840

    Default

    Can you provide some more details? How are you passing a Message?

  5. #5
    Join Date
    Sep 2008
    Location
    Chiva - Valencia - Spain
    Posts
    33

    Default

    As follows:

    Code:
    // Send a body-less JMS message carrying a single string property.
    _jmsTemplate.send(_des, new MessageCreator() {
        public Message createMessage(Session session)
                throws JMSException {
            Message msg = session.createMessage();
            msg.setStringProperty("message", "test");
            return msg;
        }
    });

  6. #6
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,840

    Default

    And, how are you receiving the JMS reply?

    Also, you may want to enable DEBUG-level logging for now to see if that provides more insight.

  7. #7
    Join Date
    Sep 2008
    Location
    Chiva - Valencia - Spain
    Posts
    33

    Default

    Hi Mark,

    I'm not receiving any response from the channel. Even when I activate the error channel, all the messages are delivered there and the message is empty...

    Here is my message-sending code:
    Code:
    _jmsTemplate.send(_des, new MessageCreator() {
    
    					public Message createMessage(Session session)
    							throws JMSException {
    						// TODO Auto-generated method stub
    						Message msg = session.createMessage();
    						msg.setStringProperty("message", "test");
    						return msg;
    					}
    				});
    				try {
    					sleep(_delay);
    				}
    				catch (InterruptedException e) {
    					// TODO Auto-generated catch block
    					WriteLog(e.getMessage());
    				}

  8. #8
    Join Date
    Sep 2008
    Location
    Chiva - Valencia - Spain
    Posts
    33

    Default

    I've enabled DEBUG-level logging, but Spring didn't write anything to the log.

    Here is my trace:

    Code:
    log4j:ERROR Could not find value for key log4j.appender.myMail
    log4j:ERROR Could not instantiate appender named "myMail".
    0    [main] INFO  org.springframework.context.support.ClassPathXmlApplicationContext  - Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@bfc8e0: display name [org.springframework.context.support.ClassPathXmlApplicationContext@bfc8e0]; startup date [Fri Nov 21 16:43:17 CET 2008]; root of context hierarchy
    84   [main] INFO  org.springframework.beans.factory.xml.XmlBeanDefinitionReader  - Loading XML bean definitions from class path resource [ApplicationContextInfraestructure.xml]
    353  [main] INFO  org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor  - Initializing ThreadPoolExecutor
    367  [main] INFO  org.springframework.beans.factory.support.DefaultListableBeanFactory  - Overriding bean definition for bean 'errorChannel': replacing [Root bean: class [org.springframework.integration.channel.QueueChannel]; scope=singleton; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null] with [Generic bean: class [org.springframework.integration.channel.QueueChannel]; scope=singleton; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null]
    383  [main] INFO  org.springframework.beans.factory.support.DefaultListableBeanFactory  - Overriding bean definition for bean 'correlatedBasicEventOutput': replacing [Generic bean: class [org.springframework.integration.config.ConsumerEndpointFactoryBean]; scope=singleton; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null] with [Generic bean: class [org.springframework.integration.config.ConsumerEndpointFactoryBean]; scope=singleton; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null]
    384  [main] INFO  org.springframework.beans.factory.support.DefaultListableBeanFactory  - Overriding bean definition for bean 'correlatedBasicEventInput': replacing [Generic bean: class [org.springframework.integration.endpoint.SourcePollingChannelAdapter]; scope=singleton; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null] with [Generic bean: class [org.springframework.integration.config.ConsumerEndpointFactoryBean]; scope=singleton; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null]
    387  [main] INFO  org.springframework.beans.factory.xml.XmlBeanDefinitionReader  - Loading XML bean definitions from class path resource [testcontext.xml]
    442  [main] INFO  org.springframework.context.support.ClassPathXmlApplicationContext  - Bean factory for application context [org.springframework.context.support.ClassPathXmlApplicationContext@bfc8e0]: org.springframework.beans.factory.support.DefaultListableBeanFactory@16546ef
    525  [main] INFO  org.springframework.beans.factory.config.PropertyPlaceholderConfigurer  - Loading properties file from class path resource [config.properties]
    642  [main] INFO  org.springframework.integration.endpoint.PollingConsumer  - started correlatedBasicEventInput
    644  [main] INFO  org.springframework.integration.endpoint.PollingConsumer  - started correlatedBasicEventOutput
    645  [main] INFO  org.springframework.beans.factory.support.DefaultListableBeanFactory  - Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@16546ef: defining beans [propertyConfigurer,errorChannel,org.springframework.integration.channel.MessagePublishingErrorHandler#0,taskScheduler,com.s2grupo.triton.integration.interceptors.ChannelInterceptor#0,com.s2grupo.triton.integration.interceptors.ChannelInterceptor#1,correlatedBasicEvent,correlatedBasicAlarm,org.springframework.integration.jms.JmsDestinationPollingSource#0,correlatedBasicEventInput,org.springframework.integration.jms.JmsSendingMessageHandler#0,correlatedBasicEventOutput,org.springframework.integration.jms.JmsDestinationPollingSource#1,correlatedBasicAlarmInput,org.springframework.integration.jms.JmsSendingMessageHandler#1,org.springframework.integration.jms.JmsSendingMessageHandler#2,tritonJmsFactory,emasJmsFactory,alarmTopicOut,alarmTopicIn,eventTopicOut,eventTopicIn,errorTopic,eventTopicTest,jmsFactory,testConnectionFactory,listenerTest,jmsContainer,errorListener,jmsErrorContainer]; root of factory hierarchy
    665  [main] INFO  org.springframework.integration.endpoint.SourcePollingChannelAdapter  - started correlatedBasicAlarmInput
    1007 [main] INFO  org.springframework.integration.scheduling.SimpleTaskScheduler  - started org.springframework.integration.scheduling.SimpleTaskScheduler@113beb5
    1007 [main] INFO  Main  - ====> Cargando test de envio.
    1008 [task-scheduler-2] INFO  Main  - Receiving in channelerrorChannel
    1008 [Thread-3] INFO  Main  - Escribiendo...
    1008 [main] INFO  Main  - ====> Cargado test de envio.
    1048 [Thread-3] INFO  Main  - Mensage test enviado
    2009 [task-scheduler-5] INFO  Main  - Receiving in channelerrorChannel
    3009 [task-scheduler-4] INFO  Main  - Receiving in channelerrorChannel
    3061 [Thread-3] INFO  Main  - Escribiendo...
    3072 [Thread-3] INFO  Main  - Mensage test enviado
    4010 [task-scheduler-5] INFO  Main  - Receiving in channelerrorChannel
    5010 [task-scheduler-2] INFO  Main  - Receiving in channelerrorChannel

  9. #9
    Join Date
    Sep 2008
    Location
    Chiva - Valencia - Spain
    Posts
    33

    Default

    As you can see in the previous post, all the messages are delivered to the error channel.

  10. #10
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,840

    Default

    In your error channel handler, grab the Message payload (message.getPayload()). It will be an Exception instance. Then, call printStackTrace() on that.
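
    A rough sketch of that inspection step, written against plain Java so it runs without Spring on the classpath. The class name `ErrorPayloadInspector` and the method `describe` are hypothetical helpers, not part of Spring Integration; in a real handler you would pass in the result of `message.getPayload()`.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical helper: turns an error-channel payload into readable text.
final class ErrorPayloadInspector {

    // Returns the full stack trace if the payload is a Throwable,
    // otherwise a short note describing what was received instead.
    static String describe(Object payload) {
        if (payload instanceof Throwable) {
            StringWriter buffer = new StringWriter();
            ((Throwable) payload).printStackTrace(new PrintWriter(buffer, true));
            return buffer.toString();
        }
        return "Unexpected payload: " + String.valueOf(payload);
    }

    public static void main(String[] args) {
        // Stand-in for the exception that would arrive as the message payload.
        Exception failure = new IllegalStateException(
                "delivery failed", new RuntimeException("root cause"));
        System.out.println(describe(failure));
    }
}
```

    Logging the returned string (rather than only a "Receiving in channel" line) should show which component raised the exception and why.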
