Confused about message gateway with JMS
snicoll
Dec 21st, 2009, 07:15 AM
Hi,
I am trying to port one of the Spring Batch samples to JMS. The sample is available here [1].
Basically, the ChunkMessageChannelItemWriter sends a request object to the request channel and waits for the responses to come back on the response channel.
The message endpoint is the ChunkProcessorChunkHandler.
I would like to upgrade this to a JMS-based example so that:
* The request and response objects are handled by JMS queues
* The message endpoint is a MessageListener whose number of concurrent threads can be configured
(Long story short: upgrade this sample to a multi-threaded / multi-process sample.)
I have tried the following, but everything still executes in a single thread (I am probably missing an indirection):
<integration:annotation-config/>
<integration:channel id="requests"/>
<integration:channel id="replies">
<integration:queue/>
</integration:channel>
<integration:poller max-messages-per-poll="1" id="defaultPoller"
default="true">
<integration:interval-trigger interval="3000"/>
</integration:poller>
<bean id="messagingGateway" class="org.springframework.integration.gateway.SimpleMessagingGateway">
<property name="requestChannel" ref="requests"/>
<property name="replyChannel" ref="replies"/>
<property name="replyTimeout" value="1000"/>
</bean>
<jms:message-driven-channel-adapter id="jmsIn" destination="batchChunkInQueue" channel="requests"/>
<jms:outbound-channel-adapter id="jmsOut" destination="batchChunkOutQueue" channel="replies"/>
<integration:service-activator input-channel="requests" output-channel="replies" ref="chunkHandler"/>
Any advice or a reference to the documentation would be appreciated.
Thanks!
[1] https://src.springframework.org/svn/spring-batch/trunk/spring-batch-integration/src/test/resources/org/springframework/batch/integration/chunk/ChunkStepIntegrationTests-context.xml
Mark Fisher
Dec 21st, 2009, 08:39 AM
The default number of concurrent consumers for the underlying message listener container is 1. You can modify that setting through the 'concurrent-consumers' and/or 'max-concurrent-consumers' attributes of the "message-driven-channel-adapter" element.
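As a minimal sketch of that suggestion, reusing the adapter from the configuration posted above: the values 5 and 10 are arbitrary placeholders chosen for illustration, not recommendations.

```xml
<!-- Sketch only: the consumer counts are illustrative values -->
<jms:message-driven-channel-adapter id="jmsIn"
    destination="batchChunkInQueue"
    channel="requests"
    concurrent-consumers="5"
    max-concurrent-consumers="10"/>
```

Since "requests" is a direct channel, whatever subscribes to it should then run on the listener container's consumer threads.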
Please post back to let us know if that solves your problem.
snicoll
Dec 21st, 2009, 09:11 AM
Nope, it did not. I think I am missing some basic concepts here.
Here's what I want:
* Request is a JMS queue
* Response is a JMS queue
* ChunkHandler receives an object from the request queue and sends back a response to the response queue (Service Activator)
* Regarding the messaging gateway, a call to send should send a JMS message on the request queue and a call to receive should act as a standard MessageListener on the response queue
* The number of concurrent ChunkHandler instances reading messages can be configured (on-the-fly configuration as well)
<integration:channel id="requests"/>
<integration:channel id="replies">
<integration:queue/>
</integration:channel>
<integration:poller max-messages-per-poll="1" id="defaultPoller"
default="true">
<integration:interval-trigger interval="3000"/>
</integration:poller>
<bean id="messagingGateway" class="org.springframework.integration.gateway.SimpleMessagingGateway">
<property name="requestChannel" ref="requests"/>
<property name="replyChannel" ref="replies"/>
<property name="replyTimeout" value="1000"/>
</bean>
<jms:message-driven-channel-adapter id="jmsIn" destination="batchChunkInQueue"
channel="requests" concurrent-consumers="5"/>
<jms:outbound-channel-adapter id="jmsOut" destination="batchChunkOutQueue" channel="replies"/>
<integration:service-activator input-channel="requests" output-channel="replies" ref="chunkHandler"/>
Thanks
snicoll
Dec 22nd, 2009, 05:28 AM
OK, things are improving a bit, but I am still puzzled that I can't find a working example of a JMS request/reply mechanism with SI.
Here's the updated config (probably ugly and overcomplicated):
<integration:annotation-config/>
<integration:channel id="chunkOutput"/>
<integration:channel id="chunkStatus"/>
<bean id="messagingGateway" class="org.springframework.integration.gateway.SimpleMessagingGateway">
<property name="requestChannel" ref="chunkOutput"/>
<property name="replyChannel" ref="chunkStatus"/>
<property name="replyTimeout" value="1000"/>
</bean>
<jms:outbound-gateway id="chunkSender"
request-channel="chunkOutput"
request-destination="batchChunkInQueue"/>
<jms:inbound-gateway id="chunkResponseReceiver" request-channel="chunkStatus"
request-destination="batchChunkOutQueue"/>
<!-- Messages are received from the batchChunkInQueue and sent to the chunkRequests channel. These
are processed by the chunkHandler service activator -->
<integration:channel id="chunkRequests"/>
<integration:channel id="chunkResponses"/>
<jms:inbound-gateway id="chunkReceiver" request-channel="chunkRequests"
request-destination="batchChunkInQueue"/>
<jms:outbound-gateway id="chunkResponseSender"
request-channel="chunkResponses"
request-destination="batchChunkOutQueue"/>
<integration:service-activator input-channel="chunkRequests" output-channel="chunkResponses" ref="chunkHandler"/>
Everything works well until the receiver is supposed to dispatch the response:
2009-12-22 11:20:29 [org.springframework.jms.listener.DefaultMessageListenerContainer#0-1] DirectChannel [DEBUG] preSend on channel 'chunkStatus', message: [Payload=ChunkResponse: jobId=1, stepContribution=[StepContribution: read=0, written=0, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], successful=false][Headers={springintegration_jms_messageId=ID:llnp186-2335-1261477138812-2:3:1:1:1, springintegration_replyChannel=org.springframework.integration.channel.MessageChannelTemplate$TemporaryReplyChannel@30cd64, springintegration_timestamp=1261477229031, springintegration_id=d77a2055-13e4-4327-a38c-7f4e4995c380, springintegration_jms_redelivered=false, springintegration_jms_replyTo=temp-queue://ID:llnp186-2335-1261477138812-2:3:1, springintegration_errorChannel=org.springframework.integration.channel.MessageChannelTemplate$TemporaryReplyChannel@30cd64}]
2009-12-22 11:20:29 [org.springframework.jms.listener.DefaultMessageListenerContainer#0-1] DefaultMessageListenerContainer [WARN] Execution of JMS message listener failed
org.springframework.integration.message.MessageDeliveryException: Dispatcher has no subscribers.
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:97)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:90)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:43)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:116)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:94)
at org.springframework.integration.channel.MessageChannelTemplate.doSend(MessageChannelTemplate.java:223)
at org.springframework.integration.channel.MessageChannelTemplate.doSendAndReceive(MessageChannelTemplate.java:248)
at org.springframework.integration.channel.MessageChannelTemplate.sendAndReceive(MessageChannelTemplate.java:215)
at org.springframework.integration.channel.MessageChannelTemplate.sendAndReceive(MessageChannelTemplate.java:203)
at org.springframework.integration.jms.ChannelPublishingJmsMessageListener.onMessage(ChannelPublishingJmsMessageListener.java:209)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:518)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:479)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:451)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:323)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:261)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:982)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:974)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:876)
at java.lang.Thread.run(Thread.java:619)
Apparently, my message gateway is not subscribed to the chunkStatus channel?
snicoll
Dec 23rd, 2009, 08:45 AM
Replying to myself:
<integration:annotation-config/>
<integration:channel id="chunkOutput"/>
<integration:channel id="chunkStatus">
<integration:queue/>
</integration:channel>
<bean id="messagingGateway" class="org.springframework.integration.gateway.SimpleMessagingGateway">
<property name="requestChannel" ref="chunkOutput"/>
<property name="replyChannel" ref="chunkStatus"/>
<property name="replyTimeout" value="1000"/>
</bean>
<jms:outbound-channel-adapter id="chunkSender" destination="batchChunkInQueue" channel="chunkOutput"/>
<jms:message-driven-channel-adapter id="chunkResponseReceiver" destination="batchChunkOutQueue"
channel="chunkStatus"/>
<!-- Messages are received from the batchChunkInQueue and sent to the chunkRequests channel. These
are processed by the chunkHandler service activator -->
<integration:channel id="chunkRequests"/>
<integration:channel id="chunkResponses"/>
<jms:message-driven-channel-adapter id="chunkReceiver" destination="batchChunkInQueue" channel="chunkRequests"/>
<jms:outbound-channel-adapter id="chunkResponseSender" destination="batchChunkOutQueue" channel="chunkResponses"/>
<integration:service-activator input-channel="chunkRequests" output-channel="chunkResponses" ref="chunkHandler"/>
Any comment on this config is welcome of course :)
lschmidt
Jun 10th, 2010, 04:16 PM
Thanks snicoll, this is exactly the kind of JMS sample configuration for a distributed scenario that I was looking for. For other folks looking at the solution: the queue element in the chunkStatus channel is essential to get rid of the "Dispatcher has no subscribers" error message.
goel_veenu
Dec 6th, 2010, 09:25 AM
Hi,
Can anybody send me the complete XML with namespaces for this configuration? I am trying to put it into some of my batch code, but I am getting some namespace errors.
Thanks in advance,
Vipin Goel
juliaF
May 17th, 2012, 11:32 AM
Hello,
First of all, thank you, snicoll, for posting your JMS request-reply configuration. It took me ages to get past the "Dispatcher has no subscribers" issue until I stumbled upon your solution. However, in my case the client still fails to receive the reply. It blocks waiting for the reply and hangs indefinitely despite the default-reply-timeout setting. I also tried the same example without specifying a named reply-channel, so that the reply would be delivered via a temporary reply queue, but this never worked for me: I got "no output-channel or replyChannel header available" on the service side. I wonder if anyone could help me debug my configuration. Below is the project code:
Client configuration
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:si="http://www.springframework.org/schema/integration"
xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.1.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.1.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd" >
<import resource="hornetq-connection-factory.xml"/>
<bean id="htmlProcessorMessageTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="defaultDestinationName" value="htmlProcessorRequest" />
<property name="explicitQosEnabled" value="true"/>
<property name="deliveryMode">
<util:constant static-field="javax.jms.DeliveryMode.NON_PERSISTENT" /></property>
<property name="sessionAcknowledgeMode">
<util:constant static-field="javax.jms.Session.AUTO_ACKNOWLEDGE" />
</property>
</bean>
<si:gateway id="htmlProcessorGateway" service-interface="com.si.investigation.HtmlProcessorGateway"
default-request-channel="htmlProcessorRequestChannel" default-reply-channel="htmlProcessorReplyChannel" default-reply-timeout="10" />
<si:channel id="htmlProcessorRequestChannel" />
<int-jms:outbound-channel-adapter channel="htmlProcessorRequestChannel" jms-template="htmlProcessorMessageTemplate" />
<si:channel id="htmlProcessorReplyChannel" > <si:queue capacity = "100"/></si:channel>
</beans>
Service configuration
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:si="http://www.springframework.org/schema/integration"
xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.1.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.1.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.1.xsd" >
<import resource="hornetq-connection-factory.xml"/>
<bean id="htmlProcessorServiceActivator" class="com.si.investigation.HtmlProcessorServiceActivator" />
<si:service-activator input-channel="htmlProcessorRequestChannel" ref="htmlProcessorServiceActivator"
method="reply" output-channel="htmlProcessorReplyChannel" />
<!-- Listen to incoming messages on the JMS request queue -->
<si:channel id="htmlProcessorRequestChannel" />
<int-jms:message-driven-channel-adapter id="htmlProcessorRequestChannelAdaptor"
channel="htmlProcessorRequestChannel" destination-name="htmlProcessorRequest"
connection-factory="connectionFactory" />
<si:channel id="htmlProcessorReplyChannel" />
<int-jms:outbound-channel-adapter channel="htmlProcessorReplyChannel" destination-name="htmlProcessorReply" connection-factory="connectionFactory" />
</beans>
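import org.springframework.integration.annotation.Gateway;

public interface HtmlProcessorGateway {
    // Sends msgout on the gateway's default request channel and returns the reply payload
    @Gateway
    String sendAndReceiveString(String msgout);
}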
Many thanks
Gary Russell
May 17th, 2012, 11:47 AM
If you want to do
gateway->jms-outbound->jms-inbound->processandsendreply
It would be better to use
gateway->jms-outbound-gateway->jms-inbound-gateway->process
because the framework will take care of all the message correlation and the result of the process will get back to the originating gateway.
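A rough sketch of that second layout, reusing the channel, queue, and bean names from the configurations posted above. It assumes both sides define a connectionFactory bean pointing at the same broker and that the default temporary reply queue is acceptable; it has not been run against the poster's project.

```xml
<!-- Client context: the messaging gateway hands requests to a JMS outbound
     gateway, which sends them to the request queue and waits for the JMS reply -->
<si:gateway id="htmlProcessorGateway"
    service-interface="com.si.investigation.HtmlProcessorGateway"
    default-request-channel="htmlProcessorRequestChannel"/>
<si:channel id="htmlProcessorRequestChannel"/>
<int-jms:outbound-gateway request-channel="htmlProcessorRequestChannel"
    request-destination-name="htmlProcessorRequest"
    connection-factory="connectionFactory"/>

<!-- Service context (a separate application context): the JMS inbound gateway
     receives the request, and the service activator's return value becomes
     the JMS reply -->
<si:channel id="htmlProcessorRequestChannel"/>
<int-jms:inbound-gateway request-channel="htmlProcessorRequestChannel"
    request-destination-name="htmlProcessorRequest"
    connection-factory="connectionFactory"/>
<si:service-activator input-channel="htmlProcessorRequestChannel"
    ref="htmlProcessorServiceActivator" method="reply"/>
```

With this layout, neither side should need a named reply channel or a separate reply queue adapter, since each gateway pair handles the reply path itself.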
Hope that helps.
juliaF
May 29th, 2012, 02:28 PM
Many thanks, it really helps. I have reconfigured as you suggested. There is one problem that I cannot solve with gateways: how can I force messages to be non-persistent? I figured out how to do this with channel adapters, but not with gateways. Once again, thank you for your reply.
Gary Russell
May 29th, 2012, 02:58 PM
Here are some examples from the test cases. You can control the persistence of both the request and the reply...
<jms:outbound-gateway id="jmsGateway"
request-destination-name="requestQueue"
request-channel="requestChannel"
explicit-qos-enabled="true"
delivery-persistent="true"/>
<jms:inbound-gateway id="gatewayWithReplyQos"
request-destination-name="testDestinationName"
request-channel="requestChannel"
reply-time-to-live="12345"
reply-priority="7"
reply-delivery-persistent="false"
explicit-qos-enabled-for-replies="true"/>
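For the non-persistent requests asked about above, the same attributes can presumably be flipped on the outbound side. A minimal sketch, reusing the names from the first example; this particular combination is an assumption and is not taken from the test cases.

```xml
<!-- Requests sent as NON_PERSISTENT; delivery-persistent only takes effect
     when explicit-qos-enabled is true -->
<jms:outbound-gateway id="jmsGateway"
    request-destination-name="requestQueue"
    request-channel="requestChannel"
    explicit-qos-enabled="true"
    delivery-persistent="false"/>
```

The inbound gateway example above already marks its replies as non-persistent through reply-delivery-persistent="false".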
dgomesbr
Jul 17th, 2012, 03:12 PM
Gary,
I stumbled upon your post while trying to configure AMQ with Spring Batch. Could you post your solution, or the XML files associated with it, on GitHub?
Gary Russell
Jul 17th, 2012, 03:17 PM
This is a really old thread that was recently reawakened with a question about configuring message persistence.
I suggest you start a new thread, and explain exactly what you need.
dgomesbr
Jul 17th, 2012, 03:18 PM
Thanks in advance, will do that.