1 Attachment(s)
Error handling using message-driven-channel-adapter
Hi,
I'm using spring integration 2.1.3.RELEASE with ActiveMQ.
This is my configuration :
Code:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:context="http://www.springframework.org/schema/context"
xmlns:p="http://www.springframework.org/schema/p" xmlns:int-stream="http://www.springframework.org/schema/integration/stream" xmlns:mongo="http://www.springframework.org/schema/data/mongo"
xmlns:int-jmx="http://www.springframework.org/schema/integration/jmx" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
xsi:schemaLocation="http://www.springframework.org/schema/data/mongo http://www.springframework.org/schema/data/mongo/spring-mongo-1.0.xsd
http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp-2.1.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.1.xsd
http://www.springframework.org/schema/integration/jmx http://www.springframework.org/schema/integration/jmx/spring-integration-jmx-2.1.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.1.xsd
http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream-2.1.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">
<!--
Spring Integration context for a JMS-backed split / process / aggregate flow.
Four channels are declared (entryChannel, afterSplitterChannel, afterProcessChannel,
afterAggregatorChannel). Each one is paired with an ActiveMQ queue through one
message-driven-channel-adapter and one outbound-channel-adapter, and every
processing endpoint delegates to the bidonService bean.
-->
<!-- *************** 1. General Configuration *************** -->
<!-- 1.1 Model to Message converter.
Referenced by every JMS adapter below through the message-converter attribute.
Configured with jaxb.encoding = UTF-8, jaxb.formatted.output = true, and
ca.sunmedia.test.model.Bidon as the only bound class. -->
<bean id="jaxbMessageConverter" class="ca.sunmedia.test.converter.JaxbMessageConverter">
<property name="marshallerProperties">
<map>
<entry>
<key>
<value>jaxb.encoding</value>
</key>
<value type="java.lang.String">UTF-8</value>
</entry>
<entry>
<key>
<value>jaxb.formatted.output</value>
</key>
<value type="java.lang.Boolean">true</value>
</entry>
</map>
</property>
<property name="classesToBeBound">
<list>
<value>ca.sunmedia.test.model.Bidon</value>
</list>
</property>
</bean>
<!-- 1.2 Activates various annotations to be detected in bean classes -->
<context:annotation-config />
<!-- 1.3 Activate the trace of all message history -->
<int:message-history />
<!-- 1.4 Component Scanning of the ca.sunmedia.test.service.impl package -->
<context:component-scan base-package="ca.sunmedia.test.service.impl" />
<!-- *************** 2 ActiveMQ configurations *************** -->
<!-- 2.1 ActiveMQ Connection configuration.
A CachingConnectionFactory wrapping an ActiveMQConnectionFactory that points at
tcp://localhost:61616; sessionCacheSize is 10 and cacheProducers is false.
None of the JMS adapters below names a connection factory explicitly; presumably
they resolve this bean through its default name. Confirm against the int-jms schema. -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
</property>
<property name="sessionCacheSize" value="10" />
<property name="cacheProducers" value="false" />
</bean>
<!-- 2.2 The four message channels of the flow, in processing order. -->
<int:channel id="entryChannel" />
<int:channel id="afterSplitterChannel" />
<int:channel id="afterProcessChannel" />
<int:channel id="afterAggregatorChannel" />
<!-- 2.3 Queue "1.entry" and the two adapters bound to entryChannel.
NOTE(review): entryChannel is at once the target of the message-driven adapter
(queue to channel), the source of the outbound adapter (channel to queue), and the
input of the splitter declared further down. Depending on how the channel dispatches
among its subscribers, a message consumed from 1.entry may be written straight back
to 1.entry instead of reaching the splitter. The same pattern repeats for the three
queue / channel pairs below. Confirm whether separate inbound and outbound channels
were intended.
NOTE(review): no error-channel attribute is set on any message-driven adapter in this
file. Presumably an exception thrown by a downstream endpoint propagates back to the
JMS listener and the message is redelivered; confirm against the Spring Integration
JMS reference. -->
<bean id="entryQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="1.entry" />
</bean>
<int-jms:message-driven-channel-adapter channel="entryChannel" destination="entryQueue" message-converter="jaxbMessageConverter"/>
<int-jms:outbound-channel-adapter channel="entryChannel" destination="entryQueue" explicit-qos-enabled="true" delivery-persistent="true" extract-payload="true" message-converter="jaxbMessageConverter"/>
<!-- 2.4 Queue "2.afterSplitter" and the two adapters bound to afterSplitterChannel. -->
<bean id="afterSplitterQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="2.afterSplitter" />
</bean>
<int-jms:message-driven-channel-adapter channel="afterSplitterChannel" destination="afterSplitterQueue" message-converter="jaxbMessageConverter"/>
<int-jms:outbound-channel-adapter channel="afterSplitterChannel" destination="afterSplitterQueue" explicit-qos-enabled="true" delivery-persistent="true" extract-payload="true" message-converter="jaxbMessageConverter"/>
<!-- 2.5 Queue "3.afterProcess" and the two adapters bound to afterProcessChannel. -->
<bean id="afterProcessQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="3.afterProcess" />
</bean>
<int-jms:message-driven-channel-adapter channel="afterProcessChannel" destination="afterProcessQueue" message-converter="jaxbMessageConverter"/>
<int-jms:outbound-channel-adapter channel="afterProcessChannel" destination="afterProcessQueue" explicit-qos-enabled="true" delivery-persistent="true" extract-payload="true" message-converter="jaxbMessageConverter"/>
<!-- 2.6 Queue "4.afterAggregator" and the two adapters bound to afterAggregatorChannel. -->
<bean id="afterAggregatorQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="4.afterAggregator" />
</bean>
<int-jms:message-driven-channel-adapter channel="afterAggregatorChannel" destination="afterAggregatorQueue" message-converter="jaxbMessageConverter" />
<int-jms:outbound-channel-adapter channel="afterAggregatorChannel" destination="afterAggregatorQueue" explicit-qos-enabled="true" delivery-persistent="true" extract-payload="true" message-converter="jaxbMessageConverter"/>
<!-- *************** 3 Processing endpoints (all backed by bidonService) *************** -->
<!-- 3.1 entryChannel to afterSplitterChannel through bidonService.split -->
<int:splitter input-channel="entryChannel" output-channel="afterSplitterChannel" ref="bidonService" method="split" />
<!-- 3.2 afterSplitterChannel to afterProcessChannel through bidonService.processMessage -->
<int:service-activator input-channel="afterSplitterChannel" output-channel="afterProcessChannel" ref="bidonService" method="processMessage" />
<!--
<bean id="JMSCorrelationStrategy"
class="ca.sunmedia.test.aggregator.strategy.JMSCorrelationStrategy">
</bean>
-->
<!-- 3.3 afterProcessChannel to afterAggregatorChannel through bidonService.aggregate;
correlation-strategy-method names getCorrelationKey. -->
<int:aggregator input-channel="afterProcessChannel" output-channel="afterAggregatorChannel" ref="bidonService" method="aggregate" correlation-strategy-method="getCorrelationKey"/>
<!-- 3.4 Terminal endpoint: afterAggregatorChannel is consumed by bidonService.readAggregatedResult (no output channel). -->
<int:service-activator input-channel="afterAggregatorChannel" ref="bidonService" method="readAggregatedResult" />
</beans>
The services are in this class :
Code:
package ca.sunmedia.test.service.impl;
import java.util.ArrayList;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.annotation.CorrelationStrategy;
import org.springframework.integration.jms.JmsHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.stereotype.Component;
import ca.sunmedia.test.model.Bidon;
/**
 * Demo service that backs every endpoint of the split / process / aggregate flow.
 *
 * <p>Each public method is referenced by name from the Spring Integration XML
 * context (splitter, service activators, aggregator and correlation strategy),
 * so method names and signatures must stay in sync with that configuration.
 */
