I have a scenario where the queues are dynamically created by the Producer module and the messages in the dynamically created queues need to be consumed by the Consumer.
Below is rabbitBeansContext.xml:

HTML Code:
    <beans:bean id="servicerHighPriorityListener"
                class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
        <beans:property name="queueNames" value="#{ servicerService.highPriorityQueueNames }"></beans:property>
        <beans:property name="connectionFactory" ref="amqpConn" />
        <beans:property name="taskExecutor" ref="highPriorityThreadPoolTaskExecutor" />
        <beans:property name="autoStartup" value="true" />
        <beans:property name="acknowledgeMode" value="AUTO" />
    </beans:bean>

    <int-amqp:inbound-channel-adapter id="servicerHighPriorityConsumerAdapter"
                                      channel="servicerHighPriorityConsumerChannel"
                                      listener-container="servicerHighPriorityListener"/>

    <int:channel id="servicerHighPriorityConsumerChannel"></int:channel>

    <int:service-activator input-channel="servicerHighPriorityConsumerChannel"
                           ref="loanFilesCopier"
                           method="copyImages"
                           output-channel="highPriorityChecksumOutboundChannel"></int:service-activator>

    <int:channel id="highPriorityChecksumOutboundChannel"></int:channel>

    <int-amqp:outbound-channel-adapter channel="highPriorityChecksumOutboundChannel"
                                       routing-key="checksum.high.key"
                                       amqp-template="amqpTemplate"
                                       exchange-name="cis.Checksum.Exchange"
                                       id="highPriorityChecksumOutboundAdapter"/>

The queue names are dynamically assigned to the message listener container when the server is started.
However, if a new queue is created dynamically after the server has started, the adapter does not recognize it.
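To make the symptom concrete, here is a minimal plain-Java sketch of what I think is happening. It assumes the container reads the queue names once when it starts and never looks at the source again. `SnapshotContainer` and the queue names are hypothetical stand-ins for illustration only; they are not Spring AMQP classes, and the real container may behave differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class QueueSnapshotDemo {

    /** Hypothetical stand-in for a listener container; NOT a Spring AMQP class. */
    static class SnapshotContainer {
        private final Supplier<List<String>> queueNameSource;
        private List<String> subscribed = new ArrayList<>();
        private boolean running;

        SnapshotContainer(Supplier<List<String>> queueNameSource) {
            this.queueNameSource = queueNameSource;
        }

        /** Copies the queue names that exist right now (assumed behaviour). */
        void start() {
            subscribed = new ArrayList<>(queueNameSource.get());
            running = true;
        }

        /** Drops all subscriptions. */
        void stop() {
            subscribed = new ArrayList<>();
            running = false;
        }

        boolean isRunning() {
            return running;
        }

        /** Returns a copy of the queues this container is subscribed to. */
        List<String> subscribedQueues() {
            return new ArrayList<>(subscribed);
        }
    }

    public static void main(String[] args) {
        // Hypothetical queue names standing in for servicerService.highPriorityQueueNames.
        List<String> declaredQueues = new ArrayList<>();
        declaredQueues.add("servicer.high.1");

        SnapshotContainer container = new SnapshotContainer(() -> declaredQueues);
        container.start();

        // The producer creates a new queue after startup.
        declaredQueues.add("servicer.high.2");
        System.out.println(container.subscribedQueues()); // [servicer.high.1]

        // Only a stop/start cycle re-reads the names in this model.
        container.stop();
        container.start();
        System.out.println(container.subscribedQueues()); // [servicer.high.1, servicer.high.2]
    }
}
```

In this model the new queue is only picked up after a full stop/start cycle that re-reads the names, which matches what I see when the whole server is restarted.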
So I am trying to restart the AmqpInboundChannelAdapter whenever a new queue is created dynamically so that the consumer will consume the messages from this new queue.
Below is the code that I have written for restarting the adapter:

Code:
    AmqpInboundChannelAdapter adapter =
            (AmqpInboundChannelAdapter) applicationContext.getBean(adapterName);
    ConnectionFactory connectionFactory =
            (ConnectionFactory) applicationContext.getBean("amqpConn");

    // Stop the adapter and wait briefly before reconfiguring the container
    adapter.stop();
    Thread.sleep(1000);

    // Reconfigure the listener container with the current queue names
    SimpleMessageListenerContainer listener =
            (SimpleMessageListenerContainer) applicationContext.getBean("servicerHighPriorityListener");
    listener.setQueueNames(servicerService.getHighPriorityQueueNames());
    listener.setAutoStartup(true);
    listener.setAcknowledgeMode(AcknowledgeMode.AUTO);
    listener.setConnectionFactory(connectionFactory);
    ThreadPoolTaskExecutor executor =
            (ThreadPoolTaskExecutor) applicationContext.getBean("highPriorityThreadPoolTaskExecutor");
    listener.setTaskExecutor(executor);
    listener.setMessageListener(adapter);
    listener.afterPropertiesSet();

    // Start the adapter and the container again
    adapter.start();
    listener.start();

But the above code does not restart the adapter, and the messages from the newly created queue are not consumed.
If I restart the Tomcat server, however, the messages are consumed.
Can anyone please help me?
Thanks in advance..

