JMS Input -> Service Problemo
Hello,
was wondering if anybody can help....
I'm trying to post a message onto a queue, then have a spring-integration pipeline set up to process the message, ending in a service call.
I've got a couple of integration tests set up - one test the pipeline without any jms - this one works.
The second test brings up an embedded activeMQ broker and posts a message onto it.... this is the one that fails.
It seems that I post my message ok, but it doesn't progress down the channels, filters, etc..... Until.... the test is ended and the beans get destroyed! At that point the very first filter get's hit, and then an exception gets thrown.
Exception is shown, and then the skeleton config. I've been tearing my hair out trying to figure out what's going on.
I haven't got much hair.
To save the few remaining strands could somebody tell me if I'm doing something stupid here?
Thanks,
Vince.
Exception is thrown when test is torn down...
Code:
org.springframework.integration.dispatcher.AggregateMessageDeliveryException: All attempts to deliver Message to MessageHandlers failed.
at org.springframework.integration.dispatcher.UnicastingDispatcher.handleExceptions(UnicastingDispatcher.java:143)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:112)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:90)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:43)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:116)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:94)
at org.springframework.integration.channel.MessageChannelTemplate.doSend(MessageChannelTemplate.java:223)
at org.springframework.integration.channel.MessageChannelTemplate.send(MessageChannelTemplate.java:180)
at org.springframework.integration.channel.MessageChannelTemplate.send(MessageChannelTemplate.java:168)
at org.springframework.integration.jms.ChannelPublishingJmsMessageListener.onMessage(ChannelPublishingJmsMessageListener.java:203)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:518)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:479)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:451)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:323)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:261)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:982)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:974)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:876)
at java.lang.Thread.run(Thread.java:735)
Config:
Code:
<amq:broker>
<amq:transportConnectors>
<amq:transportConnector uri="tcp://localhost:0"/>
</amq:transportConnectors>
</amq:broker>
<!-- ActiveMQ destinations to use -->
<amq:queue id="inputQueue" physicalName="blah.input.name"/>
<!-- JMS ConnectionFactory to use, configuring the embedded broker using XML -->
<amq:connectionFactory id="connectionFactory" brokerURL="vm://localhost"/>
<!-- Spring JMS Template -->
<bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<!-- lets wrap in a pool to avoid creating a connection per send -->
<bean class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory">
<ref local="connectionFactory"/>
</property>
</bean>
</property>
</bean>
<!-- a pojo to put messages onto an input jms queue -->
<bean id="testProducer" class="blah.TestProducer">
<property name="jmsTemplate">
<ref bean="myJmsTemplate"></ref>
</property>
<property name="queue">
<ref bean="inputQueue"/>
</property>
</bean>
<!-- =============== Receives messages from jms =============== -->
<jms:message-driven-channel-adapter id="messageIn"
destination="inputQueue"
channel="wireFormatFIXMLTradeChannel"/>
<channel id="wireFormatFIXMLTradeChannel">
<interceptors>
<ref bean="infoLoggingChannelInterceptor"/>
</interceptors>
</channel>
<!-- add a filter to remove messages we dont care about -->
<filter input-channel="wireFormatFIXMLTradeChannel"
output-channel="filteredFIXMLTradeChannel"
discard-channel="rejectedFIXMLTradeChannel"
ref="fixmlFilter"/>
<!-- any messages filtered out get put on this -->
<channel id="rejectedFIXMLTradeChannel"/>
<!-- messages that reach this channel have been filtered -->
<channel id="filteredFIXMLTradeChannel"/>
<!-- MxMLUnmarshaller -->
<!-- Takes MxML raw XML
Unmarshalls into MxML POJO's
Outputs MxML POJOS's -->
<transformer id="FIXMLUnmarshaller"
input-channel="filteredFIXMLTradeChannel"
ref="FIXMLJAXBUnmarshaller"
output-channel="FIXMLTradeChannel"/>
<!-- MxMLTradeChannel -->
<!-- Data on here are FIXML POJO's -->
<channel id="FIXMLTradeChannel"/>
<!-- takes the jaxb objects and maps to Message with Collection<Trade> payload -->
<transformer id="tradeTransformer"
input-channel="FIXMLTradeChannel"
ref="fixmlTradeTransformer"
output-channel="tradeCollectionOutputChannel"/>
<!-- channel contains a Messages with a Collection<Trade> payload -->
<channel id="tradeCollectionOutputChannel"/>
<!-- filters our invalid trades -->
<filter input-channel="tradeCollectionOutputChannel"
output-channel="validatedTradeChannel"
discard-channel="failedTradeValidationChannel"
ref="validTradeFilter"/>
<!-- trades that pass validation get put on here -->
<channel id="validatedTradeChannel"/>
<!-- trades that fail validation get put on here -->
<channel id="failedTradeValidationChannel"/>
<service-activator id="tradeProcessorServiceActivator"
input-channel="validatedTradeChannel"
ref="tradeProcessorActivator"
method="processTrades"/>
<!-- theres some reference beans here but I don't think they're important. -->