@Component(value="bidonService")
public class BidonService {

    /** Number of messages emitted by {@link #split(Message)}; doubles as the sequence size. */
    private static final int SPLIT_COUNT = 5;

    /** Channel that {@link #start()} publishes the initial message to. */
    @Autowired
    @Qualifier("entryChannel")
    private MessageChannel entryChannel;

    /**
     * Kicks off the flow by sending a {@link Bidon} with id {@code "start"} to the entry channel.
     */
    public void start() {
        Bidon bidon = new Bidon();
        bidon.setId("start");
        Message<Bidon> message = MessageBuilder.withPayload(bidon).build();
        entryChannel.send(message);
    }

    /**
     * Produces {@value #SPLIT_COUNT} messages whose payload ids are {@code "1"} through
     * {@code "5"}. Every message carries the JMS correlation id {@code "test123"}, a sequence
     * size of {@value #SPLIT_COUNT} and its 1-based sequence number.
     *
     * @param input the incoming message; its content is not read
     * @return the generated messages, in sequence order
     */
    public List<Message<Bidon>> split(Message<Bidon> input) {
        List<Message<Bidon>> messages = new ArrayList<Message<Bidon>>();
        for (int i = 1; i <= SPLIT_COUNT; i++) {
            Bidon bidon = new Bidon();
            bidon.setId(String.valueOf(i));
            Message<Bidon> message = MessageBuilder.withPayload(bidon)
                    .setHeader(JmsHeaders.CORRELATION_ID, "test123")
                    .setSequenceSize(SPLIT_COUNT)
                    .setSequenceNumber(i)
                    .build();
            messages.add(message);
        }
        return messages;
    }

