Hi,
I'm successfully running a Spring 3.2 MVC APP with Spring AMQP version 1.1.3.RELEASE to a RabbitMQ Server (AWS EC2 Ubuntu) version 3.0.2 with SSL to port 5671. Queues, exchanges, etc... created fine and listener subscribed to topic queues come up okay and everything works.
However, when I simply put the RabbitMQ Server behind an Amazon Elastic Load Balancer and point my Spring AMQP client to the load balancer, I see no errors during initialization, but then see the following errors repeat every 20 seconds on the listeners in the Tomcat log:
19:33:23.469 [threadPoolTaskExecutorPrivate-10] WARN o.s.a.r.l.SimpleMessageListenerContainer - Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpIOException: java.io.IOException
19:33:33.483 [threadPoolTaskExecutorWellKnown-10] WARN o.s.a.r.l.SimpleMessageListenerContainer - Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpIOException: java.io.IOException
Note that the Queues, Exchanges are not created on the RabbitMQ, so the init is failing, but I see no error indications in the Tomcat Log until the listeners try to connect.
Here's my AMQP code:
Any ideas why this works fine directly to the RabbitMQ Server but not to a load balancer (same port as the RabbitMQ Server) that has the RabbitMQ Server behind it?Code:@Bean public ConnectionFactory connectionFactory() { final com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory(); connectionFactory.setUsername(this.queueServerUsername); connectionFactory.setPassword(this.queueServerPassword); connectionFactory.setPort(this.queueServerPort); connectionFactory.setHost(this.queueServerAddr); connectionFactory.setRequestedHeartbeat(30); if (this.sslOn.equals("true")) { connectionFactory.setPort(this.queueServerSslPort); try { final KeyStore tks = KeyStore.getInstance("JKS"); final InputStream certsInputStream = this.getClass().getResourceAsStream(this.keyStore); tks.load(certsInputStream, this.keyStorePassword.toCharArray()); // Set up key manager factory to use our key store final final KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509"); kmf.init(tks, this.keyStorePassword.toCharArray()); final TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509"); tmf.init(tks); final SSLContext c = SSLContext.getInstance("SSLv3"); c.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); connectionFactory.useSslProtocol(c); } catch (final Exception e) { e.printStackTrace(); } } final CachingConnectionFactory ccf = new CachingConnectionFactory(connectionFactory); return ccf; } @Bean public AmqpAdmin amqpAdmin() { return new RabbitAdmin(this.connectionFactory()); } @Bean public RabbitTemplate rabbitTemplate() { final RabbitTemplate template = new RabbitTemplate(this.connectionFactory()); template.setRoutingKey(RabbitMqConfiguration.this.wellKnownQueue); template.setQueue(RabbitMqConfiguration.this.wellKnownQueue); return template; } @Bean public TopicExchange applicationWellKnownExchange() { return new TopicExchange(RabbitMqConfiguration.this.wellKnownQueue); } @Bean public TopicExchange applicationPrivateExchange() { return new TopicExchange(RabbitMqConfiguration.this.PrivateQueueName); } @Bean public Queue wellKnownQueue() { return new Queue(RabbitMqConfiguration.this.wellKnownQueue); } @Bean public Queue privateQueue() { return new Queue(RabbitMqConfiguration.this.PrivateQueueName, true, false, true); } @Bean public MessageConverter jsonMessageConverter() { return new JsonMessageConverter(); } @Bean public SimpleMessageListenerContainer messageListenerContainerWellKnown() { try { final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(this.connectionFactory()); container.setQueueNames(RabbitMqConfiguration.this.wellKnownQueue); container.setMessageListener(this.messageListenerWellKnownAdapter()); container.setAcknowledgeMode(AcknowledgeMode.AUTO); container.setTaskExecutor(this.threadPoolTaskExecutorWellKnown()); return container; } catch (final AmqpIOException e) { e.printStackTrace(); } catch (final Exception e) { e.printStackTrace(); } return null; } @Bean public MessageListenerAdapter messageListenerWellKnownAdapter() { final MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(this.applicationClientRestController(), this.jsonMessageConverter()); messageListenerAdapter.setDefaultListenerMethod("handleWellKnownQueueMessage"); return messageListenerAdapter; } @Bean public ThreadPoolTaskExecutor threadPoolTaskExecutorWellKnown() { final ThreadPoolTaskExecutor threadPoolTaskExecutorWellKnown = new ThreadPoolTaskExecutor(); threadPoolTaskExecutorWellKnown.setCorePoolSize(this.queueWellknownCorePoolSize); threadPoolTaskExecutorWellKnown.setMaxPoolSize(this.queueWellknownMaxPoolSize); return threadPoolTaskExecutorWellKnown; } @Bean public SimpleMessageListenerContainer messageListenerContainerPrivate() { final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(this.connectionFactory()); container.setQueueNames(RabbitMqConfiguration.this.PrivateQueueName); container.setMessageListener(this.messageListenerPrivateAdapter()); container.setAcknowledgeMode(AcknowledgeMode.AUTO); container.setTaskExecutor(this.threadPoolTaskExecutorPrivate()); return container; } @Bean public MessageListenerAdapter messageListenerPrivateAdapter() { final MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(this.applicationClientRestController(), this.jsonMessageConverter()); messageListenerAdapter.setDefaultListenerMethod("handlePrivateQueueMessage"); return messageListenerAdapter; } @Bean public ThreadPoolTaskExecutor threadPoolTaskExecutorPrivate() { final ThreadPoolTaskExecutor threadPoolTaskExecutorPrivate = new ThreadPoolTaskExecutor(); threadPoolTaskExecutorPrivate.setCorePoolSize(this.queuePrivateCorePoolSize); threadPoolTaskExecutorPrivate.setMaxPoolSize(this.queuePrivateMaxPoolSize); return threadPoolTaskExecutorPrivate; } @Bean public ApplicationClientRestController applicationClientRestController() { return new ApplicationClientRestController(); } @Bean public Binding rabbitMqBinding() { return BindingBuilder.bind(this.wellKnownQueue()).to(this.applicationWellKnownExchange()).with(RabbitMqConfiguration.this.wellKnownQueue); } @Bean public Binding rabbitMqBinding2() { return BindingBuilder.bind(this.privateQueue()).to(this.applicationPrivateExchange()).with(RabbitMqConfiguration.this.PrivateQueueName); }
Thanks,
Brian


Reply With Quote