
Thread: confused about message gateway with JMS

  1. #1

    confused about message gateway with JMS

    Hi,

    I am trying to port one of the Spring Batch samples to JMS. The sample is available here [1].

    Basically, the ChunkMessageChannelItemWriter sends a request object to the request channel and waits for the responses to come back on the response channel.

    The message endpoint is the ChunkProcessorChunkHandler.

    I would like to upgrade this to a JMS-based example so that:

    * The request and response objects are handled by JMS queues
    * The message endpoint is a MessageListener for which the number of concurrent threads can be configured

    (Long story short: upgrade this sample to a multi-thread / multi-process sample.)

    I have tried the following, but everything still runs in a single thread (I am probably missing an indirection):

    Code:
    <integration:annotation-config/>
        <integration:channel id="requests"/>
        <integration:channel id="replies">
            <integration:queue/>
        </integration:channel>
    
        <integration:poller max-messages-per-poll="1" id="defaultPoller"
                            default="true">
            <integration:interval-trigger interval="3000"/>
        </integration:poller>
    
        <bean id="messagingGateway" class="org.springframework.integration.gateway.SimpleMessagingGateway">
            <property name="requestChannel" ref="requests"/>
            <property name="replyChannel" ref="replies"/>
            <property name="replyTimeout" value="1000"/>
        </bean>
    
        <jms:message-driven-channel-adapter id="jmsIn" destination="batchChunkInQueue" channel="requests"/>
        <jms:outbound-channel-adapter id="jmsOut" destination="batchChunkOutQueue" channel="replies"/>
        <integration:service-activator input-channel="requests" output-channel="replies" ref="chunkHandler"/>
    Any advice or a reference to the documentation would be appreciated.

    Thanks!

    [1] https://src.springframework.org/svn/...ts-context.xml

  2. #2
    Join Date: Oct 2005; Location: Boston, MA; Posts: 2,840

    The default number of concurrent consumers for the underlying message listener container is 1. You can modify that setting through the 'concurrent-consumers' and/or 'max-concurrent-consumers' attributes of the "message-driven-channel-adapter" element.

    Please post back to let us know if that solves your problem.
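
    As a sketch of that suggestion, the adapter from the first post could be given a consumer range as shown below. The values 5 and 10 are arbitrary placeholders, and the `jms` prefix is assumed to be bound to the Spring Integration JMS namespace, as in the original snippet.

    ```xml
    <!-- Start with 5 listener threads and let the container grow to 10 under load -->
    <jms:message-driven-channel-adapter id="jmsIn"
                                        destination="batchChunkInQueue"
                                        channel="requests"
                                        concurrent-consumers="5"
                                        max-concurrent-consumers="10"/>
    ```

    Since "requests" is a direct channel, whatever is subscribed to it runs on the listener thread that received the JMS message, so these attributes only help for messages that actually arrive through the JMS queue.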

  3. #3

    Nope, it did not. I think I am missing some basic concepts here.

    Here's what I want:

    * Request is a JMS queue
    * Response is a JMS queue
    * ChunkHandler receives an object from the request queue and sends back a response to the response queue (Service Activator)
    * Regarding the messaging gateway, a call to send should send a JMS message on the request queue and a call to receive should act as a standard MessageListener on the response queue
    * The number of concurrent ChunkHandler instances reading messages can be configured (on-the-fly configuration as well)


    Code:
    <integration:channel id="requests"/>
        <integration:channel id="replies">
            <integration:queue/>
        </integration:channel> 
    
        <integration:poller max-messages-per-poll="1" id="defaultPoller"
                            default="true">
            <integration:interval-trigger interval="3000"/>
        </integration:poller>
    
        <bean id="messagingGateway" class="org.springframework.integration.gateway.SimpleMessagingGateway">
            <property name="requestChannel" ref="requests"/>
            <property name="replyChannel" ref="replies"/>
            <property name="replyTimeout" value="1000"/>
        </bean>
    
        <jms:message-driven-channel-adapter id="jmsIn" destination="batchChunkInQueue"
                                            channel="requests" concurrent-consumers="5"/>
        <jms:outbound-channel-adapter id="jmsOut" destination="batchChunkOutQueue" channel="replies"/>
        <integration:service-activator input-channel="requests" output-channel="replies" ref="chunkHandler"/>
    Thanks
    Last edited by snicoll; Dec 21st, 2009 at 09:24 AM.

  4. #4

    OK, things are improving a bit, but I am still puzzled that I can't find a working example of a JMS request/reply mechanism with SI.

    Here's the updated config (probably ugly and overcomplicated):

    Code:
     <integration:annotation-config/>   
    
        <integration:channel id="chunkOutput"/>
        <integration:channel id="chunkStatus"/>
        
         <bean id="messagingGateway" class="org.springframework.integration.gateway.SimpleMessagingGateway">
            <property name="requestChannel" ref="chunkOutput"/>
            <property name="replyChannel" ref="chunkStatus"/>
            <property name="replyTimeout" value="1000"/>
        </bean>
    
        <jms:outbound-gateway id="chunkSender"
                              request-channel="chunkOutput"
                              request-destination="batchChunkInQueue"/>
        <jms:inbound-gateway id="chunkResponseReceiver" request-channel="chunkStatus"
                             request-destination="batchChunkOutQueue"/>
    
    
        <!-- Messages are received from the batchChunkInQueue and sent to the chunkRequests channel. These
             are processed by the chunkHandler service activator -->
        <integration:channel id="chunkRequests"/>
        <integration:channel id="chunkResponses"/>
    
        <jms:inbound-gateway id="chunkReceiver" request-channel="chunkRequests"
                             request-destination="batchChunkInQueue"/>
        <jms:outbound-gateway id="chunkResponseSender"
                              request-channel="chunkResponses"
                              request-destination="batchChunkOutQueue"/>
    
        <integration:service-activator input-channel="chunkRequests" output-channel="chunkResponses" ref="chunkHandler"/>
    Everything works well until the receiver is supposed to dispatch the response:

    Code:
    2009-12-22 11:20:29 [org.springframework.jms.listener.DefaultMessageListenerContainer#0-1] DirectChannel [DEBUG] preSend on channel 'chunkStatus', message: [Payload=ChunkResponse: jobId=1, stepContribution=[StepContribution: read=0, written=0, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], successful=false][Headers={springintegration_jms_messageId=ID:llnp186-2335-1261477138812-2:3:1:1:1, springintegration_replyChannel=org.springframework.integration.channel.MessageChannelTemplate$TemporaryReplyChannel@30cd64, springintegration_timestamp=1261477229031, springintegration_id=d77a2055-13e4-4327-a38c-7f4e4995c380, springintegration_jms_redelivered=false, springintegration_jms_replyTo=temp-queue://ID:llnp186-2335-1261477138812-2:3:1, springintegration_errorChannel=org.springframework.integration.channel.MessageChannelTemplate$TemporaryReplyChannel@30cd64}]
    2009-12-22 11:20:29 [org.springframework.jms.listener.DefaultMessageListenerContainer#0-1] DefaultMessageListenerContainer [WARN] Execution of JMS message listener failed
    org.springframework.integration.message.MessageDeliveryException: Dispatcher has no subscribers.
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:97)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:90)
    	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:43)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:116)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:94)
    	at org.springframework.integration.channel.MessageChannelTemplate.doSend(MessageChannelTemplate.java:223)
    	at org.springframework.integration.channel.MessageChannelTemplate.doSendAndReceive(MessageChannelTemplate.java:248)
    	at org.springframework.integration.channel.MessageChannelTemplate.sendAndReceive(MessageChannelTemplate.java:215)
    	at org.springframework.integration.channel.MessageChannelTemplate.sendAndReceive(MessageChannelTemplate.java:203)
    	at org.springframework.integration.jms.ChannelPublishingJmsMessageListener.onMessage(ChannelPublishingJmsMessageListener.java:209)
    	at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:518)
    	at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:479)
    	at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:451)
    	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:323)
    	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:261)
    	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:982)
    	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:974)
    	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:876)
    	at java.lang.Thread.run(Thread.java:619)
    Apparently, my message gateway is not subscribed to the chunkStatus channel?

  5. #5

    Replying to myself:

    Code:
        <integration:annotation-config/>
    
    
        <integration:channel id="chunkOutput"/>
        <integration:channel id="chunkStatus">
            <integration:queue/>
        </integration:channel>
    
        <bean id="messagingGateway" class="org.springframework.integration.gateway.SimpleMessagingGateway">
            <property name="requestChannel" ref="chunkOutput"/>
            <property name="replyChannel" ref="chunkStatus"/>
            <property name="replyTimeout" value="1000"/>
        </bean>
    
    
        <jms:outbound-channel-adapter id="chunkSender" destination="batchChunkInQueue" channel="chunkOutput"/>
        <jms:message-driven-channel-adapter id="chunkResponseReceiver" destination="batchChunkOutQueue"
                                            channel="chunkStatus"/>
    
    
        <!-- Messages are received from the batchChunkInQueue and sent to the chunkRequests channel. These
            are processed by the chunkHandler service activator -->
        <integration:channel id="chunkRequests"/>
        <integration:channel id="chunkResponses"/>
    
        <jms:message-driven-channel-adapter id="chunkReceiver" destination="batchChunkInQueue" channel="chunkRequests"/>
        <jms:outbound-channel-adapter id="chunkResponseSender" destination="batchChunkOutQueue" channel="chunkResponses"/>
    
        <integration:service-activator input-channel="chunkRequests" output-channel="chunkResponses" ref="chunkHandler"/>
    Any comment on this config is welcome, of course.

  6. #6
    Join Date: May 2009; Location: Hamburg, Germany; Posts: 9

    Thanks snicoll, this is exactly the kind of JMS sample configuration for a distributed scenario that I was looking for. For other folks looking at the solution: the queue element in the chunkStatus channel is essential to get rid of the "Dispatcher has no subscribers" error message.
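
    The difference between the two channel definitions can be sketched as follows. This assumes the messaging gateway fetches replies by polling its reply channel rather than subscribing to it, which is consistent with the stack trace in post #4. The two definitions are alternatives; only one of them should be present in a context.

    ```xml
    <!-- Option 1 (fails here): a direct channel hands each message straight to a
         subscriber. Nothing subscribes to chunkStatus, so send() throws
         "Dispatcher has no subscribers". -->
    <integration:channel id="chunkStatus"/>

    <!-- Option 2 (works): a queue-backed channel buffers each message until a
         consumer, here the messaging gateway, calls receive(). -->
    <integration:channel id="chunkStatus">
        <integration:queue/>
    </integration:channel>
    ```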

  7. #7
    Join Date: Dec 2010; Posts: 6

    Give me complete config XML

    Hi,

    Can anybody send me the complete XML with namespaces for this configuration? I am trying to put it into some of my batch code, but I am getting some namespace errors.

    Thanks in advance
    Vipin Goel

  8. #8
    Join Date: May 2012; Posts: 10

    Please help with jms request-reply configuration

    Hello,
    First of all, thank you, snicoll, for posting your JMS request-reply configuration. It took me ages to get past the "dispatcher has no subscribers" issue until I stumbled upon your solution. However, in my case the client still fails to receive the reply: it blocks waiting for the reply and hangs indefinitely, despite the default-reply-timeout setting.

    I also tried the same example without specifying a named reply-channel, so that the reply would be delivered via a temporary reply queue, but this never worked for me either. I got "no output-channel or replyChannel header available" on the service side.

    I wonder if anyone could help me debug my configuration. Below is the project code:
    Client configuration
    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:context="http://www.springframework.org/schema/context"
        xmlns:util="http://www.springframework.org/schema/util"
        xmlns:jms="http://www.springframework.org/schema/jms"
        xmlns:si="http://www.springframework.org/schema/integration"
        xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
                http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
                http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.1.xsd
                http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.1.xsd
                http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd" >
    
    <import resource="hornetq-connection-factory.xml"/>
     
        <bean id="htmlProcessorMessageTemplate"  class="org.springframework.jms.core.JmsTemplate">
                    <property name="connectionFactory" ref="connectionFactory"/>
                    <property name="defaultDestinationName" value="htmlProcessorRequest" />
                    <property name="explicitQosEnabled" value="true"/> 
                    <property name="deliveryMode">
                        <util:constant static-field="javax.jms.DeliveryMode.NON_PERSISTENT" /></property>                                      
                    <property name="sessionAcknowledgeMode">
                        <util:constant static-field="javax.jms.Session.AUTO_ACKNOWLEDGE" />
                    </property>
         </bean>
    
        <si:gateway id="htmlProcessorGateway" service-interface="com.si.investigation.HtmlProcessorGateway"
              default-request-channel="htmlProcessorRequestChannel" default-reply-channel="htmlProcessorReplyChannel" default-reply-timeout="10" />
    
        <si:channel id="htmlProcessorRequestChannel" />
    
        <int-jms:outbound-channel-adapter channel="htmlProcessorRequestChannel" jms-template="htmlProcessorMessageTemplate"  />
    
        <si:channel id="htmlProcessorReplyChannel" > <si:queue capacity = "100"/></si:channel>
    
    </beans>
    Service configuration
    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:context="http://www.springframework.org/schema/context"
        xmlns:util="http://www.springframework.org/schema/util"
        xmlns:jms="http://www.springframework.org/schema/jms"
        xmlns:si="http://www.springframework.org/schema/integration"
        xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
                http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
                http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.1.xsd
                http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.1.xsd
                http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.1.xsd" >
    
        <import resource="hornetq-connection-factory.xml"/>
       
        <bean id="htmlProcessorServiceActivator" class="com.si.vestigation.HtmlProcessorServiceActivator" />
        <si:service-activator input-channel="htmlProcessorRequestChannel" ref="htmlProcessorServiceActivator"
          method="reply" output-channel="htmlProcessorReplyChannel" />
         
         <!-- Listen to incoming messages on the JMS request queue -->
        <si:channel id="htmlProcessorRequestChannel" />
        
        <int-jms:message-driven-channel-adapter id="htmlProcessorRequestChannelAdaptor"
          channel="htmlProcessorRequestChannel" destination-name="htmlProcessorRequest"
          connection-factory="connectionFactory" /> 
    
         <si:channel id="htmlProcessorReplyChannel" />  
         <int-jms:outbound-channel-adapter channel="htmlProcessorReplyChannel" destination-name="htmlProcessorReply"  connection-factory="connectionFactory" />
         
    </beans>
    Code:
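    import org.springframework.integration.annotation.Gateway;

    public interface HtmlProcessorGateway {

        // Sends msgout on the gateway's request channel and blocks for the reply payload
        @Gateway
        String sendAndReceiveString(String msgout);

    }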
    Many thanks

  9. #9
    Join Date: Mar 2010; Location: Gtr Philadelphia, PA; Posts: 2,034

    If you want to do

    gateway->jms-outbound->jms-inbound->processandsendreply

    it would be better to use

    gateway->jms-outbound-gateway->jms-inbound-gateway->process

    because the framework will take care of all the message correlation and the result of the process will get back to the originating gateway.

    Hope that helps.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware
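
    A rough sketch of the gateway-based layout described above, reusing the bean, channel and queue names from post #8. It assumes the client and the service live in separate application contexts that share the "connectionFactory" bean definition. The timeout values and the consumer count are illustrative placeholders, not recommendations.

    ```xml
    <!-- Client context: the messaging gateway sends to the request channel; the JMS
         outbound gateway publishes the request and waits for the correlated JMS reply,
         which is then returned to the caller of the service interface -->
    <si:gateway id="htmlProcessorGateway"
                service-interface="com.si.investigation.HtmlProcessorGateway"
                default-request-channel="htmlProcessorRequestChannel"
                default-reply-timeout="10000"/>

    <si:channel id="htmlProcessorRequestChannel"/>

    <int-jms:outbound-gateway request-channel="htmlProcessorRequestChannel"
                              request-destination-name="htmlProcessorRequest"
                              connection-factory="connectionFactory"
                              receive-timeout="10000"/>
    ```

    ```xml
    <!-- Service context: the JMS inbound gateway listens on the request queue and sends
         the service activator's return value back to the JMSReplyTo destination -->
    <si:channel id="htmlProcessorRequestChannel"/>

    <int-jms:inbound-gateway request-channel="htmlProcessorRequestChannel"
                             request-destination-name="htmlProcessorRequest"
                             connection-factory="connectionFactory"
                             concurrent-consumers="5"/>

    <!-- No output-channel: the reply is routed via the replyChannel header set by the inbound gateway -->
    <si:service-activator input-channel="htmlProcessorRequestChannel"
                          ref="htmlProcessorServiceActivator"
                          method="reply"/>
    ```

    With this layout neither side needs a named reply channel or a separate reply queue, which removes the hand-written correlation that the channel-adapter version depends on.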

  10. #10
    Join Date: May 2012; Posts: 10

    Many thanks, it really helps. I have reconfigured as you suggested. There is one problem that I cannot solve with gateways: how can I force messages to be non-persistent? I figured out how to do this with channel adapters, but not with gateways. Once again, thank you for your reply.
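
    One possible approach, untested here, is to set the quality-of-service attributes directly on the two JMS gateways. The attribute names below are assumed to be available in the Spring Integration JMS schema version in use and should be checked against its XSD. The channel and queue names are carried over from post #8.

    ```xml
    <!-- Client context: send requests as NON_PERSISTENT.
         delivery-persistent only takes effect when explicit-qos-enabled is true. -->
    <int-jms:outbound-gateway request-channel="htmlProcessorRequestChannel"
                              request-destination-name="htmlProcessorRequest"
                              connection-factory="connectionFactory"
                              explicit-qos-enabled="true"
                              delivery-persistent="false"/>
    ```

    ```xml
    <!-- Service context: send replies as NON_PERSISTENT -->
    <int-jms:inbound-gateway request-channel="htmlProcessorRequestChannel"
                             request-destination-name="htmlProcessorRequest"
                             connection-factory="connectionFactory"
                             explicit-qos-enabled-for-replies="true"
                             reply-delivery-persistent="false"/>
    ```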
