Hello,
Following up on my earlier post, and based on Mark Fisher's suggestions, I worked up examples of a topic publisher and subscriber that provides for guaranteed delivery of messages from the publisher to the subscriber until the subscriber disconnects. This example works on JBoss Messaging (1.4.0.SP3 on JBoss 4.2.2.GA), and does not require a durable topic. It avoids lost messages that may occur if the publisher disconnects from the JMS server before the subscriber has reconnected.
The JNDI-referenced connection factory is passed to either a Spring SingleConnectionFactory or CachingConnectionFactory bean which in turn is used in the channel adapters. These beans do not disconnect the topic session between sends or polls. I don't know if there is any advantage between the two types of factories.
Additionally, the message-driven-channel-adapter provides for listening on the topic without having to specify a poll interval. This is convenient, but the inbound-channel-adapter with a poll interval also works OK as long as one of the above connection factories are used.
Below are examples of the spring beans:
Publisher:
Code:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:file="http://www.springframework.org/schema/integration/file"
xmlns:jms="http://www.springframework.org/schema/integration/jms"
xmlns:stream="http://www.springframework.org/schema/integration/stream"
xsi:schemaLocation="http://www.springframework.org/schema/beans
org/springframework/beans/factory/xml/spring-beans-2.5.xsd
http://www.springframework.org/schema/integration
org/springframework/integration/config/xml/spring-integration-1.0.xsd
http://www.springframework.org/schema/integration/jms
org/springframework/integration/jms/config/spring-integration-jms-1.0.xsd
http://www.springframework.org/schema/integration/file
org/springframework/integration/file/config/spring-integration-file-1.0.xsd
http://www.springframework.org/schema/integration/stream
org/springframework/integration/stream/config/spring-integration-stream-1.0.xsd">
<bean id="localConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiName" value="java:/ConnectionFactory"/>
</bean>
<bean id="localTestDestination" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiName" value="/topic/testTopic2"/>
</bean>
<int:channel id="testChannel"/>
<jms:outbound-channel-adapter
id="testOutboundAdapter"
connection-factory="localConnectionFactory"
destination="localTestDestination"
channel="testChannel"/>
</beans>
Subscriber:
Code:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:file="http://www.springframework.org/schema/integration/file"
xmlns:jms="http://www.springframework.org/schema/integration/jms"
xmlns:stream="http://www.springframework.org/schema/integration/stream"
xsi:schemaLocation="http://www.springframework.org/schema/beans
org/springframework/beans/factory/xml/spring-beans-2.5.xsd
http://www.springframework.org/schema/integration
org/springframework/integration/config/xml/spring-integration-1.0.xsd
http://www.springframework.org/schema/integration/jms
org/springframework/integration/jms/config/spring-integration-jms-1.0.xsd
http://www.springframework.org/schema/integration/file
org/springframework/integration/file/config/spring-integration-file-1.0.xsd
http://www.springframework.org/schema/integration/stream
org/springframework/integration/stream/config/spring-integration-stream-1.0.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<bean id="localConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiName" value="java:/ConnectionFactory"/>
</bean>
<!-- <bean id="persistentConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">-->
<!-- <property name="targetConnectionFactory" ref="localConnectionFactory"/>-->
<!-- </bean>-->
<bean id="persistentConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="localConnectionFactory"/>
</bean>
<bean id="localTestDestination" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiName" value="/topic/testTopic2"/>
</bean>
<int:publish-subscribe-channel id="testChannel"/>
<!-- <jms:inbound-channel-adapter-->
<!-- id="testInboundAdapter"-->
<!-- connection-factory="persistentConnectionFactory" -->
<!-- destination="localTestDestination" -->
<!-- channel="testChannel">-->
<!-- <int:poller>-->
<!-- <int:interval-trigger interval="500"/>-->
<!-- </int:poller>-->
<!-- </jms:inbound-channel-adapter>-->
<jms:message-driven-channel-adapter
id="testInboundAdapter"
connection-factory="persistentConnectionFactory"
destination="localTestDestination"
channel="testChannel"/>
<bean id="testConsumer" class="org.springframework.integration.endpoint.EventDrivenConsumer">
<constructor-arg ref="testChannel"/>
<constructor-arg ref="testMessageHandler"/>
</bean>
<bean id="testMessageHandler" class="org.cubrc.examples.jms.TestMessageHandler"/>
</beans>
Thanks for the many good suggestions,
Bob