
Thread: Handling Failures With StatefulRetryOperationsInterceptorFactoryBean

  1. #1

    Hi all,

    I have been using spring-amqp for quite a while in many production projects, and for the most part I handle errors by injecting an amqpTemplate into my messageListeners and simply republishing to an exchange on failure (exception, etc.).

    I've been reading the docs again and discovered StatefulRetryOperationsInterceptor but I cannot find any information on how to use it. Ideally, I'd like to retry messages 3 times and if they still fail publish them to my error exchange for inspection and re-attempts.

    Thanks,
    James

  2. #2

    So I looked through the code and discovered why it just wouldn't work for me... it requires a messageId on each message, and my messages don't have one!

    So I keep getting "Illegal null id in message. Failed to manage retry for message".

    Aside from putting messageIds in my code, any ideas?

  3. #3
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,844

    Can you provide the full stack trace here? (or at least the relevant part).

    Thanks,
    Mark

  4. #4

    Sure... here's the relevant portion:

    Code:
    INFO  ContextLoader - Root WebApplicationContext: initialization started
    INFO  AnnotationConfigWebApplicationContext - Refreshing Root WebApplicationContext: startup date [Fri Nov 11 16:13:42 CST 2011]; root of context hierarchy
    INFO  AnnotationConfigWebApplicationContext - Successfully resolved class for [com.carfax.blueprint.amqp.ApplicationConfig]
    INFO  XmlBeanDefinitionReader - Loading XML bean definitions from class path resource [com/carfax/blueprint/amqp/jndi-context.xml]
    INFO  DefaultListableBeanFactory - Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@10ace8d: defining beans [org.springframework.context.annotation.internalConfigurationAnnotationProcessor,org.springframework.context.annotation.internalAutowiredAnnotationProcessor,org.springframework.context.annotation.internalRequiredAnnotationProcessor,org.springframework.context.annotation.internalCommonAnnotationProcessor,applicationConfig,stolenQueue,serviceQueue,stolenBinding,serviceBinding,rabbitAdmin,serviceListener,stolenListener,retryInterceptor,amqpConnectionFactory]; root of factory hierarchy
    WARN  SimpleMessageListenerContainer - CachingConnectionFactory's channelCacheSize can not be less than the number of concurrentConsumers so it was reset to match: 2
    INFO  DefaultLifecycleProcessor - Starting beans in phase 2147483647
    INFO  ContextLoader - Root WebApplicationContext: initialization completed in 625 ms
    INFO  SimpleMessageListenerContainer - Execution of Rabbit message listener failed, and no ErrorHandler has been set: class org.springframework.amqp.rabbit.listener.FatalListenerExecutionException: Illegal null id in message. Failed to manage retry for message: (Body:'{"make":"Honda","model":"Prelude","year":"1985"}'; ID:null; Content:application/json; Headers:{__TypeId__=com.carfax.blueprint.amqp.Vehicle}; Exchange:vehicle_history_changes; RoutingKey:vehicle.history.stolen; Reply:null; DeliveryMode:PERSISTENT; DeliveryTag:1)
    ERROR SimpleMessageListenerContainer - Consumer received fatal exception during processing
    org.springframework.amqp.rabbit.listener.FatalListenerExecutionException: Illegal null id in message. Failed to manage retry for message: (Body:'{"make":"Honda","model":"Prelude","year":"1985"}'; ID:null; Content:application/json; Headers:{__TypeId__=com.carfax.blueprint.amqp.Vehicle}; Exchange:vehicle_history_changes; RoutingKey:vehicle.history.stolen; Reply:null; DeliveryMode:PERSISTENT; DeliveryTag:1)
            at org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean$3.getKey(StatefulRetryOperationsInterceptorFactoryBean.java:105)
            at org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor.invoke(StatefulRetryOperationsInterceptor.java:142)
            at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
            at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
            at $Proxy38.invokeListener(Unknown Source)
            at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:560)
            at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:452)
            at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:436)
            at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:420)
            at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$200(SimpleMessageListenerContainer.java:56)
            at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:505)
            at java.lang.Thread.run(Thread.java:662)

  5. #5

    Also, here's the relevant code:

    Code:
    @Bean
    public SimpleMessageListenerContainer stolenListener() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(amqpConnectionFactory);
        container.setQueueNames(stolenQueue().getName());
        container.setAutoStartup(true);
        container.setConcurrentConsumers(2);
        container.setErrorHandler(errorHandler());
        // The retry advice wraps every listener invocation.
        container.setAdviceChain(new Advice[]{retryInterceptor()});
        container.setMessageListener(messageListener(new StolenRecordListener()));
        return container;
    }

    @Bean
    public Advice retryInterceptor() {
        StatefulRetryOperationsInterceptorFactoryBean retry = new StatefulRetryOperationsInterceptorFactoryBean();
        RetryTemplate retryTemplate = new RetryTemplate();
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(10);
        retryTemplate.setRetryPolicy(retryPolicy);
        retry.setRetryOperations(retryTemplate);
        // Nothing here tells the interceptor how to key a message, so it relies on the message id.
        return retry.getObject();
    }

  6. #6

    Just a quick note... for now, I'm just trying to define behavior so that it retries the message 10 times (rather than the default, which is a rapid-fire retry, forever, every time the consumer gets the message). I'm hoping to move on to more advanced stuff, like submitting the message to an error queue, in the long run.

  7. #7

    Okay, I spent some time on this during the morning and figured it out... the default behavior is that it looks for a message ID, but nothing mandates that a message has one. Hence, the default setup never works for messages published without an ID.
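
To illustrate why the key matters, here is a minimal plain-Java sketch of the idea behind stateful retry. It is not Spring Retry's implementation: the class name `StatefulRetrySketch`, the method `onDelivery`, and the `recoverer` callback are all hypothetical, and the timing is simplified (this sketch recovers right after the last failed attempt). The point is only that failure counts must be remembered across redeliveries, so each message needs a non-null key.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical sketch, not part of Spring AMQP or Spring Retry.
final class StatefulRetrySketch<M> {
    private final int maxAttempts;
    private final Consumer<M> recoverer; // e.g. "publish to an error exchange"
    private final Map<Object, Integer> failures = new ConcurrentHashMap<>();

    StatefulRetrySketch(int maxAttempts, Consumer<M> recoverer) {
        this.maxAttempts = maxAttempts;
        this.recoverer = recoverer;
    }

    /**
     * Handles one delivery. Returns true when the message is finished
     * (processed or handed to the recoverer), false when it should be
     * redelivered and tried again.
     */
    boolean onDelivery(Object key, M message, Consumer<M> listener) {
        if (key == null) {
            // Without a key there is nowhere to store the attempt count.
            throw new IllegalArgumentException("Illegal null id in message");
        }
        try {
            listener.accept(message);
            failures.remove(key); // success: forget any earlier failures
            return true;
        } catch (RuntimeException e) {
            int failed = failures.merge(key, 1, Integer::sum);
            if (failed >= maxAttempts) {
                failures.remove(key);
                recoverer.accept(message); // retries exhausted
                return true;
            }
            return false; // ask for redelivery
        }
    }
}
```

With `maxAttempts` set to 3 and a listener that always throws, the first two deliveries return false and the third hands the message to the recoverer.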

    Anyhow, I created a setup that works and pushed it out to our example repository. Could you let me know if there is something I am doing wrong or I could do better? It works, just curious if there are better ways.

    https://github.com/CARFAX/blueprint-...ionConfig.java

  8. #8
    Join Date
    Jun 2005
    Posts
    4,232

    If you don't control the producer, there's nothing else you can do for a stateful retry than add a keyGenerator (as you have done). If you do control the producer, you can add the message id, right? In particular, if the producer is another Spring AMQP client, then the AbstractMessageConverter (base class for all our converters) has a createMessageIds flag you can set.
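
As a rough illustration of what such a key generator might compute, the sketch below prefers the message id when the producer supplied one and otherwise falls back to a SHA-256 hash of the body. The class `BodyHashKey` and method `keyFor` are hypothetical names; wiring the result into the interceptor's keyGenerator hook is deliberately left out, and the hashing strategy is an assumption, not necessarily what the linked config does.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper, not part of Spring AMQP.
final class BodyHashKey {
    private BodyHashKey() {
    }

    /** Returns the message id if present, else a hex SHA-256 of the body. */
    static String keyFor(String messageId, byte[] body) {
        if (messageId != null) {
            return messageId;
        }
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(body);
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                // Mask to treat the byte as unsigned before formatting.
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is required on every Java platform, so this is unexpected.
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }
}
```

One caveat with this choice: two distinct messages with byte-identical bodies get the same key and would therefore share retry state, which is why a producer-assigned message id is the better option when the producer can be changed.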

    We can certainly improve the documentation here, and I'd like to see more people using Spring Retry because, as you have hopefully found, it is quite a powerful abstraction. I was going to write a blog post about it, but so far I have only got a few paragraphs down. I'll have to think about bumping it up the priority list.

  9. #9
    Join Date
    Aug 2009
    Posts
    28

    +1 for a blog entry covering this.

    I have done a quick retry implementation that works fine for an older Spring 2.5 project using the raw Java AMQP library, but my newer project, based on spring-integration 2.1 / Spring 3.1, does not yet survive a broker death and/or restart. It appears that this should work out of the box, but given how recently the retry support was added, and that RabbitMQ itself is a moving target, I assume it may be sensitive to configuration (my setup is pre-release on almost everything). A blog post might be a real help for me.

    I haven't spent much time trying to get the retry working yet because I hope it will just fall into place on its own as Spring Integration / RabbitMQ / Spring 3.1 get closer to release on the versions I am using. Almost everything else in these projects has so far. Keep up the good work!

  10. #10
    Join Date
    Jun 2005
    Posts
    4,232

    A standard Spring AMQP consumer should survive broker death/restart, so if your Spring Integration adapter is not working that way it might be a good idea to raise a JIRA ticket (in INT) with some more detail of what you are doing, so it can be checked before 2.1 is final (soon).
