-
Feb 20th, 2012, 10:54 AM
#1
Retry Mechanism with delay using spring retry framework
Good morning Dave.
Dave, I am trying to use Spring's retry framework (org.springframework.retry).
I have tested the retry mechanism with the configuration below.
I would like to know whether I can set a delay (for example, 1 minute) before each retry.
I appreciate your feedback.
Thanks
Venkat
Parent Queue Configuration:
----------------------------
@Configuration
public abstract class AbstractMDBRabbitConfiguration {

    protected abstract void configureMDBTemplate(DMBTemplate template);

    @Bean
    public ConnectionFactory connectionFactory() {
        String rabbitMQUser = "guest";
        String rabbitMQPassword = "guest";
        String rabbitMQHost = "localhost";
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitMQHost);
        connectionFactory.setChannelCacheSize(10);
        connectionFactory.setUsername(rabbitMQUser);
        connectionFactory.setPassword(rabbitMQPassword);
        return connectionFactory;
    }

    @Bean
    public DMBTemplate rabbitTemplate() {
        DMBTemplate template = new DMBTemplate(connectionFactory());
        template.setMessageConverter(messageConverter());
        configureMDBTemplate(template);
        return template;
    }

    @Bean
    public MessageConverter messageConverter() {
        SimpleMessageConverter messageConverter = new SimpleMessageConverter();
        messageConverter.setCreateMessageIds(true);
        return messageConverter;
    }

    @Bean
    public Advice retryInterceptor() {
        StatefulRetryOperationsInterceptorFactoryBean retry = new StatefulRetryOperationsInterceptorFactoryBean();
        retry.setRetryOperations(retryTemplate());
        retry.setMessageRecoverer(messageRecoverer());
        return retry.getObject();
    }

    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryTemplate.setRetryPolicy(retryPolicy);
        return retryTemplate;
    }

    @Bean
    public MessageRecoverer messageRecoverer() {
        return new DMBMessageRecoverer();
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
        return rabbitAdmin;
    }
}
------------------------------------------------------------------------------------------
Concrete Queue Configuration:
------------------------------
@Configuration
public class WriteToFileConfig extends AbstractMDBRabbitConfiguration {

    private static final String MDB_AUDIT_EXCHANGE_NAME = "write.tofile.exchange";

    /** The Constant MDB_QUEUE_NAME. */
    private static final String MDB_QUEUE_NAME = "write.tofile.queue";

    /** The Constant MDB_ROUTING_KEY. */
    private static final String MDB_ROUTING_KEY = MDB_QUEUE_NAME;

    @Override
    protected void configureMDBTemplate(DMBTemplate template) {
        template.setExchange(MDB_AUDIT_EXCHANGE_NAME);
        template.setRoutingKey(MDB_ROUTING_KEY);
    }

    @Bean
    public Queue writeQueue() {
        Map<String, Object> args = new HashMap<String, Object>();
        String haPolicyValue = CommonsConfigurationServiceImpl.getPropertyValue(
                CommonsConfigurationConstants.RABBIT_MQ_CONFIG_NAME, "rabbitmq.ha.policy");
        args.put("x-ha-policy", haPolicyValue);
        Queue queue = new Queue(MDB_QUEUE_NAME, true, false, false, args);
        return queue;
    }

    @Bean
    public TopicExchange writeExchange() {
        TopicExchange exchange = new TopicExchange(MDB_AUDIT_EXCHANGE_NAME, true, false);
        return exchange;
    }

    @Bean
    public Binding writeBinding() {
        return BindingBuilder.bind(writeQueue()).to(writeExchange()).with(MDB_ROUTING_KEY);
    }
}
-----------------------------------------------------------------------------------------------------------------------
consumer configuration:
=======================
<context:component-scan base-package="com.pb.test.write.file">
</context:component-scan>

<!-- central log messaging config -->
<bean id="messageListenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="queueNames" value="write.tofile.queue"/>
    <property name="concurrentConsumers" value="5" />
    <property name="txSize" value="100" />
    <property name="prefetchCount" value="100" />
    <property name="adviceChain" >
        <list>
            <ref bean="retryInterceptor"/>
        </list>
    </property>
    <property name="messageListener" ref="messageListenerAdapter" />
</bean>

<bean id="messageListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
    <property name="delegate" ref="writeToFileHandler" />
    <property name="messageConverter" ref="messageConverter" />
</bean>

<!-- central log handlers config -->
<bean id="writeToFileHandler" class="com.pb.test.write.file.WriteToFileProcessor">
</bean>
-
Feb 21st, 2012, 02:17 AM
#2
That all looks fine. For a backoff you need to add a BackOffPolicy to the RetryTemplate when you create it. 1 minute is a bit of a long wait to start with, but it should work.
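A minimal configuration sketch of that suggestion, assuming a constant one-minute pause between attempts is what is wanted. It uses spring-retry's FixedBackOffPolicy and its setBackOffPeriod setter; the class name FixedDelayRetryConfig and the maxAttempts value of 4 are illustrative assumptions, not taken from the configuration above.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

// Hypothetical configuration class, named only for this sketch.
@Configuration
public class FixedDelayRetryConfig {

    @Bean
    public RetryTemplate retryTemplate() {
        // Assumed limit for illustration; pick whatever suits the queue.
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(4);

        // Sleep a constant 60 000 ms (1 minute) before each retry.
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(60000L);

        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(retryPolicy);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        return retryTemplate;
    }
}
```

The sleep happens on the consumer thread, so a long period ties up that consumer for the duration of the wait.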
-
Feb 21st, 2012, 02:56 PM
#3
Good afternoon Dave. Thanks for your feedback.
Dave, I tried ExponentialBackOffPolicy, configured as shown at the end of this post, with the initial interval set to 1000 milliseconds. When I ran a test program whose queue consumer throws an exception for every message, I found that all retries use the same delay, the initialInterval (1000 ms). I placed a breakpoint in the following method:
public synchronized long getSleepAndIncrement() {
    long sleep = this.interval;
    if (sleep > maxInterval) {
        sleep = (long) maxInterval;
    }
    else {
        this.interval *= this.multiplier;
    }
    return sleep;
}
I can see the interval being multiplied by the multiplier, but the interval value is always 1000 when the method is called, so each of the four retries waits 1000 ms. Could you please let me know if I am missing a setting?
Dave, one more thing I noticed: if I set maxAttempts to five in SimpleRetryPolicy, it retries ten times. If I keep this number below five, it works as expected.
Thanks
Venkat
@Bean
public RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(4);
    ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
    exponentialBackOffPolicy.setInitialInterval(1000L);
    retryTemplate.setRetryPolicy(retryPolicy);
    retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);
    return retryTemplate;
}
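For comparison, here is a small standalone sketch (plain Java, no Spring) of what the getSleepAndIncrement logic quoted above yields when a single context object survives from one call to the next. The class name ExponentialSleepSketch and the helper sleeps are made up for illustration, and the multiplier of 2.0 and maximum interval of 30000 ms are assumed values, since the configuration only sets the initial interval.

```java
import java.util.Arrays;

// Hypothetical stand-in for the back-off context; not the spring-retry class itself.
final class ExponentialSleepSketch {
    private long interval;
    private final double multiplier;
    private final long maxInterval;

    ExponentialSleepSketch(long initialInterval, double multiplier, long maxInterval) {
        this.interval = initialInterval;
        this.multiplier = multiplier;
        this.maxInterval = maxInterval;
    }

    // Same logic as the method quoted above: return the current interval
    // (capped at maxInterval) and grow it for the next call.
    synchronized long getSleepAndIncrement() {
        long sleep = interval;
        if (sleep > maxInterval) {
            sleep = maxInterval;
        } else {
            interval = (long) (interval * multiplier);
        }
        return sleep;
    }

    // Collects the sleeps produced by `calls` consecutive calls on ONE context.
    static long[] sleeps(long initialInterval, double multiplier, long maxInterval, int calls) {
        ExponentialSleepSketch context =
                new ExponentialSleepSketch(initialInterval, multiplier, maxInterval);
        long[] result = new long[calls];
        for (int i = 0; i < calls; i++) {
            result[i] = context.getSleepAndIncrement();
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [1000, 2000, 4000, 8000]
        System.out.println(Arrays.toString(sleeps(1000L, 2.0, 30000L, 4)));
    }
}
```

If every retry instead sees a sleep of 1000 ms, that would be consistent with a fresh context being created for each attempt rather than one context being reused.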
-
Feb 22nd, 2012, 02:33 AM
#4
You can cut down some of the confusion by setting txSize=1 while you debug what is going on. The state for the backoff (and retry) is stored in a retry context keyed on the failed message id. If you have txSize>1 it is possible that the failed message is different from attempt to attempt, so the context will be different. If you have a test listener that fails for every message this is all the more likely. You would hope that in a real system you have relatively few failures.
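To illustrate the keying idea only, here is a simplified plain-Java sketch, not how spring-retry stores its contexts internally. The class KeyedBackoffSketch, its nextSleep method, and the message ids are hypothetical. State is looked up by message id, so the interval grows only when the same id fails again; a different id starts over at the initial interval.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of back-off state keyed on the failed message id.
final class KeyedBackoffSketch {
    private final long initialInterval;
    private final double multiplier;
    private final long maxInterval;
    private final Map<String, Long> intervalByMessageId = new HashMap<>();

    KeyedBackoffSketch(long initialInterval, double multiplier, long maxInterval) {
        this.initialInterval = initialInterval;
        this.multiplier = multiplier;
        this.maxInterval = maxInterval;
    }

    // Returns the sleep for this failure and stores a larger interval
    // under the same message id for the next failure of that message.
    synchronized long nextSleep(String messageId) {
        long interval = intervalByMessageId.getOrDefault(messageId, initialInterval);
        long sleep = Math.min(interval, maxInterval);
        if (interval <= maxInterval) {
            intervalByMessageId.put(messageId, (long) (interval * multiplier));
        }
        return sleep;
    }

    public static void main(String[] args) {
        KeyedBackoffSketch backoff = new KeyedBackoffSketch(1000L, 2.0, 30000L);
        System.out.println(backoff.nextSleep("msg-1")); // prints 1000
        System.out.println(backoff.nextSleep("msg-1")); // prints 2000
        System.out.println(backoff.nextSleep("msg-2")); // prints 1000 (different key, fresh state)
    }
}
```

With a listener that fails on every message and txSize>1, consecutive failures can land on different ids, which in this sketch corresponds to always hitting the "fresh state" branch.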
-
Feb 22nd, 2012, 09:25 PM
#5
Good evening Dave.
I tried setting txSize=1 and prefetchCount=1, with no luck. I believe each retry is treated as a new message, so the backoff state is not preserved.
Thanks
Venkat
-
Feb 23rd, 2012, 08:45 AM
#6
Actually, now you mention it, I think that is a known issue with spring-retry. It was reported here https://jira.springsource.org/browse/BATCH-1795 and there is a suggested patch there but I haven't seen a contributor's agreement or a pull request.
-
Mar 22nd, 2012, 10:06 PM
#7
Good evening Dave. I have applied the patch suggested at https://jira.springsource.org/browse/BATCH-1795, and the exponential backoff policy now works as expected. Today I found that a new connection is created for every retry, along with as many channels as there are retries. Over a period of 2-3 hours, 2000 connections and 2071 channels were created while some queues were subject to retries. Is there a known channel leak with spring-retry?
Thanks
Venkat