Durable Topic with jms:publish-subscribe-channel
fengwu
Jan 13th, 2010, 07:54 PM
First, let me say that I have really enjoyed working with Spring Integration along with ActiveMQ. Good stuff.
I am prototyping an application with Spring Integration 2.0.0.M1 and ActiveMQ 5.3. Is there a way to use the nice jms:publish-subscribe-channel with a durable topic? I did not see any obvious configuration options on the JMS-backed channel, so I started defining a MessageListenerContainer and related beans myself. That did appear to create a durable consumer, but the explicitly defined MessageListenerContainer seemed to have no effect on the JMS-backed channel.
The JMS-backed channel did work nicely with a regular non-durable ActiveMQ topic.
Any pointers would be much appreciated.
-Feng
fengwu
Feb 12th, 2010, 06:35 PM
I finally got it to work, after shifting priorities took me to other projects since I last posted the question above. I'll document my configuration in case anyone is interested.
First of all, here is the consumer configuration that did not work, from a durable consumer standpoint:
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory">
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://localhost:61616"/>
        </bean>
    </property>
    <property name="sessionCacheSize" value="10"/>
    <property name="cacheProducers" value="false"/>
    <property name="clientId" value="myDurableConsumer.instance.01"/>
</bean>
<!-- jms backed channel works nicely however cannot be made durable with spring integration 2.0.0.M1 -->
<jms:publish-subscribe-channel id="accounts" topic-name="topic.categorized.events" />
<si:chain input-channel="accounts" output-channel="accountsOut">
    <si:transformer id="accountTransformer" ref="accountTransformerBean"/>
    <si:filter ref="myFilter" method="accept"/>
</si:chain>
The above configuration does create a consumer, but the consumer is not durable. From what I can find in the docs and online, it does not seem possible to configure a durable subscriber this way.
What did work is the message-driven inbound channel adapter, which I can hook up to a DefaultMessageListenerContainer, like so:
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory">
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://localhost:61616"/>
        </bean>
    </property>
    <property name="sessionCacheSize" value="10"/>
    <property name="cacheProducers" value="false"/>
    <property name="clientId" value="myDurableConsumer.instance.01"/>
</bean>
<bean id="defaultMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="concurrentConsumers" value="1"/>
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destinationName" value="topic.categorized.events"/>
    <!--property name="messageListener" ref="notReallyNeededHere"/-->
    <property name="sessionTransacted" value="true"/>
    <property name="pubSubDomain" value="true"/>
    <property name="subscriptionDurable" value="true"/>
    <property name="durableSubscriptionName" value="mailer"/>
</bean>
<jms:message-driven-channel-adapter id="jmsInAdapter" channel="accounts" container="defaultMessageListenerContainer" />
<si:chain input-channel="accounts" output-channel="accountsOut">
    <si:transformer id="accountTransformer" ref="accountTransformerBean"/>
    <si:filter ref="myFilter" method="accept"/>
</si:chain>
The only minor issue I saw was a couple of WARN messages in the logs when I took the consumer application down and brought it back up again (see below). If, and only if, there are messages waiting on the topic for the consumer, the following messages are logged twice at consumer application startup. Everything else seems to work fine, and subsequent new messages do not trigger any more WARN messages.
I have not looked into the source code of DefaultMessageListenerContainer to see whether that warning can be turned off. In the above configuration, the jmsInAdapter is technically the "messageListener" for the defaultMessageListenerContainer, so ideally it should not have complained. Could this be a timing issue? It could be that at startup the defaultMessageListenerContainer has already started receiving messages while the jmsInAdapter has not yet been created by Spring. The following log messages do suggest that my jmsInAdapter was started after the defaultMessageListenerContainer had already started, received messages from the topic, and tried to invoke its "messageListener".
[INFO] ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
[INFO] EventDrivenConsumer - started logger.adapter
[INFO] ThreadPoolTaskExecutor - Initializing ExecutorService 'requestExecutor'
[INFO] CachingConnectionFactory - Established shared JMS Connection: ActiveMQConnection {id=ID:zhanlu-61442-1266020738529-0:0,clientId=courier.instance.01,started=false}
[WARN] DefaultMessageListenerContainer - Execution of JMS message listener failed, and no ErrorHandler has been set. <java.lang.IllegalStateException: No message listener specified - see property 'messageListener'>java.lang.IllegalStateException: No message listener specified - see property 'messageListener'
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:505)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:467)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:323)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:261)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:976)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:968)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:870)
    at java.lang.Thread.run(Thread.java:637)
[WARN] DefaultMessageListenerContainer - Execution of JMS message listener failed, and no ErrorHandler has been set. <java.lang.IllegalStateException: No message listener specified - see property 'messageListener'>java.lang.IllegalStateException: No message listener specified - see property 'messageListener'
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:505)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:467)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:323)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:261)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:976)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:968)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:870)
    at java.lang.Thread.run(Thread.java:637)
[INFO] EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#0
[INFO] EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#1
[INFO] EventDrivenConsumer - started org.springframework.integration.endpoint.EventDrivenConsumer#0
[INFO] JmsMessageDrivenEndpoint - started jmsInAdapter
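If the warnings really are a startup-ordering problem, one thing that might be worth trying is to stop the container from starting on its own, so that it cannot receive messages before a listener has been registered. The sketch below is untested against Spring Integration 2.0.0.M1. It relies on the autoStartup property that DefaultMessageListenerContainer inherits from its base class, and it assumes (not verified for this milestone) that the jmsInAdapter starts the container it wraps when the adapter itself is started. If that assumption does not hold, the container would simply never start and no messages would be consumed.

```xml
<!-- Sketch only: same durable container as above, with automatic startup disabled. -->
<bean id="defaultMessageListenerContainer"
      class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="concurrentConsumers" value="1"/>
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destinationName" value="topic.categorized.events"/>
    <property name="sessionTransacted" value="true"/>
    <property name="pubSubDomain" value="true"/>
    <property name="subscriptionDurable" value="true"/>
    <property name="durableSubscriptionName" value="mailer"/>
    <!-- Assumption: the adapter below starts this container after registering its listener. -->
    <property name="autoStartup" value="false"/>
</bean>
<jms:message-driven-channel-adapter id="jmsInAdapter" channel="accounts" container="defaultMessageListenerContainer" />
```

With sessionTransacted set to true, the messages that hit the missing listener should have been rolled back and redelivered rather than lost, which would be consistent with everything else appearing to work.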
By the way, I am using Spring 3.0 RC1 and Spring Integration 2.0.0.M1.
Thanks.
ahbrown41
Jul 19th, 2010, 09:35 AM
I am very interested in this topic. Can anyone confirm whether there is a better way now that both tools have advanced a few iterations? Information on this subject appears to be scarce. What about ActiveMQ "Virtual Destinations"?
Thanks in advance!