View Full Version : Not able tor restart AmqpInboundChannelAdapter programatically
babithav_yadav
Jun 22nd, 2012, 07:02 AM
I have a scenario where the queues are dynamically created by the Producer module and the messages in the dynamically created queues need to be consumed by the Consumer.
Below is rabbitBeansContext.xml
<beans:bean id="servicerHighPriorityListener" class="org.springframework.amqp.rabbit.listener.SimpleMes sageListenerContainer">
<beans:property name="queueNames" value="#{ servicerService.highPriorityQueueNames }"></beans:property>
<beans:property name="connectionFactory" ref="amqpConn" />
<beans:property name="taskExecutor" ref="highPriorityThreadPoolTaskExecutor" />
<beans:property name="autoStartup" value="true" />
<beans:property name="acknowledgeMode" value="AUTO" />
</beans:bean>
<int-amqp:inbound-channel-adapter id="servicerHighPriorityConsumerAdapter"
channel="servicerHighPriorityConsumerChannel" listener-container="servicerHighPriorityListener"/
<int:channel id="servicerHighPriorityConsumerChannel"></int:channel>
<int:service-activator input-channel="servicerHighPriorityConsumerChannel"
ref="loanFilesCopier" method="copyImages" output-channel="highPriorityChecksumOutboundChannel"></int:service-activator>
<int:channel id="highPriorityChecksumOutboundChannel"></int:channel>
<int-amqp:outbound-channel-adapter
channel="highPriorityChecksumOutboundChannel" routing-key="checksum.high.key"
amqp-template="amqpTemplate" exchange-name="cis.Checksum.Exchange" id="highPriorityChecksumOutboundAdapter"/>
The queue names are dynamically assigned to the MessageListener when the server is started.
But after the server is started if any new queue is dynamically created then it is not recognized by the adapter.
So I am trying to restart the AmqpInboundChannelAdapter whenever a new queue is created dynamically so that the consumer will consume the messages from this new queue.
Below is the code that i have written for restarting the Adapter
AmqpInboundChannelAdapter adapter = (AmqpInboundChannelAdapter) applicationContext.getBean(adapterName);
ConnectionFactory connectionFactory = (ConnectionFactory) applicationContext.getBean("amqpConn");
adapter.stop();
Thread.sleep(1000);
SimpleMessageListenerContainer listener=null;
listener = (SimpleMessageListenerContainer) applicationContext.getBean("servicerHighPriorityListener");
listener.setQueueNames(servicerService.getHighPrio rityQueueNames());
listener.setAutoStartup(true);
listener.setAcknowledgeMode(AcknowledgeMode.AUTO);
listener.setConnectionFactory(connectionFactory);
ThreadPoolTaskExecutor executor = (ThreadPoolTaskExecutor) applicationContext.getBean("highPriorityThreadPoolTaskExecutor");
listener.setTaskExecutor(executor);
listener.setMessageListener(adapter);
listener.afterPropertiesSet();
adapter.start();
listener.start();
But the above code is not restarting the adapter and the messages from the newly created queue is not consumed.
But if i restart the tomcat server then the message is getting consumed
Can anyone please help me.
Thanks in advance..
Gary Russell
Jun 25th, 2012, 08:10 AM
This works fine for me...
context.getBean(AmqpInboundChannelAdapter.class).s top();
context.getBean(SimpleMessageListenerContainer.cla ss).setQueueNames("si.test.queue", "test.queue.2");
context.getBean(AmqpInboundChannelAdapter.class).s tart();
Look for messages like this in the log (with DEBUG)...
...BlockingQueueConsumer] Started on queue 'si.test.queue': Consumer...
...
...BlockingQueueConsumer] Started on queue 'si.test.queue': Consumer...
...BlockingQueueConsumer] Started on queue 'test.queue.2': Consumer...
babithav_yadav
Jun 26th, 2012, 02:15 AM
Thanks Gary for the reply. I tried what you suggested but I am getting the below exception after the changes
ERROR SimpleAsyncTaskExecutor-1 com.asps.cis.common.adapter.AdapterRestarter - Failed while restarting the adapter servicerHighPriorityConsumerAdapter .
org.springframework.amqp.AmqpIllegalStateException : Fatal exception on listener startup
at org.springframework.amqp.rabbit.listener.SimpleMes sageListenerContainer.doStart(SimpleMessageListene rContainer.java:309)
at org.springframework.amqp.rabbit.listener.AbstractM essageListenerContainer.start(AbstractMessageListe nerContainer.java:358)
at org.springframework.integration.amqp.inbound.AmqpI nboundChannelAdapter.doStart(AmqpInboundChannelAda pter.java:82)
at org.springframework.integration.endpoint.AbstractE ndpoint.start(AbstractEndpoint.java:84)
at com.asps.cis.common.adapter.AdapterRestarter.resta rtAdapter(AdapterRestarter.java:96)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Nativ e Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Native MethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(De legatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.springframework.expression.spel.support.Reflec tiveMethodExecutor.execute(ReflectiveMethodExecuto r.java:69)
at org.springframework.expression.spel.ast.MethodRefe rence.getValueInternal(MethodReference.java:109)
at org.springframework.expression.spel.ast.CompoundEx pression.getValueInternal(CompoundExpression.java: 57)
at org.springframework.expression.spel.ast.SpelNodeIm pl.getTypedValue(SpelNodeImpl.java:102)
at org.springframework.expression.spel.standard.SpelE xpression.getValue(SpelExpression.java:102)
at org.springframework.integration.util.AbstractExpre ssionEvaluator.evaluateExpression(AbstractExpressi onEvaluator.java:126)
at org.springframework.integration.util.MessagingMeth odInvokerHelper.processInternal(MessagingMethodInv okerHelper.java:225)
at org.springframework.integration.util.MessagingMeth odInvokerHelper.process(MessagingMethodInvokerHelp er.java:125)
at org.springframework.integration.handler.MethodInvo kingMessageProcessor.processMessage(MethodInvoking MessageProcessor.java:73)
at org.springframework.integration.handler.ServiceAct ivatingHandler.handleRequestMessage(ServiceActivat ingHandler.java:64)
at org.springframework.integration.handler.AbstractRe plyProducingMessageHandler.handleMessageInternal(A bstractReplyProducingMessageHandler.java:97)
at org.springframework.integration.handler.AbstractMe ssageHandler.handleMessage(AbstractMessageHandler. java:73)
at org.springframework.integration.dispatcher.Unicast ingDispatcher.doDispatch(UnicastingDispatcher.java :114)
at org.springframework.integration.dispatcher.Unicast ingDispatcher.dispatch(UnicastingDispatcher.java:1 01)
at org.springframework.integration.channel.AbstractSu bscribableChannel.doSend(AbstractSubscribableChann el.java:61)
at org.springframework.integration.channel.AbstractMe ssageChannel.send(AbstractMessageChannel.java:157)
at org.springframework.integration.channel.AbstractMe ssageChannel.send(AbstractMessageChannel.java:128)
at org.springframework.integration.core.MessagingTemp late.doSend(MessagingTemplate.java:288)
at org.springframework.integration.core.MessagingTemp late.send(MessagingTemplate.java:149)
at org.springframework.integration.endpoint.MessagePr oducerSupport.sendMessage(MessageProducerSupport.j ava:92)
at org.springframework.integration.amqp.inbound.AmqpI nboundChannelAdapter.access$200(AmqpInboundChannel Adapter.java:39)
at org.springframework.integration.amqp.inbound.AmqpI nboundChannelAdapter$1.onMessage(AmqpInboundChanne lAdapter.java:73)
at org.springframework.amqp.rabbit.listener.AbstractM essageListenerContainer.doInvokeListener(AbstractM essageListenerContainer.java:527)
at org.springframework.amqp.rabbit.listener.AbstractM essageListenerContainer.invokeListener(AbstractMes sageListenerContainer.java:472)
at org.springframework.amqp.rabbit.listener.SimpleMes sageListenerContainer.access$001(SimpleMessageList enerContainer.java:56)
at org.springframework.amqp.rabbit.listener.SimpleMes sageListenerContainer$1.invokeListener(SimpleMessa geListenerContainer.java:103)
at org.springframework.amqp.rabbit.listener.SimpleMes sageListenerContainer.invokeListener(SimpleMessage ListenerContainer.java:560)
at org.springframework.amqp.rabbit.listener.AbstractM essageListenerContainer.executeListener(AbstractMe ssageListenerContainer.java:452)
at org.springframework.amqp.rabbit.listener.SimpleMes sageListenerContainer.doReceiveAndExecute(SimpleMe ssageListenerContainer.java:436)
at org.springframework.amqp.rabbit.listener.SimpleMes sageListenerContainer.receiveAndExecute(SimpleMess ageListenerContainer.java:420)
at org.springframework.amqp.rabbit.listener.SimpleMes sageListenerContainer.access$200(SimpleMessageList enerContainer.java:56)
at org.springframework.amqp.rabbit.listener.SimpleMes sageListenerContainer$AsyncMessageProcessingConsum er.run(SimpleMessageListenerContainer.java:505)
at java.lang.Thread.run(Thread.java:662)
Caused by: org.springframework.amqp.rabbit.listener.FatalList enerStartupException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
at org.springframework.amqp.rabbit.listener.BlockingQ ueueConsumer.start(BlockingQueueConsumer.java:192)
at org.springframework.amqp.rabbit.listener.SimpleMes sageListenerContainer$AsyncMessageProcessingConsum er.run(SimpleMessageListenerContainer.java:489)
at java.util.concurrent.ThreadPoolExecutor$Worker.run Task(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:908)
... 1 more
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChanne l.java:107)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc (AMQChannel.java:131)
at com.rabbitmq.client.impl.ChannelN.queueDeclarePass ive(ChannelN.java:643)
at com.rabbitmq.client.impl.ChannelN.queueDeclarePass ive(ChannelN.java:59)
at sun.reflect.GeneratedMethodAccessor67.invoke(Unkno wn Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(De legatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.springframework.amqp.rabbit.connection.Caching ConnectionFactory$CachedChannelInvocationHandler.i nvoke(CachingConnectionFactory.java:298)
at $Proxy106.queueDeclarePassive(Unknown Source)
at org.springframework.amqp.rabbit.listener.BlockingQ ueueConsumer.start(BlockingQueueConsumer.java:188)
... 4 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'cis.SAXON.High,cis.JPMC.High,cis.TBW.High,cis.LIT TON.High,cis.PRISM.High' in vhost '/', class-id=50, method-id=10),null,""}
at com.rabbitmq.utility.ValueOrException.getValue(Val ueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.unin terruptibleGetValue(BlockingValueOrException.java: 33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcCon tinuation.getReply(AMQChannel.java:328)
at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel .java:201)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc (AMQChannel.java:125)
... 12 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'cis.SAXON.High,cis.JPMC.High,cis.TBW.High,cis.LIT TON.High,cis.PRISM.High' in vhost '/', class-id=50, method-id=10),null,""}
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(Ch annelN.java:384)
at com.rabbitmq.client.impl.ChannelN.processAsync(Cha nnelN.java:235)
at com.rabbitmq.client.impl.AMQChannel.handleComplete InboundCommand(AMQChannel.java:151)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AM QChannel.java:96)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.ru n(AMQConnection.java:441)
Gary Russell
Jun 26th, 2012, 09:50 AM
Looks like you are supplying a comma-delimited list instead of an array (or varargs) ...
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'cis.SAXON.High,cis.JPMC.High,cis.TBW.High,cis.LIT TON.High,cis.PRISM.High' in vhost '/', class-id=50, method-id=10),null,""}
babithav_yadav
Jun 27th, 2012, 08:48 AM
Thanks Gary for the reply.
I resolved the above issue but still i am getting some error while restarting the adapter, the processor module is still not able to consume messages after the adapter is restarted, it is not able to pick the queue names dynamically after adapter restart.
Below is the code that i have written
AmqpInboundChannelAdapter adapter = (AmqpInboundChannelAdapter) applicationContext.getBean(adapterName);
adapter.stop();
Thread.sleep(1000);
SimpleMessageListenerContainer listener= (SimpleMessageListenerContainer) applicationContext.getBean("servicerHighPriorityListener");
listener.setQueueNames("cis.SAXON.High","cis.JPMC.High","cis.TBW.High","cis.LITTON.High","cis.PRISM.High","cis.TEST.High");
listener.setAutoStartup(true);
listener.initialize();
adapter.start();
15:25:19.660 [highPriorityChecksumThreadPoolTaskExecutor-2] WARN o.s.a.r.l.Simp
leMessageListenerContainer - Consumer raised exception, processing can restart i
f the connection factory supports it
com.rabbitmq.client.ShutdownSignalException: connection error; reason: #method<c
onnection.close>(reply-code=541, reply-text=Internal error in Consumer null (amq
.ctag-g_HSG8z6OfQ3QlqVWNzhd3) method handleCancelOk for channel AMQChannel(amqp:
//guest@172.18.98.68:5672/,7), class-id=0, method-id=0)
at com.rabbitmq.client.impl.AMQConnection.shutdown(AM QConnection.java:58
9) ~[amqp-client-2.5.1.jar:na]
at com.rabbitmq.client.impl.AMQConnection.close(AMQCo nnection.java:695)
~[amqp-client-2.5.1.jar:na]
at com.rabbitmq.client.impl.AMQConnection.close(AMQCo nnection.java:673)
~[amqp-client-2.5.1.jar:na]
at com.rabbitmq.client.impl.DefaultExceptionHandler.h andleChannelKiller(
DefaultExceptionHandler.java:67) ~[amqp-client-2.5.1.jar:na]
at com.rabbitmq.client.impl.DefaultExceptionHandler.h andleConsumerExcept
ion(DefaultExceptionHandler.java:52) ~[amqp-client-2.5.1.jar:na]
at com.rabbitmq.client.impl.ChannelN$2.transformReply (ChannelN.java:824)
~[amqp-client-2.5.1.jar:na]
at com.rabbitmq.client.impl.ChannelN$2.transformReply (ChannelN.java:814)
~[amqp-client-2.5.1.jar:na]
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcCon tinuation.handleCom
mand(AMQChannel.java:319) ~[amqp-client-2.5.1.jar:na]
at com.rabbitmq.client.impl.AMQChannel.handleComplete InboundCommand(AMQC
hannel.java:154) ~[amqp-client-2.5.1.jar:na]
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AM QChannel.java:96) ~
[amqp-client-2.5.1.jar:na]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.ru n(AMQConnection.jav
a:441) ~[amqp-client-2.5.1.jar:na]
Caused by: java.lang.NullPointerException: null
at com.rabbitmq.client.impl.ChannelN$2.transformReply (ChannelN.java:822)
~[amqp-client-2.5.1.jar:na]
... 5 common frames omitted
15:25:19.660 [highPriorityChecksumThreadPoolTaskExecutor-2] INFO o.s.a.r.l.Simp
leMessageListenerContainer - Restarting Consumer: tag=[amq.ctag-gpIbKlBZ3IcWaIgb
Ix-VVZ], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@172.18.98.68:567
2/,1), acknowledgeMode=AUTO local queue size=0
15:25:19.660 [highPriorityChecksumThreadPoolTaskExecutor-2] DEBUG o.s.a.r.l.Bloc
kingQueueConsumer - Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(am
qp://guest@172.18.98.68:5672/,1)
15:25:19.660 [errorThreadPoolTaskExecutor-1] WARN o.s.a.r.l.SimpleMessageListen
erContainer - Consumer raised exception, processing can restart if the connectio
n factory supports it
com.rabbitmq.client.ShutdownSignalException: connection error; reason: #method<c
onnection.close>(reply-code=541, reply-text=Internal error in Consumer null (amq
.ctag-g_HSG8z6OfQ3QlqVWNzhd3) method handleCancelOk for channel AMQChannel(amqp:
//guest@172.18.98.68:5672/,7), class-id=0, method-id=0)
at com.rabbitmq.client.impl.AMQConnection.shutdown(AM QConnection.java:58
9) ~[amqp-client-2.5.1.jar:na]
at com.rabbitmq.client.impl.AMQConnection.close(AMQCo nnection.java:695)
~[amqp-client-2.5.1.jar:na]
at com.rabbitmq.client.impl.AMQConnection.close(AMQCo nnection.java:673)
~[amqp-client-2.5.1.jar:na]
at com.rabbitmq.client.impl.DefaultExceptionHandler.h andleChannelKiller(
DefaultExceptionHandler.java:67) ~[amqp-client-2.5.1.jar:na]
at com.rabbitmq.client.impl.DefaultExceptionHandler.h andleConsumerExcept
ion(DefaultExceptionHandler.java:52) ~[amqp-client-2.5.1.jar:na]
at com.rabbitmq.client.impl.ChannelN$2.transformReply (ChannelN.java:824)
~[amqp-client-2.5.1.jar:na]
at com.rabbitmq.client.impl.ChannelN$2.transformReply (ChannelN.java:814)
~[amqp-client-2.5.1.jar:na]
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcCon tinuation.handleCom
mand(AMQChannel.java:319) ~[amqp-client-2.5.1.jar:na]
at com.rabbitmq.client.impl.AMQChannel.handleComplete InboundCommand(AMQC
hannel.java:154) ~[amqp-client-2.5.1.jar:na]
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AM QChannel.java:96) ~
[amqp-client-2.5.1.jar:na]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.ru n(AMQConnection.jav
a:441) ~[amqp-client-2.5.1.jar:na]
Caused by: java.lang.NullPointerException: null
at com.rabbitmq.client.impl.ChannelN$2.transformReply (ChannelN.java:822)
~[amqp-client-2.5.1.jar:na]
... 5 common frames omitted
15:25:19.660 [normalPriorityChecksumThreadPoolTaskExecutor-2] WARN o.s.a.r.l.Si
mpleMessageListenerContainer - Consumer raised exception, processing can restart
if the connection factory supports it
com.rabbitmq.client.ShutdownSignalException: connection error; reason: #method<c
onnection.close>(reply-code=541, reply-text=Internal error in Consumer null (amq
.ctag-g_HSG8z6OfQ3QlqVWNzhd3) method handleCancelOk for channel AMQChannel(amqp:
//guest@172.18.98.68:5672/,7), class-id=0, method-id=0)
at com.rabbitmq.client.impl.AMQConnection.shutdown(AM QConnection.java:58
9) ~[amqp-client-2.5.1.jar:na]
at com.rabbitmq.client.impl.AMQConnection.close(AMQCo nnection.java:695)
~[amqp-client-2.5.1.jar:na]
at com.rabbitmq.client.impl.AMQConnection.close(AMQCo nnection.java:673)
~[amqp-client-2.5.1.jar:na]
at com.rabbitmq.client.impl.DefaultExceptionHandler.h andleChannelKiller(
DefaultExceptionHandler.java:67) ~[amqp-client-2.5.1.jar:na]
at com.rabbitmq.client.impl.DefaultExceptionHandler.h andleConsumerExcept
ion(DefaultExceptionHandler.java:52) ~[amqp-client-2.5.1.jar:na]
at com.rabbitmq.client.impl.ChannelN$2.transformReply (ChannelN.java:824)
~[amqp-client-2.5.1.jar:na]
at com.rabbitmq.client.impl.ChannelN$2.transformReply (ChannelN.java:814)
~[amqp-client-2.5.1.jar:na]
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcCon tinuation.handleCom
mand(AMQChannel.java:319) ~[amqp-client-2.5.1.jar:na]
at com.rabbitmq.client.impl.AMQChannel.handleComplete InboundCommand(AMQC
hannel.java:154) ~[amqp-client-2.5.1.jar:na]
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AM QChannel.java:96) ~
[amqp-client-2.5.1.jar:na]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.ru n(AMQConnection.jav
a:441) ~[amqp-client-2.5.1.jar:na]
Caused by: java.lang.NullPointerException: null
at com.rabbitmq.client.impl.ChannelN$2.transformReply (ChannelN.java:822)
~[amqp-client-2.5.1.jar:na]
... 5 common frames omitted
15:25:19.660 [errorThreadPoolTaskExecutor-1] INFO o.s.a.r.l.SimpleMessageListen
erContainer - Restarting Consumer: tag=[amq.ctag-An8l7NlH4ckcppOR4O6SGi], channe
l=Cached Rabbit Channel: AMQChannel(amqp://guest@172.18.98.68:5672/,5), acknowle
dgeMode=AUTO local queue size=0
15:25:19.660 [normalPriorityChecksumThreadPoolTaskExecutor-2] INFO o.s.a.r.l.Si
mpleMessageListenerContainer - Restarting Consumer: tag=[amq.ctag-wuWLKksulBf30Y
mE6yuzaH], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@172.18.98.68:5
672/,4), acknowledgeMode=AUTO local queue size=0
15:25:19.660 [errorThreadPoolTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsum
er - Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@172.
18.98.68:5672/,5)
15:25:19.660 [normalPriorityChecksumThreadPoolTaskExecutor-2] DEBUG o.s.a.r.l.Bl
ockingQueueConsumer - Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(
amqp://guest@172.18.98.68:5672/,4)
Gary Russell
Jun 27th, 2012, 11:57 AM
I am not sure if this is the problem but you shouldn't call initialize() on the container.
Just stop() the adapter, change the queues and start() the adapter as I showed above.
Powered by vBulletin® Version 4.2.1 Copyright © 2013 vBulletin Solutions, Inc. All rights reserved.