
Thread: Simple AMQP question

  1. #1

    Simple AMQP question

    Hello,

    I've created a sample application using the build snapshots of Spring Integration and Spring AMQP. I'm running RabbitMQ and am able to run the sample tests included with the integration-amqp source.

    I've got a sample client that creates a simple message, for which I would like to get a reply after a sequence of hops has been made through the messaging system. The topology is this:

    1. inboundChannel
    2. serviceActivator
    3. router
    4. replyChannel or invalidChannel


    The client code posts the message to the inboundChannel and registers itself for replies on both the reply and invalid channels.

    My Spring Integration config looks like this:

    Code:
    <beans:bean id="serviceActivator" class="com.sms.connect.adapter.rest.smsRestAdaptor">
        <beans:constructor-arg value="http"/>
        <beans:constructor-arg value="localhost"/>
        <beans:constructor-arg value="8080"/>
        <beans:constructor-arg value="api"/>
        <beans:constructor-arg value="v1"/>
    </beans:bean>

    <beans:bean id="router" class="com.sms.connect.adapter.rest.smsRestRouter"/>

    <amqp:inbound-channel-adapter channel="smsInboundChannel"
        queue-names="sms-inbound-queue" connection-factory="rabbitConnectionFactory" />

    <channel id="smsInboundChannel">
        <interceptors>
            <wire-tap channel="logger" />
        </interceptors>
    </channel>

    <service-activator input-channel="smsInboundChannel"
        ref="serviceActivator" method="handleMessage" output-channel="smsOutputChannel" />

    <channel id="smsOutputChannel">
        <interceptors>
            <wire-tap channel="logger" />
        </interceptors>
    </channel>

    <router input-channel="smsOutputChannel" ref="router" method="route"/>

    <channel id="smsInvalidChannel"/>
    <channel id="smsReplyChannel"/>

    <logging-channel-adapter id="logger" level="ERROR" />

    <rabbit:queue name="sms-inbound-queue"/>

    My router simply returns smsInvalidChannel or smsReplyChannel.

    The behavior I am seeing is that the message keeps repeating. Perhaps there is a poller that re-executes the same message? The message is never removed from RabbitMQ.

    So first, I'd like to know how to stop the message from repeating and have it removed from the queue. Second, how do I specify a queue name on which to publish the response from the invalid and reply channels?

    So far my client looks like this. It is not polling for a reply right now; it only submits the request and looks for a response. I'm new to this.

    Code:
    public class RabbitMQClientTest {
        @Autowired
        private AmqpAdmin admin;
        @Autowired
        private AmqpTemplate template;
    
        @Test
        public void simpleProducerConsumerTest() {
            try {
                String sent = "gdvs";
                admin.declareQueue(new Queue("sms-inbound-queue"));
    
                Message message = new Message(sent.getBytes(), null);
    
                // write message
                template.convertAndSend(sent);
                // read message
                Object results = template.receiveAndConvert();
    
                //System.out.println("Msg: " + received);
                //Assert.assertEquals(sent, received);
    
            } catch (Exception e) {
                Assert.fail("Test failed: " + e.getLocalizedMessage());
            }
        }
    
    }
    Thanks,

    Scott

  2. #2
    Join Date
    Jun 2005
    Posts
    4,230

    I imagine the repeat deliveries you are seeing are caused by an exception in the message listener - by default an exception causes the message to be re-queued on the broker. The exception is probably logged for you, but unless there is some more config that we haven't seen, I can guess that it is Spring Integration telling you that it has no way to deliver the messages you send to the channels downstream of the router (they are not connected to anything). That is related to one of your questions: if you want to send the messages back to the Rabbit broker, then you need an outbound adapter attached to each of the outbound channels.
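
To make the redelivery loop described above concrete, here is a minimal plain-Java sketch. It does not use RabbitMQ or Spring at all: the `Listener` interface, the `deliverAll` method and the in-memory `Deque` are hypothetical stand-ins for the listener container and the broker queue, assuming only the requeue-on-exception default mentioned above.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of a broker queue that requeues a message when the listener throws. */
class RequeueDemo {

    /** Hypothetical listener contract; not a Spring or RabbitMQ type. */
    interface Listener {
        void onMessage(String body) throws Exception;
    }

    /**
     * Delivers messages until the queue is empty or maxDeliveries is reached.
     * Returns the number of delivery attempts made.
     */
    static int deliverAll(Deque<String> queue, Listener listener, int maxDeliveries) {
        int attempts = 0;
        while (!queue.isEmpty() && attempts < maxDeliveries) {
            String body = queue.pollFirst();
            attempts++;
            try {
                listener.onMessage(body);   // success: the message is consumed and gone
            } catch (Exception e) {
                queue.addFirst(body);       // failure: the message goes back on the queue
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>();
        queue.add("gdvs");

        // A listener that always fails, like a flow whose last channel has no subscriber.
        Listener broken = body -> { throw new IllegalStateException("no subscriber for channel"); };
        System.out.println("attempts with broken listener: " + deliverAll(queue, broken, 5));
        System.out.println("still queued: " + queue.size());

        // Once the listener completes normally, one delivery drains the queue.
        Listener working = body -> { };
        System.out.println("attempts with working listener: " + deliverAll(queue, working, 5));
        System.out.println("still queued: " + queue.size());
    }
}
```

The cap on deliveries exists only so the demo terminates; without it the failing listener would see the same message indefinitely, which matches the repeating behaviour reported in the first post.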

    Your test looks like a good start, and it should at least send a message, but there are some problems. First, it seems to be sending and receiving from the same queue - the default queue that you have configured in the unseen autowired RabbitTemplate (probably not what you wanted). Second, you don't wait for a response, so at least some of the time the result from receiveAndConvert() will be null. Third (cosmetic only), the Message you declare is never used.
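
The second problem (reading before the reply can have arrived) is easy to reproduce without a broker. The sketch below is plain Java with `BlockingQueue` standing in for the request and reply queues; `receiveNow` and `receiveWithin` are hypothetical helper names, not Spring AMQP methods. It only illustrates the difference between a non-blocking read and a read that waits with a timeout.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class ReplyWaitDemo {

    /** Non-blocking read: returns null if nothing has arrived yet. */
    static String receiveNow(BlockingQueue<String> replies) {
        return replies.poll();
    }

    /** Blocking read: waits up to timeoutMillis for a reply, then gives up with null. */
    static String receiveWithin(BlockingQueue<String> replies, long timeoutMillis) {
        try {
            return replies.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    public static void main(String[] args) throws Exception {
        // Separate queues for requests and replies, unlike the single default queue in the test.
        BlockingQueue<String> requests = new LinkedBlockingQueue<>();
        BlockingQueue<String> replies = new LinkedBlockingQueue<>();

        // Simulated service: takes one request and answers on the reply queue after a delay.
        Thread service = new Thread(() -> {
            try {
                String request = requests.take();
                Thread.sleep(200);
                replies.put("reply to " + request);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        service.start();

        requests.put("gdvs");
        // The immediate read will normally find nothing, because the service is still working.
        System.out.println("immediate read: " + receiveNow(replies));
        System.out.println("read with timeout: " + receiveWithin(replies, 2000));
        service.join();
    }
}
```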

    Since a lot of this is really about Spring Integration, you might find it helpful to analyse the behaviour by taking AMQP out of the equation to start with. Also the Integration forum will be able to provide more detail if you still have questions about vanilla Spring Integration.

  3. #3

    Toward the end of the day yesterday, as I traced through the Spring source, I noticed that some kind of exception was being thrown. This morning I'm working on my example to create two reply queues that I will listen to asynchronously. If I can do that, then I am in business.

    Thanks for the reply. I'll update when I know more.

  4. #4

    I was able to get an example of what I'm looking for to work using Spring AMQP, but not with Integration.

    Is there a working Spring Integration sample available that has the following features:
    1. A client initiates a message that specifies a return address
    2. The client monitors the named queue (specified in the return address) for the reply
    3. An endpoint receives the message
    4. The reply is sent back on the queue specified by the client as the return address
    5. The client gets the message it was waiting for


    Surprisingly I have not been able to make this happen.

    My client looks like this when it sends the message:

    Code:
                template.convertAndSend( msg, new MessagePostProcessor() {
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setReplyTo("sms-reply-queue");
                        try {
                            message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString().getBytes("UTF-8"));
                        }
                        catch (UnsupportedEncodingException e) {
                            throw new AmqpException(e);
                        }
                        return message;
                    }
                });
    This specifies the queue name as sms-reply-queue.

    The client monitors messages on the reply queue like this:
    Code:
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory((ConnectionFactory) context.getBean("rabbitConnFactory"));
    // Listen on the reply queue named in the replyTo property, not on the inbound queue.
    container.setQueueNames("sms-reply-queue");
    container.setMessageListener(new MessageListenerAdapter(new MessageListenerAdapter() {
        public void handleMessage(String text) {
            System.out.println("Message received: " + text);
        }
    }));
    I can't get my Spring Integration solution to call back onto this reply queue, though I can get a similar solution to work without Spring Integration. This is a simple matter of semantics. Can anyone tell me what my Spring Integration config should look like to make this happen?

    As a starting point, here is what I have right now:

    Code:
    <amqp:inbound-channel-adapter channel="smsInboundChannel"
            queue-names="sms-inbound-queue" connection-factory="rabbitConnectionFactory" />
    
    <channel id="smsInboundChannel"/>
    
    <service-activator input-channel="smsInboundChannel" ref="smsRestAdaptor" method="handleMessage" />
    
    <channel id="smsOutputChannel"/>
    <!-- <stream:stdout-channel-adapter id="smsOutputChannel"/> -->
    
    <rabbit:queue name="sms-inbound-queue"/>
    <rabbit:queue name="sms-reply-queue"/>
    I haven't defined anything to associate with the smsOutputChannel, and so I have an error. You can see that I've commented out the stdout-channel-adapter line, which I've done because I want the reply to go to the queue the client named. So what I have definitely won't work; what I need is the last piece of the puzzle. How do I define an output channel for my service-activator?

    Keep in mind too that this is a simplified case. Later there will be several chained steps leading up to and away from my service activator. I will want to maintain the reply queue name throughout and send back to that named queue when the chain is complete.

    Thanks,
    Scott

  5. #5
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,840

    Scott, it sounds to me like you just want to use the amqp:outbound-gateway and amqp:inbound-gateway. Those handle the request/reply logic for you.

    Here's a sample configuration: https://github.com/SpringSource/spri...ts-context.xml
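
For readers who want to see what "request/reply logic" amounts to, here is a minimal plain-Java sketch of the return-address idea from the earlier posts. It is not how the gateways are implemented: the `Msg` class, the in-memory `QUEUES` map, `serveOne` and `sendAndReceive` are all hypothetical names made up for illustration, and the "server" runs in the same thread as the client.

```java
import java.util.Locale;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

class ReturnAddressDemo {

    /** Hypothetical message type carrying the two properties the pattern needs. */
    static final class Msg {
        final String body;
        final String replyTo;
        final String correlationId;

        Msg(String body, String replyTo, String correlationId) {
            this.body = body;
            this.replyTo = replyTo;
            this.correlationId = correlationId;
        }
    }

    /** Toy broker: named in-memory queues, created on first use. */
    static final Map<String, BlockingQueue<Msg>> QUEUES = new ConcurrentHashMap<>();

    static BlockingQueue<Msg> queue(String name) {
        return QUEUES.computeIfAbsent(name, n -> new LinkedBlockingQueue<Msg>());
    }

    /** Server side: take one request and reply to whichever queue the request names. */
    static void serveOne(String requestQueue) {
        Msg request = queue(requestQueue).poll();
        if (request == null) {
            return;
        }
        String result = request.body.toUpperCase(Locale.ROOT); // stand-in for real work
        queue(request.replyTo).add(new Msg(result, null, request.correlationId));
    }

    /** Client side: send with a return address, then read the matching reply (or null). */
    static String sendAndReceive(String requestQueue, String replyQueue, String body) {
        String correlationId = UUID.randomUUID().toString();
        queue(requestQueue).add(new Msg(body, replyQueue, correlationId));
        serveOne(requestQueue); // in a real system the server runs in another process
        Msg reply = queue(replyQueue).poll();
        boolean matches = reply != null && correlationId.equals(reply.correlationId);
        return matches ? reply.body : null;
    }

    public static void main(String[] args) {
        System.out.println(sendAndReceive("sms-inbound-queue", "sms-reply-queue", "gdvs"));
    }
}
```

The point of the sketch is that the server never hard-codes a reply destination: it reads the return address and the correlation id from the request and copies the id onto the reply, so the client can match the answer to its request.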

  6. #6

    Mark,

    All the samples provided in the integration tests point to the stdout-channel-adapter. There are no examples of spring configuration that leaves the choice of the reply queue/channel up to the requester. The GatewayEchoTest-context.xml file routes replies as such:

    Code:
    <amqp:outbound-gateway request-channel="outboundGatewayRequests" reply-channel="outboundGatewayReplies" routing-key="si.test.gateway.echo" amqp-template="amqpTemplate" />
    
    <console:stdout-channel-adapter id="outboundGatewayReplies" append-newline="true" />
    If I am understanding this correctly, the reply is sent to stdout and is not sent back to the requester--or is it?

  7. #7
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,840

    You can leave the "reply-channel" off of the outbound-gateway element, and it will instead use the replyChannel header that had been established by the requestor.
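
The fallback described above can be pictured with a few lines of plain Java. This is only a sketch of the decision, not the framework's code: `resolveReplyChannel` is a hypothetical helper, and channels are represented by their names, with the requestor's choice stored under the `replyChannel` header key.

```java
import java.util.HashMap;
import java.util.Map;

class ReplyChannelDemo {

    /**
     * Returns the explicitly configured reply channel if there is one,
     * otherwise the channel named in the message's replyChannel header.
     */
    static String resolveReplyChannel(String configured, Map<String, String> headers) {
        if (configured != null) {
            return configured;
        }
        String fromHeader = headers.get("replyChannel");
        if (fromHeader == null) {
            throw new IllegalStateException(
                    "no reply-channel configured and no replyChannel header present");
        }
        return fromHeader;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("replyChannel", "requestorReplies");

        // With an explicit reply-channel, the header is ignored.
        System.out.println(resolveReplyChannel("outboundGatewayReplies", headers));
        // Without one, the requestor's header decides where the reply goes.
        System.out.println(resolveReplyChannel(null, headers));
    }
}
```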

  8. #8

    Mark,

    When I leave the reply-channel attribute off the outbound-gateway element, the configuration fails schema validation: the reply-channel attribute is required.

    Scott

  9. #9
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,840

    What version are you using?

  10. #10

    Right now I am using build snapshot versions, since the Spring Integration support for AMQP is not available in the current release (as of two weeks ago). The versions are:

    Spring AMQP: 1.0.BUILD-SNAPSHOT
    RabbitMQ Client: 2.5.0
    Spring Framework: 3.0.6.RELEASE
    Spring Integration: 2.1.0.BUILD-SNAPSHOT

    Should I get the latest builds for each of these?

    Thanks for your interest in helping me on this.

    Scott
