Sorry for bothering you again, but I'd really like to know if I am doing something wrong.
My Service Activator class looks like this:
Code:
public class ConversationService {
final Logger log = LoggerFactory.getLogger(ConversationService.class);
/**
* simulation of unpredictable error
*/
@Value("${error-probability}")
private Double errorProbability;
/**
* merge needs to be called only if service is called in separate thread then input adapter poller
*/
@Value("${should-merge}")
private Boolean shouldMerge = false;
/**
* max number of retries
*/
@Value("${num-of-retries}")
private Integer numOfRetries = 3;
/**
* Creates conversation and update transaction to processed in case of success. In case it fails, new exception is thrown with the message = current retry count.
*/
@ServiceActivator
public void execute(Message<Transaction> message) throws Exception {
Transaction transaction = (Transaction) message.getPayload();
// I am getting here retry count correctly set from transformer
Integer retryCount = message.getHeaders().get("retryCount")==null?0:(Integer)message.getHeaders().get("retryCount");
log.debug("retryCount: " + retryCount);
try{
double calculated = Math.random();
if(errorProbability > calculated){
log.debug("Calculated: " + calculated + " , so transaction " + transaction.getId() + " will retry.");
throw new Exception("something bad bad happened");
}
else {
// my business logic
}
} catch(Exception e){
if(retryCount.intValue() > numOfRetries){
// do not throw exception, mark transaction as pending
log.debug("change status to pending");
transaction.setStatus(TransactionStatus.pending.name());
if(shouldMerge){
transaction.merge();
}
}
else {
// throw exception
// we need to pass current retry count as a part of the exception, since headers are not preserved when message
// is sent to error channel.
throw new Exception(String.valueOf(retryCount));
}
}
}
}
And my integration application context looks like this:
Code:
<int:channel id="input" datatype="java.util.List"/>
<int:channel id="output"/>
<int:channel id="transaction" datatype="com.ma.siroo.model.Transaction">
<int:dispatcher task-executor="pool"/>
</int:channel>
<int:channel id="errorChannel">
<int:queue capacity="${queue-capacity}"/>
</int:channel>
<task:executor id="pool"
pool-size="${pool-size}"
queue-capacity="${queue-capacity}"
keep-alive="120" rejection-policy="CALLER_RUNS"/>
<task:executor id="poller"
pool-size="1" />
<int:inbound-channel-adapter channel="input" method="receive" ref="transactionInputAdapter">
<int:poller fixed-rate="${fixed-rate}" receive-timeout="${receive-timeout}" max-messages-per-poll="${max-messages-per-poll}" task-executor="poller">
</int:poller>
</int:inbound-channel-adapter>
<bean id="transactionInputAdapter" class="com.ma.siroo.integration.adapter.TransactionInputAdapter">
</bean>
<int:splitter id="transactionSplitter" input-channel="input" output-channel="transaction"/>
<int:gateway id="transactionGateway"
service-interface="com.ma.siroo.integration.gateway.TransactionGateway" default-request-channel="transaction" error-channel="errorChannel"/>
<int:service-activator input-channel="transaction">
<bean class="com.ma.siroo.integration.service.ConversationService"/>
</int:service-activator>
<int:service-activator input-channel="errorChannel" output-channel="transaction">
<bean class="com.ma.siroo.integration.transformer.ErrorHandlingTransformer"/>
<int:poller fixed-rate="${fixed-rate}" task-executor="poller">
</int:poller>
</int:service-activator>
Thanks again,
Milan