Hi all, and congratulations to everyone working on the Spring Integration
and Spring AMQP teams; your work is great!
I'm trying to set up the following kind of client-server (master-worker)
communication between Spring-based applications and a RabbitMQ server:
- Multiple clients send requests to one or more servers.
- Each request is processed by only one server.
I implemented this with an AMQP outbound gateway in the client and an AMQP
inbound gateway in the server. The client publishes to a topic exchange,
'client.requests', with a routing key like 'request.type.server.one'. The
server listens on an exclusive, durable, non-auto-delete queue that is bound
to the 'client.requests' topic exchange with the pattern '#.server.one'. The
client listens for reply messages on an auto-generated queue.
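A minimal sketch of the request/reply setup described above, for readers who
want something concrete. It is an illustration, not the actual configuration:
the channel names 'to.rabbit' and 'from.clients', the template bean
'rabbit.template' and the queue name 'server.one.requests' are hypothetical,
and the 'rabbit' and 'int-amqp' namespaces are assumed to be declared.

```xml
<!-- Client side (sketch): publish each request to the topic exchange
     and wait for the reply. Bean and channel names are hypothetical. -->
<int-amqp:outbound-gateway
    request-channel="to.rabbit"
    amqp-template="rabbit.template"
    exchange-name="client.requests"
    routing-key="request.type.server.one" />

<!-- Server side (sketch): an exclusive, durable, non auto-delete queue
     bound to the same exchange with the pattern '#.server.one'. -->
<rabbit:queue name="server.one.requests"
    durable="true" exclusive="true" auto-delete="false" />
<rabbit:topic-exchange name="client.requests">
    <rabbit:bindings>
        <rabbit:binding queue="server.one.requests" pattern="#.server.one" />
    </rabbit:bindings>
</rabbit:topic-exchange>
<int-amqp:inbound-gateway
    request-channel="from.clients"
    queue-names="server.one.requests"
    connection-factory="rabbit.connection.factory" />
```

In a topic pattern, '#' matches zero or more dot-separated words, so
'#.server.one' matches the routing key 'request.type.server.one'.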
This works, although a new queue is generated for each request and the
client does not seem to send an ACK when it receives the server's reply (I'm
not sure of this; it is what I assume after looking at the logs and the
RabbitMQ HTTP management console). For the moment that is acceptable. It is
not the main question of this post, but any clue that helps me resolve it is
appreciated too.
Now, my main question: I want the server to notify all clients of the exit
status of the operation performed for each request. The queues on which
clients listen for the notifications must be auto-generated too. I figured
that this could be accomplished with an inbound channel adapter. Here is my
configuration.
Code:
<rabbit:queue id="anonymous.notification.listening.queue" />
<rabbit:topic-exchange name="server.notification">
    <rabbit:bindings>
        <rabbit:binding queue="anonymous.notification.listening.queue"
            pattern="#.notification.server.one" />
    </rabbit:bindings>
</rabbit:topic-exchange>
<int-amqp:inbound-channel-adapter
    channel="from.rabbit" connection-factory="rabbit.connection.factory"
    queue-names="anonymous.notification.listening.queue" />
I use only 'id' in the 'queue' element because I want the framework to
auto-generate the queue. I guess this happens, but I get an exception.
Code:
Exception in thread "main" org.springframework.context.ApplicationContextException: Failed to start bean 'fromRabbitAdapter'; nested exception is org.springframework.amqp.AmqpIllegalStateException: Fatal exception on listener startup
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:169)
at org.springframework.context.support.DefaultLifecycleProcessor.access$1(DefaultLifecycleProcessor.java:154)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:335)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:143)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:108)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:908)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:428)
at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:139)
at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:83)
at es.umh.intelvia.Main.main(Main.java:32)
Caused by: org.springframework.amqp.AmqpIllegalStateException: Fatal exception on listener startup
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doStart(SimpleMessageListenerContainer.java:309)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:358)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.doStart(AmqpInboundChannelAdapter.java:82)
at org.springframework.integration.endpoint.AbstractEndpoint.start(AbstractEndpoint.java:84)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:166)
... 9 more
Caused by: org.springframework.amqp.rabbit.listener.FatalListenerStartupException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:192)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:489)
at java.lang.Thread.run(Thread.java:679)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:107)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:131)
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:643)
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:59)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:616)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:298)
at $Proxy0.queueDeclarePassive(Unknown Source)
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:188)
... 2 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'anonymous.notification.listening.queue' in vhost '/intelvia', class-id=50, method-id=10),null,""}
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:328)
at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:201)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:125)
... 11 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'anonymous.notification.listening.queue' in vhost '/intelvia', class-id=50, method-id=10),null,""}
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:384)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:235)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:151)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:96)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:441)
As far as I can see, the exception is caused by the inbound adapter (or by
the message listener or listener container that the framework instantiates,
I think), which tries to look up a queue declared with that literal name on
the broker instead of using the auto-generated Queue object that is in the
Spring container.
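To make that distinction concrete: the 'id' attribute names the Spring bean,
while the broker only knows the queue by its generated name. One idea that
might bridge the two is a SpEL expression that reads the name from the Queue
bean. This is an untested sketch, not a confirmed fix. It assumes that
'queue-names' is evaluated as a bean-definition expression and that a
'rabbit:admin' bean is present to declare the queue and binding on the
broker. The dot-free id 'notificationQueue' is hypothetical, chosen because
SpEL would read dots in a bean id as property navigation.

```xml
<!-- Hypothetical dot-free bean id; the broker-side name is still
     auto-generated because no 'name' attribute is given. -->
<rabbit:queue id="notificationQueue" />

<rabbit:topic-exchange name="server.notification">
    <rabbit:bindings>
        <rabbit:binding queue="notificationQueue"
            pattern="#.notification.server.one" />
    </rabbit:bindings>
</rabbit:topic-exchange>

<!-- Assumption: #{notificationQueue.name} resolves to the generated
     queue name (Queue.getName()) rather than to the bean id. -->
<int-amqp:inbound-channel-adapter
    channel="from.rabbit" connection-factory="rabbit.connection.factory"
    queue-names="#{notificationQueue.name}" />
```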
I don't know how to solve this, or whether this is the right way to achieve
what I want.
Any help will be much appreciated.