
Thread: How to indicate that a JMSTemplate.receive() should use a DurableSubscriber

  1. #1
    Join Date
    May 2012
    Location
    NY
    Posts
    23

    I want to do this without using the XML configuration. I am toying with Spring JMS to see whether it meets my needs and, if it does, to adopt it for our project. Is there any way of doing a synchronous receive using

    Code:
    JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
    jmsTemplate.receive();
    so that it is equivalent to the pure JMS call:

    Code:
    MessageConsumer consumer = session.createDurableSubscriber(topic, "durable name"); 
    Message message = consumer.receive();
    without needing to configure it through XML?

  2. #2
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,040

    Again, it's better to use a DefaultMessageListenerContainer for this use case, where you can simply configure a durable subscription by calling setSubscriptionDurable and setDurableSubscriptionName.

    With the JmsTemplate, you'd have to use one of the execute() methods that take a SessionCallback, create your own durable consumer, and receive from it.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware
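
For illustration, the SessionCallback route mentioned above might look roughly like the sketch below. It assumes Spring 3.x (generic SessionCallback) and a connection that already carries a client ID, which durable subscriptions require (for example via SingleConnectionFactory.setClientId). The class name DurableReceiveSketch, the method receiveDurable, and its parameters are made up for this example. Passing true as the second argument to execute() asks the template to start the connection so that receive() can actually deliver messages.

```java
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.SessionCallback;
import org.springframework.jms.support.JmsUtils;

// Hypothetical helper: a synchronous receive from a durable subscription via JmsTemplate.
public class DurableReceiveSketch {

    public static Message receiveDurable(JmsTemplate jmsTemplate,
                                         final String topicName,
                                         final String subscriptionName,
                                         final long timeoutMillis) {
        return jmsTemplate.execute(new SessionCallback<Message>() {
            public Message doInJms(Session session) throws JMSException {
                // Resolve the topic and attach to (or create) the durable subscription.
                Topic topic = session.createTopic(topicName);
                TopicSubscriber subscriber = session.createDurableSubscriber(topic, subscriptionName);
                try {
                    // Blocks for up to timeoutMillis; returns null if nothing arrives.
                    return subscriber.receive(timeoutMillis);
                } finally {
                    // Closing the consumer does not remove the durable subscription itself.
                    JmsUtils.closeMessageConsumer(subscriber);
                }
            }
        }, true); // true = start the connection before invoking the callback
    }
}
```

A call such as DurableReceiveSketch.receiveDurable(jmsTemplate, "topicA", "durable name", 5000) would wait up to five seconds for a message. Unlike the listener container, this leaves error handling and reconnection to the caller.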

  3. #3
    Join Date
    May 2012
    Location
    NY
    Posts
    23

    Thanks for the reply Gary,

    The problem is the kind of use case I have. I want a single connection to act as the MessageConsumer for Topic A and the same connection to act as the MessageProducer for Topic B. If I use DefaultMessageListenerContainer, the code looks like this:

    Code:
    dmlc = (DefaultMessageListenerContainer) factory.getBean("messageListener");
    dmlc.start();
    When start() is called, createConnection() is called on the SingleConnectionFactory for this thread.

    However, I want access to the same connection in the onMessage() method because I want to perform a publish() within the same session (so that the session's commit() affects both Topic A and Topic B simultaneously). This is not possible because onMessage() runs on a different thread, which results in SingleConnectionFactory creating a new connection for me.

    If I instead use a synchronous receive(), I lose the benefits of the message listener container and will have to do the error handling and recovery by hand.

  4. #4
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,040

    But if you do the send() on the container's thread - the one that calls onMessage() (and the session is transacted), any JmsTemplate sends will all be done on the same session.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware
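
The effect described above, one transacted session covering both the receive from Topic A and the send to Topic B, can be pictured with a toy model. The sketch below is plain Java, not Spring or JMS code: TransactedSessionModel and all of its methods are invented for illustration, and it only mimics the commit/rollback semantics of a transacted session (rollback makes received messages available again and discards pending sends).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Toy model of one transacted session spanning a consume and a produce.
public class TransactedSessionModel {
    private final Deque<String> topicA;  // messages waiting to be consumed
    private final List<String> topicB;   // messages visible to subscribers of B
    private final List<String> received = new ArrayList<String>();     // consumed, not yet committed
    private final List<String> pendingSends = new ArrayList<String>(); // sent, not yet committed

    public TransactedSessionModel(Deque<String> topicA, List<String> topicB) {
        this.topicA = topicA;
        this.topicB = topicB;
    }

    public String receive() {
        String message = topicA.pollFirst();
        if (message != null) {
            received.add(message);
        }
        return message;
    }

    public void send(String message) {
        pendingSends.add(message); // invisible on topic B until commit()
    }

    public void commit() {
        topicB.addAll(pendingSends);
        pendingSends.clear();
        received.clear();
    }

    public void rollback() {
        // Put received messages back in their original order and drop pending sends.
        for (int i = received.size() - 1; i >= 0; i--) {
            topicA.addFirst(received.get(i));
        }
        received.clear();
        pendingSends.clear();
    }

    public static void main(String[] args) {
        Deque<String> a = new ArrayDeque<String>(Arrays.asList("m1"));
        List<String> b = new ArrayList<String>();
        TransactedSessionModel session = new TransactedSessionModel(a, b);

        session.send("processed:" + session.receive());
        session.rollback();
        System.out.println("after rollback: A=" + a + " B=" + b); // A=[m1] B=[]

        session.send("processed:" + session.receive());
        session.commit();
        System.out.println("after commit: A=" + a + " B=" + b);   // A=[] B=[processed:m1]
    }
}
```

In the model, neither topic changes on its own: the consume and the publish succeed or fail as one unit, which is what publishing on the listener's own session is meant to achieve.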

  5. #5
    Join Date
    May 2012
    Location
    NY
    Posts
    23

    Yes, that makes sense. I am just wondering whether every onMessage() runs on a new thread. If that's the case, I will have a new JMS connection on every call, because the SingleConnectionFactory gives a single connection per thread. Let me implement this and see if it works. Thanks a lot.

  6. #6
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,040

    No; the SingleConnectionFactory caches a single connection, period.

    Each thread (controlled by concurrentConsumers) runs in a separate session on that single connection.

    If you use the subclass (CachingConnectionFactory), the sessions, consumers, and producers are also cached, avoiding the overhead of recreating them. By default, only one session is cached.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware
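
The distinction drawn above (one cached connection for the whole factory, one session per consumer thread) can be mimicked in a few lines of plain Java. This is a toy model only: SharedConnectionModel, its nested ModelConnection and ModelFactory classes, and their methods are hypothetical stand-ins, not the Spring or JMS types.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy model: the factory hands every thread the same connection,
// while each caller opens its own session on that connection.
public class SharedConnectionModel {

    public static final class ModelConnection {
        private final AtomicInteger sessionIds = new AtomicInteger();

        // Each call yields a new session id on this one connection.
        public int createSession() {
            return sessionIds.incrementAndGet();
        }
    }

    public static final class ModelFactory {
        private final AtomicInteger created = new AtomicInteger();
        private ModelConnection shared;

        // Creates the connection once, then returns the cached instance to every caller.
        public synchronized ModelConnection createConnection() {
            if (shared == null) {
                shared = new ModelConnection();
                created.incrementAndGet();
            }
            return shared;
        }

        public int connectionsCreated() {
            return created.get();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final ModelFactory factory = new ModelFactory();
        final ModelConnection[] seen = new ModelConnection[2];
        final int[] sessions = new int[2];
        Thread[] workers = new Thread[2];

        for (int i = 0; i < workers.length; i++) {
            final int slot = i;
            workers[i] = new Thread(new Runnable() {
                public void run() {
                    seen[slot] = factory.createConnection();
                    sessions[slot] = seen[slot].createSession();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }

        System.out.println("same connection: " + (seen[0] == seen[1]));           // true
        System.out.println("connections created: " + factory.connectionsCreated()); // 1
        System.out.println("distinct sessions: " + (sessions[0] != sessions[1]));  // true
    }
}
```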

  7. #7
    Join Date
    May 2012
    Location
    NY
    Posts
    23

    I see. I tested this and I get a javax.jms.InvalidClientIDException ("clientId already exists") for every send within an onMessage(), except for the first onMessage().

    I can see that all the onMessage() calls take place in the same thread. However, it seems that jmsTemplate.send() is trying to call createConnection(). I tried using both SingleConnectionFactory and CachingConnectionFactory.

    Portion of stack trace for the javax.jms.InvalidClientIDException exception:

    Code:
    	at org.springframework.jms.connection.SingleConnectionFactory.doCreateConnection(SingleConnectionFactory.java:342)
    	at org.springframework.jms.connection.SingleConnectionFactory.initConnection(SingleConnectionFactory.java:288)
    	at org.springframework.jms.connection.SingleConnectionFactory.createConnection(SingleConnectionFactory.java:225)
    	at org.springframework.jms.support.JmsAccessor.createConnection(JmsAccessor.java:184)
    	at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:456)
    	at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:534)


    I also wrote a CustomConnectionFactory that extends SingleConnectionFactory:

    Code:
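    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.ExceptionListener;
    import javax.jms.JMSException;

    import org.springframework.jms.connection.SingleConnectionFactory;

    // Diagnostic subclass: logs every createConnection() call and reuses one cached connection.
    // Note: it returns the raw target connection, not the close-suppressing proxy
    // that SingleConnectionFactory normally hands out.
    public class CustomConnectionFactory extends SingleConnectionFactory {

    	private final ConnectionFactory factory;
    	private Connection connection = null;
    	private int count = 0;

    	public CustomConnectionFactory(ConnectionFactory factory) {
    		super(factory);
    		this.factory = factory;
    	}

    	@Override
    	public Connection createConnection() throws JMSException {
    		count++;
    		System.out.println("Create connection was called " + count + " times in the thread " + Thread.currentThread().getId());
    		if (null == connection) {
    			System.out.println("Connection was NULL, creating new connection!");
    			connection = factory.createConnection();
    			System.out.println("Connection created with clientId " + connection.getClientID());
    		}
    		// Log connection failures reported by the provider.
    		connection.setExceptionListener(new ExceptionListener() {
    			public void onException(JMSException ex) {
    				System.out.println("Connection got closed! " + ex.getMessage());
    			}
    		});
    		return connection;
    	}

    }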
    CustomConnectionFactory works for me because the connection gets closed at the end of every send() call and recreated at the next send() call. But in any case, I wouldn't want a new connection to be opened every time send() is called.
    This is what my config looks like:

    Code:
    	<bean id="pubCachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    		<property name="targetConnectionFactory" ref="pubConnectionFactory" />
    		<property name="reconnectOnException" value="true"/>
    		<property name="sessionCacheSize" value="1" />
    	</bean>
    Any idea why the CachingConnectionFactory would not simply reuse the previously created connection?

  8. #8
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,040

    Did you set sessionTransacted to true on the DMLC?
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  9. #9
    Join Date
    May 2012
    Location
    NY
    Posts
    23

    Originally posted by Gary Russell:
    But if you do the send() on the container's thread - the one that calls onMessage() (and the session is transacted), any JmsTemplate sends will all be done on the same session.
    Can I also control the commit() and rollback() of the receive() from within onMessage()? Sometimes I might need to roll back or commit a transaction manually.

  10. #10
    Join Date
    May 2012
    Location
    NY
    Posts
    23

    Originally posted by Gary Russell:
    Did you set sessionTransacted to true on the DMLC?
    Yes, here's my config:

    Code:
    	<bean id="stagingMessageListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    		<property name="connectionFactory" ref="subCachingConnectionFactory"/>
    		<property name="destination" ref="stagingTopic"/>
    		<property name="messageListener" ref="messageListener"/>
    		<property name="sessionAcknowledgeModeName" value="AUTO_ACKNOWLEDGE"/>
    		<property name="subscriptionDurable" value="true"/>
    		<!-- TODO: name the durable -->
    		<property name="durableSubscriptionName" value="durable name"/>
    		<!-- Each message listener invocation will operate within an active JMS transaction,
    			with message reception rolled back in case of listener execution failure -->
    		<property name="sessionTransacted" value="true"/>
    		<!-- cache a shared JMS Connection, a JMS Session, and a JMS MessageConsumer (CACHE_CONSUMER = 3) -->
    		<property name="cacheLevel" value="3"/>
    		<!-- use only a single MessageConsumer -->
    		<property name="maxConcurrentConsumers" value="1"/>
    		<property name="autoStartup" value="false"/>
    	</bean>
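
Since the thread started with a wish to avoid XML, the same container could presumably be assembled in plain Java along the lines below. This is a sketch under assumptions: the class StagingContainerConfig and its build() method are hypothetical, the three arguments stand in for the subCachingConnectionFactory, stagingTopic, and messageListener beans, and the AUTO_ACKNOWLEDGE setting is omitted because it is the default.

```java
import javax.jms.ConnectionFactory;
import javax.jms.MessageListener;
import javax.jms.Topic;

import org.springframework.jms.listener.DefaultMessageListenerContainer;

// Hypothetical helper mirroring the stagingMessageListener bean definition.
public class StagingContainerConfig {

    public static DefaultMessageListenerContainer build(ConnectionFactory connectionFactory,
                                                        Topic stagingTopic,
                                                        MessageListener listener) {
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setDestination(stagingTopic);
        container.setMessageListener(listener);

        // Durable topic subscription, as in the XML.
        container.setPubSubDomain(true);
        container.setSubscriptionDurable(true);
        container.setDurableSubscriptionName("durable name");

        // Local JMS transaction around each listener invocation.
        container.setSessionTransacted(true);

        // Cache connection, session, and consumer; use a single consumer.
        container.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
        container.setMaxConcurrentConsumers(1);
        container.setAutoStartup(false);

        // Outside a Spring context, initialization has to be triggered by hand.
        container.afterPropertiesSet();
        return container;
    }
}
```

With autoStartup disabled, nothing is consumed until the caller invokes start() on the returned container.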
