Hi,
I'm using spring integration 2.1.3.RELEASE with ActiveMQ.
This is my configuration :
The services are in this class :Code:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
       xmlns:mongo="http://www.springframework.org/schema/data/mongo"
       xmlns:int-jmx="http://www.springframework.org/schema/integration/jmx"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
       xsi:schemaLocation="http://www.springframework.org/schema/data/mongo http://www.springframework.org/schema/data/mongo/spring-mongo-1.0.xsd http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp-2.1.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.1.xsd http://www.springframework.org/schema/integration/jmx http://www.springframework.org/schema/integration/jmx/spring-integration-jmx-2.1.xsd http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.1.xsd http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream-2.1.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">

    <!-- *************** 1. General Configuration *************** -->

    <!-- 1.1 Model to Message converter: JAXB-based converter bound to the Bidon model class,
         configured for UTF-8 encoding and formatted output. It is referenced by every JMS
         adapter below through the message-converter attribute. -->
    <bean id="jaxbMessageConverter" class="ca.sunmedia.test.converter.JaxbMessageConverter">
        <property name="marshallerProperties">
            <map>
                <entry>
                    <key> <value>jaxb.encoding</value> </key>
                    <value type="java.lang.String">UTF-8</value>
                </entry>
                <entry>
                    <key> <value>jaxb.formatted.output</value> </key>
                    <value type="java.lang.Boolean">true</value>
                </entry>
            </map>
        </property>
        <property name="classesToBeBound">
            <list>
                <value>ca.sunmedia.test.model.Bidon</value>
            </list>
        </property>
    </bean>

    <!-- 1.2 Activates various annotations to be detected in bean classes -->
    <context:annotation-config />

    <!-- 1.3 Activate the trace of all message history -->
    <int:message-history />

    <!-- 1.4 Component scanning of the package that contains the bidonService bean -->
    <context:component-scan base-package="ca.sunmedia.test.service.impl" />

    <!-- *************** 2 ActiveMQ configurations *************** -->

    <!-- 2.1 ActiveMQ connection configuration: caching wrapper around an ActiveMQ
         connection factory that points at a broker on localhost:61616. -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://localhost:61616" />
            </bean>
        </property>
        <property name="sessionCacheSize" value="10" />
        <property name="cacheProducers" value="false" />
    </bean>

    <!-- Message channels, one per pipeline stage. -->
    <int:channel id="entryChannel" />
    <int:channel id="afterSplitterChannel" />
    <int:channel id="afterProcessChannel" />
    <int:channel id="afterAggregatorChannel" />

    <!-- Each stage below declares a queue plus two adapters on the SAME channel and queue:
         a message-driven adapter (queue to channel) and an outbound adapter (channel to queue).
         NOTE(review): the outbound adapter and the stage endpoint (splitter, service-activator,
         aggregator) all subscribe to the same channel, so a message read from the queue can
         presumably be handed straight back to that queue. Confirm this is intended.
         NOTE(review): no error-channel attribute is set on any message-driven adapter here. -->

    <!-- Stage 1: queue "1.entry" and entryChannel -->
    <bean id="entryQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="1.entry" />
    </bean>
    <int-jms:message-driven-channel-adapter channel="entryChannel" destination="entryQueue" message-converter="jaxbMessageConverter"/>
    <int-jms:outbound-channel-adapter channel="entryChannel" destination="entryQueue" explicit-qos-enabled="true" delivery-persistent="true" extract-payload="true" message-converter="jaxbMessageConverter"/>

    <!-- Stage 2: queue "2.afterSplitter" and afterSplitterChannel -->
    <bean id="afterSplitterQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="2.afterSplitter" />
    </bean>
    <int-jms:message-driven-channel-adapter channel="afterSplitterChannel" destination="afterSplitterQueue" message-converter="jaxbMessageConverter"/>
    <int-jms:outbound-channel-adapter channel="afterSplitterChannel" destination="afterSplitterQueue" explicit-qos-enabled="true" delivery-persistent="true" extract-payload="true" message-converter="jaxbMessageConverter"/>

    <!-- Stage 3: queue "3.afterProcess" and afterProcessChannel -->
    <bean id="afterProcessQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="3.afterProcess" />
    </bean>
    <int-jms:message-driven-channel-adapter channel="afterProcessChannel" destination="afterProcessQueue" message-converter="jaxbMessageConverter"/>
    <int-jms:outbound-channel-adapter channel="afterProcessChannel" destination="afterProcessQueue" explicit-qos-enabled="true" delivery-persistent="true" extract-payload="true" message-converter="jaxbMessageConverter"/>

    <!-- Stage 4: queue "4.afterAggregator" and afterAggregatorChannel -->
    <bean id="afterAggregatorQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="4.afterAggregator" />
    </bean>
    <int-jms:message-driven-channel-adapter channel="afterAggregatorChannel" destination="afterAggregatorQueue" message-converter="jaxbMessageConverter" />
    <int-jms:outbound-channel-adapter channel="afterAggregatorChannel" destination="afterAggregatorQueue" explicit-qos-enabled="true" delivery-persistent="true" extract-payload="true" message-converter="jaxbMessageConverter"/>

    <!-- Processing endpoints, all delegating to the bidonService bean:
         split, then processMessage, then aggregate, then readAggregatedResult. -->
    <int:splitter input-channel="entryChannel" output-channel="afterSplitterChannel" ref="bidonService" method="split" />
    <int:service-activator input-channel="afterSplitterChannel" output-channel="afterProcessChannel" ref="bidonService" method="processMessage" />

    <!-- Disabled: standalone correlation strategy bean. The aggregator below uses
         bidonService.getCorrelationKey instead. -->
    <!-- <bean id="JMSCorrelationStrategy" class="ca.sunmedia.test.aggregator.strategy.JMSCorrelationStrategy"> </bean> -->
    <int:aggregator input-channel="afterProcessChannel" output-channel="afterAggregatorChannel" ref="bidonService" method="aggregate" correlation-strategy-method="getCorrelationKey"/>
    <int:service-activator input-channel="afterAggregatorChannel" ref="bidonService" method="readAggregatedResult" />
