Results 1 to 10 of 10

Thread: Replacing Spring AMQP Request-Reply With Spring Integration

  1. #1
    Join Date
    Mar 2008
    Posts
    11

    Unhappy Replacing Spring AMQP Request-Reply With Spring Integration

    All, I have a working application that currently uses Spring AMQP. When handling requests from the web tier, we use AMQP Template's sendAndReceive method which takes care of creating a temporary response queue and blocking for the response message to come back from RabbitMQ. I tried to replace the consumer side with Spring Integration using an InboundAdaper and InboundGateway (I tried both because it wasn't working properly for me and was experimenting). In both cases, the message was being properly pulled off the queue and handled. The issue was that Spring Integration would complain that no return channel was being provided -- the method processing the message returned a string. When I examined the incoming headers, I see that the AMQP reply-to property contains a valid value -- the temporary queue that the AMQP Template set up. Is this a supported scenario? Is Spring Integration supposed see the AMQP reply-to value can create some sort of dynamic channel? I tried configuring the AMQP Template so that instead of creating a temporary queue, it used a specific one and I bound my outbound channel to that queue. No dice. Still got the complaint about not having a reply channel. I'd love to let Spring Integration do some of the heavy lifting for me but I can't switch until I get request-reply to work properly. Any insight is appreciated.

    Thanks,
    Ron

  2. #2
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,844

    Default

    I believe the "no reply channel" error you are seeing is not related to the AMQP "replyTo" property but rather to the local replyChannel header on the Message when it reaches the end of a pipeline (where a response has been produced). Could you provide an excerpt of your configuration on the consuming ("inbound") side?

  3. #3
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,037

    Default

    The fact you are saying return "channel" and not "reply queue" tends to make me think you have a Spring Integration configuration issue. The inbound gateway does use the replyTo from the AMQP headers when sending it's reply.

    If you can show your SI config and/or a debug log/stack trace, we might have more insight.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  4. #4
    Join Date
    Mar 2008
    Posts
    11

    Default

    Thanks for the quick replies. I've attached the relevant files for your examination. I'm hoping it is something silly that I'm doing in my configuration.

    Thanks,
    Ron
    Attached Files Attached Files

  5. #5
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,037

    Default

    Change

    Code:
    <int-amqp:channel id="pingChannel" queue-name="ping-queue" encoding="UTF-8" connection-factory="connectionFactory" error-handler="errorHandler"/>
    to

    Code:
    <int:channel id="pingChannel" />
    The way you have it now, you are sending the inbound gateway request back out to AMQP (an amqp:channel is a channel backed by an amqp queue).

    Aside from the fact you really don't want to send it back out to AMQP, this process causes the replyChannel header to be lost.

    With an in-memory <channel/>, the service activator will be invoked directly on the gateway listener's thread.

    Hope that helps.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  6. #6
    Join Date
    Mar 2008
    Posts
    11

    Default

    Thank you for your reply. The configuration I had to use was this one, due to the fact that our AMQP connection factory bean is named "connectionFactory" and not "rabbitConnectionFactory":

    <int-amqp:channel id="pingChannel" connection-factory="connectionFactory"/>

    Unfortunately, it does not seemed to have helped. I still get errors like this one:

    amqp_replyTo = amq.gen-wVaqyFT4Qf7r0JqkL_vDVr
    amqp_contentType = UNKNOWN
    amqp_receivedExchange = ping-exchange
    amqp_contentEncoding = UTF-8
    amqp_redelivered = false
    amqp_messageId = 3D675262
    id = fb42346b-5a65-4708-9501-fc165341fa00
    timestamp = 1348233123838
    amqp_receivedRoutingKey = service.ping
    amqp_deliveryMode = NON_PERSISTENT
    amqp_timestamp = Fri Sep 21 09:12:03 EDT 2012
    amqp_deliveryTag = 1
    spring_return_correlation = a6aa4e23-5caa-4b2f-948d-83bb01053fab
    message-type = NORMAL
    W SimpleMessageListenerContainer Execution of Rabbit message listener failed, and no ErrorHandler has been set. org.springframework.amqp.rabbit.listener.ListenerE xecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractM essageListenerContainer.wrapToListenerExecutionFai ledExceptionIfNeeded(AbstractMessageListenerContai ner.java:638)
    at org.springframework.amqp.rabbit.listener.AbstractM essageListenerContainer.doInvokeListener(AbstractM essageListenerContainer.java:577)
    Caused by: org.springframework.integration.MessagingException : Failure occured in AMQP listener while attempting to convert and dispatch Message.
    at org.springframework.integration.amqp.channel.Abstr actSubscribableAmqpChannel$DispatchingMessageListe ner.onMessage(AbstractSubscribableAmqpChannel.java :157)
    at org.springframework.amqp.rabbit.listener.AbstractM essageListenerContainer.doInvokeListener(AbstractM essageListenerContainer.java:575)
    Caused by: org.springframework.integration.support.channel.Ch annelResolutionException: no output-channel or replyChannel header available

    I dump the incoming message headers and it appears that the reply-to queue from Spring AMQP is getting sent but not converted to an outbound channel that can send the response to the temporary queue that the client is waiting on. Forgive my ignorance, but I am curious about your statement "you really don't want to send it back out to AMQP". Ultimately, the caller is waiting on the temporary response queue that the Spring AMQP template created as part of the sendAndReceive call so don't I want to send the response message back out via AMQP? I noticed that when I hovered over the int-amqp:inbound-gateway tag in IntelliJ, it popped up "Configures a gateway that will receive AMQP Messages sent to a given queue and then forward those messages to a Message Channel. If a reply Message is returned, it will also send that to the 'replyTo' provide by the AMQP request Message." This got me to thinking that my simple service interface should return a Message<String> instead of String. Unfortunately, this didn't seem to make a difference. Any other suggestions you have are appreciated.

    Thanks,
    Ron

  7. #7
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,037

    Default

    I think you missed the crux of my point; you need to change

    Code:
    <int-amqp:channel .../>
    to
    Code:
    <int:channel/>
    An <int-amqp:channel/> is a <channel/> that sends and receives from amqp.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  8. #8
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,037

    Default

    In other words...

    Code:
        <int-amqp:inbound-gateway id="inboundGateway"
                                  request-channel="pingChannel"
                                  queue-names="ping-queue"
                                  connection-factory="connectionFactory"
                                  mapped-request-headers="*"
                                  mapped-reply-headers="*"
                                  error-handler="errorHandler"/>
    
    
        <int:channel id="pingChannel" />
    
        <int:service-activator id="pingServiceActivator"
                               input-channel="pingChannel"
                               requires-reply="true">
            <bean class="com.transparent.symphone.adapter.inbound.amqp.ping.SimplePingService"/>
        </int:service-activator>
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  9. #9
    Join Date
    Mar 2008
    Posts
    11

    Thumbs up

    Thanks! That seems to have done the trick. I obviously do not understand the nuances between the two channel types and would love a quick explanation. I had a minor error in my XML that prevented me from trying the solution out of the box (I was missing some location information). For those of you who may be interested, here is the final working configuration:

    Code:
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:int="http://www.springframework.org/schema/integration"
           xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                               http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
                               http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd">
    
    
        <int:channel id="pingChannel"/>
    
    
        <int-amqp:inbound-gateway id="inboundGateway"
                                  request-channel="pingChannel"
                                  queue-names="ping-queue"
                                  connection-factory="connectionFactory"
                                  mapped-request-headers="*"
                                  mapped-reply-headers="*"
                                  error-handler="errorHandler"/>
    
    
        <int:service-activator id="pingServiceActivator"
                               input-channel="pingChannel"
                               requires-reply="true">
            <bean class="com.transparent.symphone.adapter.inbound.amqp.ping.SimplePingService"/>
        </int:service-activator>
    </beans>
    Thank you for all the help.

    Ron

  10. #10
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,037

    Default

    would love a quick explanation
    <int:channel/> is simply a "connector" between the amqp inbound gateway and the service activator.

    In this case, the connector is a DirectChannel, which means the service method is called directly on the thread that receives the amqp message.

    You could declare it thus:
    Code:
    <int:channel id="pingChannel">
        <queue/>
    <int:channel>
    which would use a QueueChannel (and you'd need a poller on the service activator to "poll" the queue for messages).

    A third option is
    Code:
    <int:channel id="pingChannel">
        <dispatcher task-executor="myTaskExecutor" />
    <int:channel>
    In this case the inbound adapter thread hands off the work to another thread (supplied by the task-executor).


    In all three cases, this is all done entirely in memory.


    Now, let's say you have an application that gets a request, sends it to a QueueChannel, and it's consumed by some service.

    Code:
    <int-http:inbound-adapter ...>
    
    <int:channel id="foo">
        <queue />
    </int:channel>
    
    <int:service ... />
    The problem with this is you could have messages in the QueueChannel which would be lost if the server crashes.

    In this case, you might decide to replace the channel with an <int-amqp:channel .../> because then, the messages will be persisted to an amqp queue, and therefore won't be lost with a server crash.

    Hope that helps.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

Tags for this Thread

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •