Listening to auto-generated queues with <int-amqp:inbound-channel-adapter />



goethe
Jan 30th, 2012, 08:44 AM
Hi all, and congratulations to everyone working on the Spring Integration
and Spring AMQP teams, your work is great!

I'm trying to set up this kind of client-server (master-worker)
communication between Spring-based applications and a RabbitMQ server:


Multiple clients send requests to one or more servers.
Each request is processed by only one server.


I implemented this with an AMQP outbound gateway in the client and an AMQP
inbound gateway in the server. The client publishes to a topic exchange,
'client.requests', with a routing key like 'request.type.server.one'. The
server listens on an exclusive, durable, non-auto-delete queue which is
bound to the 'client.requests' exchange with the pattern '#.server.one'.
The client listens for reply messages on an auto-generated queue.
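
The routing above depends on how a topic exchange compares a binding pattern with a routing key: words are separated by dots, '*' stands for exactly one word and '#' for zero or more words. The following is only a minimal sketch of those matching rules in plain Java, to show why '#.server.one' catches 'request.type.server.one'. The class and method names (TopicPattern, matches) are made up for illustration; this is not how the broker implements it.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative only: a hand-written model of AMQP topic matching rules,
// not code taken from RabbitMQ or Spring AMQP.
final class TopicPattern {
    private TopicPattern() {}

    /** Returns true if the routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return match(split(pattern), 0, split(routingKey), 0);
    }

    // Splits a dot-separated string into words; an empty string has no words.
    private static List<String> split(String s) {
        if (s.isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.asList(s.split("\\.", -1));
    }

    private static boolean match(List<String> p, int i, List<String> k, int j) {
        if (i == p.size()) {
            return j == k.size(); // pattern exhausted: key must be exhausted too
        }
        String word = p.get(i);
        if (word.equals("#")) {
            // '#' absorbs zero or more words: try every possible continuation
            for (int next = j; next <= k.size(); next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.size()) {
            return false; // pattern still expects a word, key has none left
        }
        if (word.equals("*") || word.equals(k.get(j))) {
            return match(p, i + 1, k, j + 1); // '*' or literal word consumes one word
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("#.server.one", "request.type.server.one"));
        System.out.println(matches("#.server.one", "request.type.server.two"));
    }
}
```

With this model the first call reports a match and the second does not, so a request keyed for 'server.two' would never reach the queue bound for 'server.one'.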

This works, although a new queue is generated on each request and the
client does not seem to send an ACK when it receives the server's reply
(I'm not sure about this; it is what I infer from the logs and the RabbitMQ
HTTP management console). For the moment that is acceptable, and it is not
the main question of this post, though any clue that helps me resolve it
is appreciated too.

My main question: I want the server to notify all clients of the exit
status of the operation performed for each request. The queues on which
clients listen for notifications must be auto-generated too. I figured
that this could be accomplished with an inbound channel adapter. Here is
my configuration:


<rabbit:queue id="anonymous.notification.listening.queue" />

<rabbit:topic-exchange name="server.notification">
    <rabbit:bindings>
        <rabbit:binding queue="anonymous.notification.listening.queue"
            pattern="#.notification.server.one" />
    </rabbit:bindings>
</rabbit:topic-exchange>

<int-amqp:inbound-channel-adapter
    channel="from.rabbit" connection-factory="rabbit.connection.factory"
    queue-names="anonymous.notification.listening.queue" />

I set only id on the 'queue' element because I want the framework to
auto-generate the queue. I guess it does, but I get an exception:


Exception in thread "main" org.springframework.context.ApplicationContextException: Failed to start bean 'fromRabbitAdapter'; nested exception is org.springframework.amqp.AmqpIllegalStateException: Fatal exception on listener startup
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:169)
    at org.springframework.context.support.DefaultLifecycleProcessor.access$1(DefaultLifecycleProcessor.java:154)
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:335)
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:143)
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:108)
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:908)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:428)
    at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:139)
    at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:83)
    at es.umh.intelvia.Main.main(Main.java:32)
Caused by: org.springframework.amqp.AmqpIllegalStateException: Fatal exception on listener startup
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doStart(SimpleMessageListenerContainer.java:309)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:358)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.doStart(AmqpInboundChannelAdapter.java:82)
    at org.springframework.integration.endpoint.AbstractEndpoint.start(AbstractEndpoint.java:84)
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:166)
    ... 9 more
Caused by: org.springframework.amqp.rabbit.listener.FatalListenerStartupException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:192)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:489)
    at java.lang.Thread.run(Thread.java:679)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:107)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:131)
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:643)
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:59)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:616)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:298)
    at $Proxy0.queueDeclarePassive(Unknown Source)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:188)
    ... 2 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'anonymous.notification.listening.queue' in vhost '/intelvia', class-id=50, method-id=10),null,""}
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:328)
    at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:201)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:125)
    ... 11 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'anonymous.notification.listening.queue' in vhost '/intelvia', class-id=50, method-id=10),null,""}
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:384)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:235)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:151)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:96)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:441)


As far as I can see, the exception is caused by the inbound adapter (or
rather by the message listener container that the framework instantiates
for it, I think), which looks up a queue declared on the broker with that
literal name instead of using the auto-generated Queue object held in the
Spring container.

I don't know how to solve this, or whether this is the right way to
achieve what I want.


Any help will be much appreciated.

goethe
Jan 31st, 2012, 05:32 AM
Thanks to gnuphie (http://forum.springsource.org/member.php?57551-gnuphie), whose thread "Messages not being auto-acked" (http://forum.springsource.org/showthread.php?122547-Messages-not-being-auto-acked) solved my main question, maybe accidentally.

The problem is solved with SpEL, by changing this configuration:



<rabbit:queue id="anonymous.notification.listening.queue" />

<rabbit:topic-exchange name="server.notification">
    <rabbit:bindings>
        <rabbit:binding queue="anonymous.notification.listening.queue"
            pattern="#.notification.server.one" />
    </rabbit:bindings>
</rabbit:topic-exchange>

<int-amqp:inbound-channel-adapter
    channel="from.rabbit" connection-factory="rabbit.connection.factory"
    queue-names="anonymous.notification.listening.queue" />


to this one:



<rabbit:queue id="anonymousQueue" />

<rabbit:topic-exchange name="server.notification">
    <rabbit:bindings>
        <rabbit:binding queue="anonymousQueue"
            pattern="#.notification.server.one" />
    </rabbit:bindings>
</rabbit:topic-exchange>

<int-amqp:inbound-channel-adapter
    channel="from.rabbit" connection-factory="rabbit.connection.factory"
    queue-names="#{anonymousQueue.getName()}" />
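
Why the SpEL expression helps can be pictured with a toy model in plain Java. It assumes, as the 404 NOT_FOUND in the stack trace suggests, that the id is only the bean name inside the Spring container, while the broker knows the queue under a different, generated name. Everything here (QueueNameModel, declare, nameOf, existsOnBroker, the UUID-based name) is hypothetical and is not part of Spring AMQP or the RabbitMQ client.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Toy model only: none of these types belong to Spring AMQP or the RabbitMQ client.
final class QueueNameModel {
    // "Container" side: bean id -> generated queue name
    private final Map<String, String> beanIdToQueueName = new HashMap<>();
    // "Broker" side: the queue names that actually exist
    private final Set<String> brokerQueues = new HashSet<>();

    /** Registers a bean under beanId and declares a queue with a generated name. */
    String declare(String beanId) {
        String generated = UUID.randomUUID().toString(); // assumed naming scheme
        beanIdToQueueName.put(beanId, generated);
        brokerQueues.add(generated);
        return generated;
    }

    /** Resolves the bean id to the real queue name, like #{bean.getName()} does. */
    String nameOf(String beanId) {
        return beanIdToQueueName.get(beanId);
    }

    /** Stand-in for a passive declare: does the broker know this queue name? */
    boolean existsOnBroker(String queueName) {
        return brokerQueues.contains(queueName);
    }

    public static void main(String[] args) {
        QueueNameModel model = new QueueNameModel();
        model.declare("anonymousQueue");
        // Looking up the bean id as if it were a queue name fails (the 404 case).
        System.out.println(model.existsOnBroker("anonymousQueue"));
        // Resolving the bean to its generated name first succeeds.
        System.out.println(model.existsOnBroker(model.nameOf("anonymousQueue")));
    }
}
```

In this model the first lookup corresponds to passing the bean id literally in queue-names, and the second to letting SpEL resolve the generated name before the listener starts.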



My secondary question, messages not being auto-acked, is answered in the thread I referenced.