Thread: Problem w/ polyglot rpc.

  1. #1

    Greetings,

    I'm attempting to invoke an rpc using RabbitTemplate.convertSendAndReceive as the client and amqp_rpc_server as the server. The convertSendAndReceive is using the two parameter signature, and the appropriate handle_info/2 callback is being invoked properly.

    The problem I'm encountering is that the reply queue that's created by the RabbitTemplate seems to exist for only an extremely short period of time. The server publishes its response with the basic.publish 'mandatory' property set to 'true', and the broker returns it with a 312 NO_ROUTE result.

    I've stepped through the code w/ a debugger while simultaneously observing the RabbitMQ Management browser display, and the queue does in fact appear. However, once the client request message has been sent, the queue seemingly disappears.

    If it makes any difference, the rpc client's thread that invokes the initial request comes from an ExecutorService.

    Any thoughts on why the reply queue's lifespan is so short? Has anyone else attempted polyglot rpc in this way?

    Thanks!
    -andy

  2. #2
    Join Date
    Jun 2005
    Posts
    4,231

    That's an interesting observation, and not really connected with the fact that the server is not Java. The temporary reply queue is auto-delete so it disappears when the Channel is closed. I would hope that the channel stays open long enough to get the response, but maybe that depends on your settings. Are you using a CachingConnectionFactory? What is your channelTransacted setting in the template? Do you have an ongoing transaction during the call to sendAndReceive?

  3. #3

    Are you using a CachingConnectionFactory?
    Yes.

    What is your channelTransacted setting in the template?
    It's not explicitly set, so it looks like it'd be false.

    Do you have an ongoing transaction during the call to sendAndReceive?
    No.

    thanks for following up!

  4. #4
    Join Date
    Jun 2005
    Posts
    4,231

    I'm still interested in your settings, but I had another look at the template internals and I actually can't see any reason why it would be a problem, so maybe the server is not implemented correctly after all? Is the server publishing to the default exchange?

    On the other hand, the RabbitTemplate should wait up to 5 seconds for a reply (replyTimeout property default value) and if it doesn't wait that long then it must have received the reply. So that's confusing as well. Is the server sending the response twice?

  5. #5

    Is the server publishing to the default exchange?
    Yes, it's publishing to the default exchange. I should have provided additional information about the server code. It's RabbitMQ's amqp_rpc_server Erlang module.

    On the other hand, the RabbitTemplate should wait up to 5 seconds for a reply (replyTimeout property default value) and if it doesn't wait that long then it must have received the reply.
    Agreed. What's equally bizarre is that the message converter never gets invoked for any response. It does get called for converting the outbound request.

    Is the server sending the response twice?
    It doesn't appear so, at least from the code. The gist of it, for readers not terribly familiar w/ Erlang, is that it takes a Connection, creates the Channel, declares the queue (specified by the user), and registers an asynchronous consumer (which is handled by the OTP handle_info/2 callback). A user-provided callback function (which supplies the response payload) is stored in the gen_server's loop data, and it gets invoked upon receipt of a message. The RabbitMQ code does all the "heavy lifting" of pulling the reply_to value out of the request and then publishing the response to the default exchange, using the provided reply_to value as the routing key. That code looks totally straightforward and I think it's implemented properly. I'd be happy to provide the NO_ROUTE error message term if you think it may be of use, but that too seems completely legit in that the reply_to queue gets nuked way before five seconds have passed (that setting remains at the default).

    Can I assume that my configuration is conducive to rpc? Under what scenarios does this code generally get tested?

    thanks for the thought cycles!

  6. #6

    I've figured out the problem. Essentially, it's a "convention" issue between Spring's RabbitTemplate and RabbitMQ's amqp_rpc_server Erlang module. I suppose mileage may vary depending on broker version/configuration; I'm currently running 2.5.0 w/ out-of-box settings.

    At first blush, it looks like the RabbitTemplate makes a poor assumption: that the reply_to value will be parsed rather than simply taken at face value. According to the RabbitMQ Management browser, the name of the reply queue that RabbitTemplate creates is something along the lines of
    Code:
    amq.gen-oAuYwfI6hdoBPOT9SUIkmg==
    However, the reply_to value that's actually sent in the message is something along the lines of
    Code:
    direct:///amq.gen-oAuYwfI6hdoBPOT9SUIkmg==
    The amqp_rpc_server takes that value at face value as the routing_key for the response. Since there is no queue named direct:///amq.gen-oAuYwfI6hdoBPOT9SUIkmg==, the response fails w/ a NO_ROUTE.
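
    For readers following along, the mismatch can be modeled in a few lines of Java. This is a minimal sketch, not RabbitMQ or Spring code: DefaultExchangeModel and canRoute are hypothetical names, and the only broker behavior it assumes is that the default exchange delivers a message to the queue whose name equals the routing key.

```java
import java.util.Set;

// Hypothetical toy model of default-exchange routing (illustration only).
final class DefaultExchangeModel {
    private final Set<String> queues;

    DefaultExchangeModel(Set<String> queues) {
        this.queues = queues;
    }

    // A message is routable only if a queue with exactly this name exists.
    boolean canRoute(String routingKey) {
        return queues.contains(routingKey);
    }

    public static void main(String[] args) {
        DefaultExchangeModel broker =
                new DefaultExchangeModel(Set.of("amq.gen-oAuYwfI6hdoBPOT9SUIkmg=="));

        // Bare queue name: routable.
        System.out.println(broker.canRoute("amq.gen-oAuYwfI6hdoBPOT9SUIkmg==")); // prints true

        // Address-style value used verbatim as the routing key: not routable,
        // which is what surfaces as NO_ROUTE when 'mandatory' is true.
        System.out.println(broker.canRoute("direct:///amq.gen-oAuYwfI6hdoBPOT9SUIkmg==")); // prints false
    }
}
```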

    Not being intimately familiar w/ the spec: is using the org.springframework.amqp.core.Address approach for the reply_to value acceptable? In other words, would all brokers know how to properly route reply messages that use that convention? If not, this seems like a minor defect, in that
    Code:
    message.getMessageProperties().setReplyTo(replyToAddress);
    should actually be
    Code:
    message.getMessageProperties().setReplyTo(queueDeclaration.getQueue());

    Any thoughts?

    thanks!

  7. #7
    Join Date
    Jun 2005
    Posts
    4,231

    It's not really the broker that is responsible for interpreting the reply-to header, it's the consumer (which in your case is Erlang, and very well might be in the same VM as the broker, but it isn't the broker). The binding uri format that we use by default is a convention that many AMQP clients adopt, but I don't think it's part of the spec. You have two choices: change the consumer, or change the producer. I have no idea how you would change the consumer, but that's just my ignorance of Erlang. The producer would need to manipulate the reply-to header before the message is sent. There should be plenty of scope for that in the RabbitTemplate.
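
    Whichever side ends up being changed, the transformation itself is small. The sketch below is a hypothetical helper (ReplyTo and toRoutingKey are illustrative names, not part of Spring AMQP or the RabbitMQ client). It assumes the only address form in play is the direct:/// prefix seen in this thread; any other value is passed through untouched. Where such a step would hook into RabbitTemplate is not shown here and would depend on the version in use.

```java
// Hypothetical helper: reduce an address-style reply-to value to a bare routing key.
final class ReplyTo {
    // Assumption: only the "direct:///" form observed in this thread is handled.
    private static final String DIRECT_PREFIX = "direct:///";

    private ReplyTo() {
    }

    static String toRoutingKey(String replyTo) {
        if (replyTo == null) {
            throw new IllegalArgumentException("replyTo must not be null");
        }
        // Strip the prefix when present; otherwise treat the value as a plain queue name.
        return replyTo.startsWith(DIRECT_PREFIX)
                ? replyTo.substring(DIRECT_PREFIX.length())
                : replyTo;
    }

    public static void main(String[] args) {
        System.out.println(toRoutingKey("direct:///amq.gen-oAuYwfI6hdoBPOT9SUIkmg=="));
        // prints amq.gen-oAuYwfI6hdoBPOT9SUIkmg==
    }
}
```

    Passing plain queue names through unchanged means the same helper tolerates producers that already send a bare routing key.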

  8. #8

    Quote: Originally Posted by Dave Syer
    It's not really the broker that is responsible for interpreting the reply-to header, it's the consumer (which in your case is Erlang, and very well might be in the same VM as the broker, but it isn't the broker). The binding uri format that we use by default is a convention that many AMQP clients adopt, but I don't think it's part of the spec. You have two choices: change the consumer, or change the producer. I have no idea how you would change the consumer, but that's just my ignorance of Erlang. The producer would need to manipulate the reply-to header before the message is sent. There should be plenty of scope for that in the RabbitTemplate.
    Agreed. In the scope of rpc, requiring consumers to be aware of the producer's reply-to convention is a bad idea and introduces coupling. Using anything other than a simple routing key and the default exchange for rpc responses seems counterintuitive given the spec (and certainly a violation of "keep it simple").

    To reiterate, are you acknowledging that, since this is the RabbitTemplate, the proper solution is to provide a reply-to value in the form that RabbitMQ itself has adopted, and thus the RabbitTemplate should remove the binding uri information and simply provide the queue's name? The RabbitMQ code is properly responding to the default exchange, providing the given reply-to attribute as the routing key.

    thanks!

    P.S. Here's the hack to enable the amqp_rpc_server Erlang module to work (for anyone reading this: it couples the server to the rpc producer's reply-to convention; said differently, don't do this):

    Code:
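    handle_info({#'basic.deliver'{delivery_tag = DeliveryTag},
                 #amqp_msg{props = Props, payload = Payload}},
                State = #state{handler = Fun, channel = Channel}) ->
        #'P_basic'{correlation_id = CorrelationId,
                   reply_to = Q} = Props,
        %% Strip the "direct:///" prefix to recover the bare queue name.
        %% This match fails with badmatch if reply_to lacks the prefix.
        <<"direct:///", RoutingKey/binary>> = Q,
        %% Invoke the user-provided callback to build the response payload.
        Response = Fun(Payload),
        Properties = #'P_basic'{correlation_id = CorrelationId},
        %% Reply via the default exchange, routed by the bare queue name.
        Publish = #'basic.publish'{exchange = <<>>,
                                   routing_key = RoutingKey,
                                   mandatory = true},
        amqp_channel:call(Channel, Publish, #amqp_msg{props = Properties,
                                                      payload = Response}),
        %% Acknowledge the request only after the reply has been published.
        amqp_channel:call(Channel, #'basic.ack'{delivery_tag = DeliveryTag}),
        {noreply, State}.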