</beans>
The problem is that when an exception occurs, the message is reinserted in the queue. So this creates a loop. In the above class, I generate a dummy exception in the processMessage method.
Code:
package ca.sunmedia.test.service.impl;

import java.util.ArrayList;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.annotation.CorrelationStrategy;
import org.springframework.integration.jms.JmsHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.stereotype.Component;

import ca.sunmedia.test.model.Bidon;

/**
 * Test service whose methods back the splitter, service-activator and aggregator
 * endpoints of the integration flow (split, process, aggregate, read result).
 *
 * <p>{@link #processMessage(Message)} deliberately throws on every call in order to
 * reproduce the redelivery problem described in this post.
 */
@Component(value="bidonService")
public class BidonService {

    /** Number of messages produced by {@link #split(Message)}; also used as the sequence size. */
    private static final int SPLIT_COUNT = 5;

    /** Fixed correlation id stamped on every message produced by {@link #split(Message)}. */
    private static final String TEST_CORRELATION_ID = "test123";

    @Autowired
    @Qualifier("entryChannel")
    private MessageChannel entryChannel;

    /**
     * Kicks off the flow by sending a single {@link Bidon} with id {@code "start"}
     * to the entry channel.
     */
    public void start() {
        Bidon bidon = new Bidon();
        bidon.setId("start");
        Message<Bidon> message = MessageBuilder.withPayload(bidon).build();
        entryChannel.send(message);
    }

    /**
     * Produces {@value #SPLIT_COUNT} messages with payload ids {@code "1"} to {@code "5"}.
     * Each carries the same JMS correlation id plus its sequence number and size.
     *
     * @param input the incoming message; its content is not used
     * @return the generated messages, in sequence order
     */
    public List<Message<Bidon>> split(Message<Bidon> input) {
        List<Message<Bidon>> messages = new ArrayList<Message<Bidon>>(SPLIT_COUNT);
        for (int i = 1; i <= SPLIT_COUNT; i++) {
            Bidon bidon = new Bidon();
            bidon.setId(String.valueOf(i));
            Message<Bidon> part = MessageBuilder.withPayload(bidon)
                    .setHeader(JmsHeaders.CORRELATION_ID, TEST_CORRELATION_ID)
                    .setSequenceSize(SPLIT_COUNT)
                    .setSequenceNumber(i)
                    .build();
            messages.add(part);
        }
        return messages;
    }

    /**
     * Correlation strategy for the aggregator.
     *
     * @param message the message to correlate
     * @return the value of the JMS correlation id header, or {@code null} if it is absent
     */
    @CorrelationStrategy
    public Object getCorrelationKey(Message<?> message) {
        return message.getHeaders().get(JmsHeaders.CORRELATION_ID);
    }

    /**
     * Logs the payload id and then fails on purpose.
     *
     * @param message the message to process
     * @return a copy of the message keeping the payload, correlation id and sequence
     *         details (never reached while the dummy failure is in place)
     * @throws Exception always, with the message {@code "dummy exception"}
     */
    public Message<Bidon> processMessage(Message<Bidon> message) throws Exception {
        System.out.println("processed " + message.getPayload().getId());

        // Deliberate failure used to reproduce the problem. Wrapping the throw in
        // `if (true)` stops the compiler rejecting the code below as unreachable.
        if (true) {
            throw new Exception("dummy exception");
        }

        Message<Bidon> returnMessage = MessageBuilder.withPayload(message.getPayload())
                .setHeader(JmsHeaders.CORRELATION_ID,
                        message.getHeaders().get(JmsHeaders.CORRELATION_ID))
                .setSequenceSize(message.getHeaders().getSequenceSize())
                .setSequenceNumber(message.getHeaders().getSequenceNumber())
                .build();
        return returnMessage;
    }

    /**
     * Merges a group of messages into one whose payload id is the concatenation of
     * the payload ids, in list order.
     *
     * @param messages the correlated messages
     * @return a new message carrying the combined {@link Bidon}
     */
    public Message<Bidon> aggregate(List<Message<Bidon>> messages) {
        StringBuilder ids = new StringBuilder();
        for (Message<Bidon> part : messages) {
            ids.append(part.getPayload().getId());
        }
        Bidon bidon = new Bidon();
        bidon.setId(ids.toString());
        return MessageBuilder.withPayload(bidon).build();
    }

    /**
     * Final step of the flow: prints the aggregated payload id.
     *
     * @param message the aggregated message
     */
    public void readAggregatedResult(Message<Bidon> message) {
        System.out.println("end : " + message.getPayload().getId());
    }
}
I've read about specifying the error-channel attribute on the message-driven-channel-adapter, but that didn't fix my loop problem.
Instead of an infinite loop, I would like the exception to be stored in a channel. On this channel, I would log its content.
Does anyone know why the message keeps being requeued?
I have uploaded the Maven Eclipse project so that you can test it.
Thank you !


Reply With Quote
