jackett_dad
Oct 12th, 2011, 02:58 PM
Hello,
I've created a sample application using the build snapshots of Spring Integration and Spring AMQP. I'm running RabbitMQ and can run the sample tests included with the integration-amqp source.
I've got a sample client that creates a simple message, and I would like to get a reply after the message has made a sequence of hops through the messaging system. The topology is this:
inboundChannel
serviceActivator
router
replyChannel or invalidChannel
The client code posts the message to the inboundChannel and registers itself for replies on both the reply and invalid channels.
My Spring Integration config looks like this:
<beans:bean id="serviceActivator" class="com.sms.connect.adapter.rest.smsRestAdaptor">
    <beans:constructor-arg value="http"/>
    <beans:constructor-arg value="localhost"/>
    <beans:constructor-arg value="8080"/>
    <beans:constructor-arg value="api"/>
    <beans:constructor-arg value="v1"/>
</beans:bean>

<beans:bean id="router" class="com.sms.connect.adapter.rest.smsRestRouter"/>

<amqp:inbound-channel-adapter channel="smsInboundChannel"
    queue-names="sms-inbound-queue" connection-factory="rabbitConnectionFactory" />

<channel id="smsInboundChannel">
    <interceptors>
        <wire-tap channel="logger" />
    </interceptors>
</channel>

<service-activator input-channel="smsInboundChannel"
    ref="serviceActivator" method="handleMessage" output-channel="smsOutputChannel" />

<channel id="smsOutputChannel">
    <interceptors>
        <wire-tap channel="logger" />
    </interceptors>
</channel>

<router input-channel="smsOutputChannel" ref="router" method="route"/>

<channel id="smsInvalidChannel"/>
<channel id="smsReplyChannel"/>

<logging-channel-adapter id="logger" level="ERROR" />

<rabbit:queue name="sms-inbound-queue"/>
My router simply returns smsInvalidChannel or smsReplyChannel.
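To make that concrete, a POJO router of this kind can be sketched roughly as below. This is only a minimal illustration, not the actual smsRestRouter: the class name SmsRestRouterSketch and the blank-payload check are assumptions made up for the example. The only parts taken from the config above are the method name route and the two channel names it returns.

```java
// Hypothetical stand-in for smsRestRouter; the validity rule is an assumption.
class SmsRestRouterSketch {

    static final String REPLY_CHANNEL = "smsReplyChannel";
    static final String INVALID_CHANNEL = "smsInvalidChannel";

    // A POJO router method may return a String, which Spring Integration
    // resolves as the name of the target channel.
    public String route(String payload) {
        // Assumed rule: a null or blank payload is treated as invalid.
        boolean valid = payload != null && !payload.trim().isEmpty();
        return valid ? REPLY_CHANNEL : INVALID_CHANNEL;
    }

    public static void main(String[] args) {
        SmsRestRouterSketch router = new SmsRestRouterSketch();
        System.out.println(router.route("gdvs"));
        System.out.println(router.route(""));
    }
}
```

Returning the channel name as a String keeps the router free of Spring types, so it can be exercised in a plain unit test without a broker or an application context.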
The behavior I am seeing is that the same message keeps being processed over and over. Perhaps there is a poller that re-executes the same message? The message is never removed from RabbitMQ.
So first, I'd like to know how to stop the message from repeating and have it removed from the queue. Second, how do I specify a queue name on which to publish the response from the invalid and reply channels?
So far my client looks like this (it is not polling for a reply right now; it only submits the request and looks for a response). I'm new to this...
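import org.junit.Assert;
import org.junit.Test;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;

public class RabbitMQClientTest {

    @Autowired
    private AmqpAdmin admin;

    @Autowired
    private AmqpTemplate template;

    @Test
    public void simpleProducerConsumerTest() {
        try {
            String sent = "gdvs";
            // make sure the inbound queue exists
            admin.declareQueue(new Queue("sms-inbound-queue"));
            // write message (uses the template's default exchange and routing key)
            template.convertAndSend(sent);
            // read message (uses the template's default queue)
            Object results = template.receiveAndConvert();
            //System.out.println("Msg: " + results);
            //Assert.assertEquals(sent, results);
        } catch (Exception e) {
            Assert.fail("Test failed: " + e.getLocalizedMessage());
        }
    }
}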
Thanks,
Scott