View Full Version : Asynchronous request reply correlation
SimonT
Jul 10th, 2008, 10:26 AM
Does Spring Integration support Asynchronous request reply correlation?
Given the following sequence:
- Send Request on channel 1
- Do some asynchronous processing
- Received Response on channel 2
Does Spring integration have some facility to corrolate the response with the request or do I have to cache manually my original request until I get the response back and do the correlation by hand.
If it does,
how do we specify that the message received on the response channel will be a reponse and not a request?
how does it handle errors (no reply). Do we have to do some sort of garbage collection of pending reponse to request?
Any code sample or examples (always appreciated :.)?
Thanks,
Simon
stevedick
Aug 21st, 2008, 09:57 AM
Yes, SI (Spring Integration) can do the heavy lifting for asynch correlation, but there's work to do on your part (in my limited experience with SI) to get it to work. I'm using SI 1.0m5, so this might be different from the version you are using (I'm also assuming you are using JMS).
I created a producer:
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans ...>
<context:annotation-config/>
<annotation-driven/>
<message-bus/>
<channel id="eventsChannel">
<interceptor ref="outgoingInterceptor"/>
</channel>
<channel id="replyChannel">
<interceptor ref="incomingInterceptor"/>
</channel>
<beans:bean id="outgoingInterceptor" class="org.springframework.integration.transformer.Messag eTransformingChannelInterceptor">
<beans:constructor-arg ref="eventToXmlTransformer"/>
</beans:bean>
<beans:bean id="incomingInterceptor" class="com.xnife.event.common.example.interceptor.HandleC orrelationId"/>
<jms-target id="jmsTarget" destination-name="com.xnife.events"/>
<beans:bean id="eventToXmlTransformer" class="com.xnife.event.common.example.transformer.EventTo Xml"/>
<jms-gateway
request-channel="replyChannel"
connection-factory="connectionFactory"
destination-name="com.xnife.events.reply"
expect-reply="false"/>
<channel-adapter channel="eventsChannel" target="jmsTarget"/>
<beans:bean id="doSomethingService" class="com.xnife.event.producer.example.internal.service. DoSomethingServiceImpl"/>
<gateway id="publishEventServiceService"
service-interface="com.xnife.event.producer.example.service.PublishEv entService"
request-channel="eventsChannel"
reply-channel="replyChannel"/>
<beans:bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<!-- we assume the activemq broker is running on the local host at the default port -->
<beans:property name="brokerURL" value="tcp://localhost:61616"/>
</beans:bean>
<beans:bean id="producer" class="com.xnife.event.producer.example.Producer"/>
</beans:beans>
and a consumer:
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans ...>
<context:annotation-config/>
<annotation-driven/>
<message-bus/>
<channel id="eventsChannel">
<interceptor ref="incomingInterceptor"/>
</channel>
<channel id="replyChannel"/>
<service-activator input-channel="eventsChannel" ref="eventProcessorService" method="processEvent" output-channel="replyChannel"/>
<beans:bean id="eventProcessorService" class="com.xnife.event.consumer.example.internal.service. EventProcessorServiceImpl"/>
<beans:bean id="incomingInterceptor" class="org.springframework.integration.transformer.Messag eTransformingChannelInterceptor">
<beans:constructor-arg ref="xmlToEventTransformer"/>
</beans:bean>
<beans:bean id="xmlToEventTransformer" class="com.xnife.event.common.example.transformer.XmlToEv ent"/>
<jms-source id="jmsSource" connection-factory="connectionFactory" destination-name="com.xnife.events"/>
<channel-adapter source="jmsSource" channel="eventsChannel"/>
<jms-target id="jmsTarget" connection-factory="connectionFactory" destination-name="com.xnife.events.reply"/>
<channel-adapter channel="replyChannel" target="jmsTarget"/>
<jms-gateway
id="jmsInbound"
connection-factory="connectionFactory"
destination-name="com.xnife.events"
request-channel="eventsChannel"/>
<beans:bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<!-- we assume the activemq broker is running on the local host at the default port -->
<beans:property name="brokerURL" value="tcp://localhost:61616"/>
</beans:bean>
</beans:beans>
From a correlation point-of-view, it's the interceptors that are interesting.
I'm using objects within the code, but XML for the JMS payload. The producer's outgoing interceptor adds the SI message ID to the XML which will be copied onto the reply message sent by the consumer. This is because SI uses its internal message ID as the expected correlation ID.
SI will not propagate the IDs used internally to the JMS message ID/correlation ID.
The producer has another interceptor on the incoming channel so that the ID from the XML can be used to set the message's correlation ID. Watch out, the correlation ID should be an UUID object, not a string - that cost me plenty of time in debugging!
Mark Fisher
Aug 21st, 2008, 10:18 AM
First of all, thanks for providing this useful post. It also reveals to me that we are not quite making things simple enough yet :)
There is one improvement in M6 that might be relevant depending on how you are using the correlationId. Now, the JMSCorrelationId is copied to/from the Spring Integration MessageHeaders when receiving/sending with JmsSource/JmsTarget. Its key in the MessageHeaders is JmsHeaders.CORRELATION_ID. However, the JMS Message id/correlationId are kept separate from the Spring Integration Message id/correlationId. That is intentional, but if you want to copy the Spring Integration Message id to a JMSCorrelationId, something like the following should do the trick (in M6):
Message outboundMessage = MessageBuilder.fromMessage(inboundMessage)
.setHeader(JmsHeaders.CORRELATION_ID, inboundMessage.getHeaders().getId())
.build();
stevedick
Aug 25th, 2008, 09:33 AM
The M6 change helps a little, but I feel there's something off in the abstraction around handling the correlation stuff.
I see why you'd want to keep the message and JMS correlation seperate, so that's not the issue.
I'm thinking there needs to be some mapper or strategy interface for handling the correlation because having to copy the SI message ID is, in my opinion, breaking encapsulation.
Powered by vBulletin® Version 4.2.1 Copyright © 2013 vBulletin Solutions, Inc. All rights reserved.