    /**
     * Returns the value of the JMS correlation id header of the given message.
     *
     * @param message the message to correlate
     * @return the header value, or {@code null} when the header is absent
     */
    @CorrelationStrategy
    public Object getCorrelationKey(Message<?> message) {
        return message.getHeaders().get(JmsHeaders.CORRELATION_ID);
    }

    /**
     * Prints the payload id and then always throws, to reproduce the failure scenario
     * under investigation. The code after the throw documents the intended happy path:
     * a copy of the message with the same payload, correlation id and sequence details.
     *
     * @param message the message to process
     * @return a rebuilt message (never reached while the dummy exception is in place)
     * @throws Exception always, with the text {@code "dummy exception"}
     */
    public Message<Bidon> processMessage(Message<Bidon> message) throws Exception {
        System.out.println("processed " + message.getPayload().getId());
        // Deliberate failure: "if (true)" keeps the statements below compilable.
        if (true) {
            throw new Exception("dummy exception");
        }
        // Parameterized type instead of the raw Message, so no unchecked conversion on return.
        Message<Bidon> returnMessage = MessageBuilder.withPayload(message.getPayload())
                .setHeader(JmsHeaders.CORRELATION_ID, message.getHeaders().get(JmsHeaders.CORRELATION_ID))
                .setSequenceSize(message.getHeaders().getSequenceSize())
                .setSequenceNumber(message.getHeaders().getSequenceNumber())
                .build();
        return returnMessage;
    }

    /**
     * Concatenates the payload ids of the given messages, in list order, into the id of a
     * single new {@link Bidon}.
     *
     * @param messages the correlated group of messages
     * @return a message wrapping the combined {@link Bidon}
     */
    public Message<Bidon> aggregate(List<Message<Bidon>> messages) {
        StringBuilder sb = new StringBuilder();
        for (Message<Bidon> message : messages) {
            sb.append(message.getPayload().getId());
        }
        Bidon bidon = new Bidon();
        bidon.setId(sb.toString());
        Message<Bidon> message = MessageBuilder.withPayload(bidon).build();
        return message;
    }

    /**
     * Prints the id of the aggregated payload, prefixed with {@code "end : "}.
     *
     * @param message the aggregated message
     */
    public void readAggregatedResult(Message<Bidon> message) {
        System.out.println("end : " + message.getPayload().getId());
    }
}
The problem is that when an exception occurs, the message is reinserted into the queue, which creates a loop. In the class above, I throw a dummy exception in the processMessage method.
I've read about specifying the error-channel attribute on the message-driven-channel-adapter, but that didn't fix my loop problem.
Instead of an infinite loop, I would like the exception to be stored in a channel. On this channel, I would log its content.
Does anyone know why the message keeps being requeued?
I have uploaded the maven eclipse project to be able to test.
Thank you !