Thread: Asynchronous request reply correlation

  1. #1
    Join Date
    Jan 2008
    Posts
    11

    Does Spring Integration support Asynchronous request reply correlation?

    Given the following sequence:

    - Send Request on channel 1
    - Do some asynchronous processing
    - Received Response on channel 2

    Does Spring Integration have some facility to correlate the response with the request, or do I have to cache my original request manually until I get the response back and do the correlation by hand?

    If it does:
    - How do we specify that the message received on the response channel is a response and not a request?
    - How does it handle errors (no reply)? Do we have to do some sort of garbage collection of requests that are still waiting for a response?

    Any code sample or examples (always appreciated :.)?

    Thanks,

    Simon

  2. #2
    Join Date
    Aug 2008
    Posts
    20

    Yes, SI (Spring Integration) can do the heavy lifting for asynchronous correlation, but (in my limited experience with SI) there's work to do on your part to get it working. I'm using SI 1.0 M5, so this might differ from the version you are using. I'm also assuming you are using JMS.

    I created a producer:

    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans:beans ...>
    
        <context:annotation-config/>
    
        <annotation-driven/>
    
        <message-bus/>
        
        <channel id="eventsChannel">
            <interceptor ref="outgoingInterceptor"/>
        </channel>
        
        <channel id="replyChannel">
            <interceptor ref="incomingInterceptor"/>
        </channel>
        
        <beans:bean id="outgoingInterceptor" class="org.springframework.integration.transformer.MessageTransformingChannelInterceptor">
            <beans:constructor-arg ref="eventToXmlTransformer"/>
        </beans:bean>
        
        <beans:bean id="incomingInterceptor" class="com.xnife.event.common.example.interceptor.HandleCorrelationId"/>
    
        <jms-target id="jmsTarget" destination-name="com.xnife.events"/>
        
        <beans:bean id="eventToXmlTransformer" class="com.xnife.event.common.example.transformer.EventToXml"/>
        
        <jms-gateway 
            request-channel="replyChannel" 
            connection-factory="connectionFactory" 
            destination-name="com.xnife.events.reply"
            expect-reply="false"/>
         
        <channel-adapter channel="eventsChannel" target="jmsTarget"/> 
    
        <beans:bean id="doSomethingService" class="com.xnife.event.producer.example.internal.service.DoSomethingServiceImpl"/>
    
        <gateway id="publishEventServiceService"
             service-interface="com.xnife.event.producer.example.service.PublishEventService"
             request-channel="eventsChannel"
             reply-channel="replyChannel"/>
    
        <beans:bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <!-- we assume the activemq broker is running on the local host at the default port -->
            <beans:property name="brokerURL" value="tcp://localhost:61616"/>
        </beans:bean>
        
        <beans:bean id="producer" class="com.xnife.event.producer.example.Producer"/>
     
    </beans:beans>
    and a consumer:

    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans:beans ...>
    
        <context:annotation-config/>
    
        <annotation-driven/>
    
        <message-bus/>
        
        <channel id="eventsChannel">
            <interceptor ref="incomingInterceptor"/>
        </channel>
        
        <channel id="replyChannel"/>
        
        <service-activator input-channel="eventsChannel" ref="eventProcessorService" method="processEvent" output-channel="replyChannel"/>
    
        <beans:bean id="eventProcessorService" class="com.xnife.event.consumer.example.internal.service.EventProcessorServiceImpl"/>
            
        <beans:bean id="incomingInterceptor" class="org.springframework.integration.transformer.MessageTransformingChannelInterceptor">
            <beans:constructor-arg ref="xmlToEventTransformer"/>
        </beans:bean>
        
        <beans:bean id="xmlToEventTransformer" class="com.xnife.event.common.example.transformer.XmlToEvent"/>
        
        <jms-source id="jmsSource" connection-factory="connectionFactory" destination-name="com.xnife.events"/>
        
        <channel-adapter source="jmsSource" channel="eventsChannel"/> 
        
        <jms-target id="jmsTarget" connection-factory="connectionFactory" destination-name="com.xnife.events.reply"/>
        
        <channel-adapter channel="replyChannel" target="jmsTarget"/>
        
        <jms-gateway 
          id="jmsInbound" 
          connection-factory="connectionFactory"
          destination-name="com.xnife.events" 
          request-channel="eventsChannel"/>
         
        <beans:bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <!-- we assume the activemq broker is running on the local host at the default port -->
            <beans:property name="brokerURL" value="tcp://localhost:61616"/>
        </beans:bean>
        
    </beans:beans>
    From a correlation point-of-view, it's the interceptors that are interesting.

    I'm using objects within the code, but XML for the JMS payload. The producer's outgoing interceptor adds the SI message ID to the XML, and the consumer copies that ID onto the reply message it sends. This is needed because SI uses its internal message ID as the expected correlation ID.

    SI will not propagate the IDs used internally to the JMS message ID/correlation ID.

    The producer has another interceptor on the incoming channel so that the ID from the XML can be used to set the message's correlation ID. Watch out: the correlation ID must be a UUID object, not a string. That cost me plenty of time in debugging!
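
    The UUID-versus-string pitfall can be reproduced without any SI classes. The following is only a minimal sketch in plain Java: PendingRequests and CorrelationIdDemo are hypothetical names, not part of Spring Integration, and the map merely stands in for whatever SI uses internally to match replies to requests. It shows why an ID read back from the XML as text has to be converted with UUID.fromString before it can match.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a table of requests that are waiting for a reply. */
final class PendingRequests {
    private final Map<Object, String> pending = new ConcurrentHashMap<>();

    /** Remembers the request under the UUID of the outgoing message. */
    void register(UUID messageId, String request) {
        pending.put(messageId, request);
    }

    /** Returns and removes the matching request, or null if nothing matches. */
    String match(Object correlationId) {
        return pending.remove(correlationId);
    }
}

final class CorrelationIdDemo {
    public static void main(String[] args) {
        PendingRequests requests = new PendingRequests();
        UUID id = UUID.randomUUID();
        requests.register(id, "original request");

        // The ID travels inside the XML payload as text.
        String idFromXml = id.toString();

        // A String never equals a UUID, so this lookup finds nothing.
        System.out.println(requests.match(idFromXml));                  // prints "null"

        // Converting back to a UUID makes the lookup succeed.
        System.out.println(requests.match(UUID.fromString(idFromXml))); // prints "original request"
    }
}
```

    In the incoming interceptor, the equivalent step would be converting the ID text from the XML with UUID.fromString before setting it as the correlation ID.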

  3. #3
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,840

    First of all, thanks for providing this useful post. It also reveals to me that we are not quite making things simple enough yet.

    There is one improvement in M6 that might be relevant, depending on how you are using the correlationId. The JMSCorrelationID is now copied to and from the Spring Integration MessageHeaders when receiving or sending with JmsSource/JmsTarget. Its key in the MessageHeaders is JmsHeaders.CORRELATION_ID. However, the JMS message ID and correlation ID are kept separate from the Spring Integration Message id and correlationId. That is intentional, but if you want to copy the Spring Integration Message id to the JMSCorrelationID, something like the following should do the trick (in M6):
    Code:
    Message outboundMessage = MessageBuilder.fromMessage(inboundMessage)
            .setHeader(JmsHeaders.CORRELATION_ID, inboundMessage.getHeaders().getId())
            .build();

  4. #4
    Join Date
    Aug 2008
    Posts
    20

    The M6 change helps a little, but I feel there's something off in the abstraction around correlation handling.

    I see why you'd want to keep the message correlation and the JMS correlation separate, so that's not the issue.

    I'm thinking there needs to be some mapper or strategy interface for handling the correlation because having to copy the SI message ID is, in my opinion, breaking encapsulation.
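
    To make the idea concrete, here is a rough sketch of what such a strategy could look like. Everything in it is hypothetical: CorrelationMapper, UuidStringCorrelationMapper, CorrelationHeaders and the "transportCorrelationId" key are invented for illustration and are not part of Spring Integration or JMS. A plain Map stands in for the message headers so the sketch stays self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical strategy: converts between the internal message ID and a transport-level correlation value. */
interface CorrelationMapper {
    String toTransport(UUID messageId);
    UUID fromTransport(String transportCorrelationId);
}

/** Simplest mapping: the UUID's canonical string form is the transport value. */
final class UuidStringCorrelationMapper implements CorrelationMapper {
    @Override
    public String toTransport(UUID messageId) {
        return messageId.toString();
    }

    @Override
    public UUID fromTransport(String transportCorrelationId) {
        return UUID.fromString(transportCorrelationId);
    }
}

/** Hypothetical adapter code that relies on the strategy instead of copying IDs by hand. */
final class CorrelationHeaders {
    static final String TRANSPORT_KEY = "transportCorrelationId"; // made-up header name

    /** Builds outbound headers carrying the mapped correlation value. */
    static Map<String, Object> outbound(UUID messageId, CorrelationMapper mapper) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(TRANSPORT_KEY, mapper.toTransport(messageId));
        return headers;
    }

    /** Recovers the internal correlation ID from inbound headers. */
    static UUID inbound(Map<String, Object> headers, CorrelationMapper mapper) {
        return mapper.fromTransport((String) headers.get(TRANSPORT_KEY));
    }
}
```

    With something along these lines, application code would never touch the SI message ID directly; the adapters would apply whichever mapper is configured.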
