Results 1 to 10 of 10

Thread: why SI always resend message when throw exception

  1. #1

    Default why SI always resend message when throw exception

    hi,All:

    I try to read message from rabbitmq and send those message by email. when smtp authenticate failed, SI will read this message from rabbitmq again and try to resend email because the message still in rabbitmq's queue. this is a cycle, app will resend email again and again when message is in the queue.
    so, How to make SI send error to error channel and remove the last message from rabbitmq?

    and another question, there only one queue defined in my code:ats.test.queue, but I found there have two queues in rabbitmq:ats.test.queue and fromRabbit, fromRabbit is just a channel id, I am confused. so does anybody can help me?

    this is my code
    Code:
             <bean id="mailSender" class="org.springframework.mail.javamail.JavaMailSenderImpl">
    		<property name="host" value="smtp.sina.cn"/>
    		<property name="username" value="xxxx"/>
    		<property name="password" value="xxxx"/>
    	</bean>
    
    	<bean id="mailTransformer" class="com.nanhua.trading.bus.mail.MailTransformer">
    		<property name="mailSender" ref="mailSender"/>
    	</bean>
    	<bean id="mailErrorHandler" class="com.nanhua.trading.bus.mail.MailErrorHandler"/>
    	
    	 
    	<amqp:channel  id="fromRabbit" error-handler="mailErrorHandler" connection-factory="connectionFactory" />
    	  
    	<amqp:inbound-channel-adapter channel="fromRabbit"
    		queue-names="ats.test.queue" connection-factory="connectionFactory" />
    	 
    	<int:channel id="emailChannel"/>
    	<int:transformer input-channel="fromRabbit" output-channel="emailChannel" 
                        ref="mailTransformer"/>
                    
    	<int-mail:outbound-channel-adapter channel="emailChannel" java-mail-properties="javaMailProperties"
                                  mail-sender="mailSender"/>
    	
    	
    	<util:properties id="javaMailProperties">
    	   
    	  <prop key="mail.smtp.auth">true</prop>
    	  
    	</util:properties>
    	 
    	<int:channel id="errorChannel">
         	<int:queue capacity="500"/>
     	</int:channel>
     	<int:poller default="true" fixed-delay="1000" max-messages-per-poll="1"/>
     	
    	<int-stream:stdout-channel-adapter channel="errorChannel" append-newline="true" />
    	
    	<!-- Infrastructure -->
    
    	<rabbit:connection-factory id="connectionFactory"
    		host="127.0.0.1" port="5672" />
    	<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
    	<rabbit:admin connection-factory="connectionFactory"
    		auto-startup="true" />
    	<rabbit:queue name="ats.test.queue" />
    	<rabbit:direct-exchange name="ats.test.exchange">
    		<rabbit:bindings>
    			<rabbit:binding queue="ats.test.queue" key="ats.test.binding" />
    		</rabbit:bindings>
    	</rabbit:direct-exchange>

  2. #2
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,029

    Default

    What does your ErrorHandler do with the exception?


    Regarding your second question...

    Code:
    <amqp:channel ...
    This should be a simple


    Code:
    <int:channel id="fromRabbit"/>
    An <amqp:channel /> is backed by an amqp queue - in other words you are reading from 'ats.test.queue' and simply writing it to 'fromRabbit' queue. that queue is then listened-to and messages are sent you your transformer. Using a simple channel connects the inbound adapter directly to the transformer.
    Last edited by Gary Russell; Apr 1st, 2012 at 10:31 AM.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  3. #3

    Default

    hi,Gary, my error handler do nothing
    Code:
    public class MailErrorHandler implements ErrorHandler {
    	
    	private Logger logger=Logger.getLogger(this.getClass().getName());
    	
    	@Override
    	public void handleError(Throwable t) {
    		logger.info(t.getMessage());
    
    	}
    
    }

  4. #4
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,029

    Default

    Fix the channel - if you are still having problems, run with a DEBUG log. If you can't figure out what's going on, post the log.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  5. #5

    Default

    I have fix the channel, the log is too long, I will post twice
    Code:
    2012-04-01 23:43:14,008 [SimpleAsyncTaskExecutor-1] DEBUG org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Execution of Rabbit message listener failed, and no ErrorHandler has been set.
    org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception
    	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:590)
    	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:529)
    	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:472)
    	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:56)
    	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:103)
    	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:560)
    	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:452)
    	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:436)
    	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:420)
    	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$200(SimpleMessageListenerContainer.java:56)
    	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:505)
    	at java.lang.Thread.run(Thread.java:662)
    Caused by: org.springframework.integration.MessageHandlingException: error occurred in message handler [org.springframework.integration.mail.MailSendingMessageHandler#0]
    	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:79)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:114)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:101)
    	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:61)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
    	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
    	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:175)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:159)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:124)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:118)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:100)
    	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:114)
    	...
    	at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:92)
    	at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$200(AmqpInboundChannelAdapter.java:39)
    	at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$1.onMessage(AmqpInboundChannelAdapter.java:73)
    	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:527)
    	... 10 more
    Caused by: org.springframework.mail.MailAuthenticationException: Authentication failed; nested exception is javax.mail.AuthenticationFailedException: 535 #5.7.0 Authentication failed
    
    	... 34 more
    Caused by: javax.mail.AuthenticationFailedException: 535 #5.7.0 Authentication failed

  6. #6

    Default

    Code:
    2012-04-01 23:43:14,010 [SimpleAsyncTaskExecutor-1] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Rejecting messages
    2012-04-01 23:43:14,010 [SimpleAsyncTaskExecutor-1] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Retrieving delivery for Consumer: tag=[amq.ctag-gdmLQKPHZqtzudJCpc32Ht], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), acknowledgeMode=AUTO local queue size=0
    2012-04-01 23:43:14,030 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Storing delivery for Consumer: tag=[amq.ctag-gdmLQKPHZqtzudJCpc32Ht], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), acknowledgeMode=AUTO local queue size=0
    2012-04-01 23:43:14,031 [SimpleAsyncTaskExecutor-1] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'{"content":"灏婃暚鐨勭敤鎴穞est29锛歕r\n\r\n\t鎮ㄧ殑瀵嗙爜鏄細56lwoip3锛屾偍鍙互浣跨敤浣犵殑鐢ㄦ埛鍚嶆垨鑰呴偖绠辩櫥褰曠郴缁熴?","email":"quzhe1978@163.com"}'; ID:null; Content:text/plain; Headers:{}; Exchange:ats.test.exchange; RoutingKey:ats.test.binding; Reply:null; DeliveryMode:PERSISTENT; DeliveryTag:7)
    2012-04-01 23:43:14,032 [SimpleAsyncTaskExecutor-1] DEBUG org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper - headerName=[amqp_receivedRoutingKey] WILL be mapped, matched pattern=amqp_receivedRoutingKey
    2012-04-01 23:43:14,032 [SimpleAsyncTaskExecutor-1] DEBUG org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper - headerName=[amqp_deliveryMode] WILL be mapped, matched pattern=amqp_deliveryMode
    2012-04-01 23:43:14,032 [SimpleAsyncTaskExecutor-1] DEBUG org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper - headerName=[amqp_contentType] WILL be mapped, matched pattern=amqp_contentType
    2012-04-01 23:43:14,032 [SimpleAsyncTaskExecutor-1] DEBUG org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper - headerName=[amqp_receivedExchange] WILL be mapped, matched pattern=amqp_receivedExchange
    2012-04-01 23:43:14,032 [SimpleAsyncTaskExecutor-1] DEBUG org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper - headerName=[amqp_contentEncoding] WILL be mapped, matched pattern=amqp_contentEncoding
    2012-04-01 23:43:14,032 [SimpleAsyncTaskExecutor-1] DEBUG org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper - headerName=[amqp_redelivered] WILL be mapped, matched pattern=amqp_redelivered
    2012-04-01 23:43:14,032 [SimpleAsyncTaskExecutor-1] DEBUG org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper - headerName=[amqp_deliveryTag] WILL be mapped, matched pattern=amqp_deliveryTag
    2012-04-01 23:43:14,032 [SimpleAsyncTaskExecutor-1] DEBUG org.springframework.integration.channel.DirectChannel - preSend on channel 'fromRabbit', message: [Payload={"content":""}][Headers={timestamp=1333294994032, id=cfce9911-327c-472d-a8a5-0c6ec8cc7e3e, amqp_receivedRoutingKey=ats.test.binding, amqp_deliveryMode=PERSISTENT, amqp_contentType=text/plain, amqp_receivedExchange=ats.test.exchange, amqp_contentEncoding=UTF-8, amqp_redelivered=true, amqp_deliveryTag=7}]
    2012-04-01 23:43:14,033 [SimpleAsyncTaskExecutor-1] DEBUG org.springframework.integration.transformer.MessageTransformingHandler - org.springframework.integration.transformer.MessageTransformingHandler@4947c4ee received message: [Payload={"content":""}][Headers={timestamp=1333294994032, id=cfce9911-327c-472d-a8a5-0c6ec8cc7e3e, amqp_receivedRoutingKey=ats.test.binding, amqp_deliveryMode=PERSISTENT, amqp_contentType=text/plain, amqp_receivedExchange=ats.test.exchange, amqp_contentEncoding=UTF-8, amqp_redelivered=true, amqp_deliveryTag=7}]
    2012-04-01 23:43:14,038 [SimpleAsyncTaskExecutor-1] DEBUG org.springframework.integration.transformer.MessageTransformingHandler - handler 'org.springframework.integration.transformer.MessageTransformingHandler@4947c4ee' sending reply Message: [Payload=org.springframework.mail.javamail.SmartMimeMessage@d8cd562][Headers={timestamp=1333294994038, id=022e2dee-64f4-4052-96a6-3339959145fd, amqp_receivedRoutingKey=ats.test.binding, amqp_deliveryMode=PERSISTENT, amqp_contentType=text/plain, amqp_receivedExchange=ats.test.exchange, amqp_contentEncoding=UTF-8, amqp_deliveryTag=7, amqp_redelivered=true}]
    2012-04-01 23:43:14,038 [SimpleAsyncTaskExecutor-1] DEBUG org.springframework.integration.channel.DirectChannel - preSend on channel 'emailChannel', message: [Payload=org.springframework.mail.javamail.SmartMimeMessage@d8cd562][Headers={timestamp=1333294994038, id=022e2dee-64f4-4052-96a6-3339959145fd, amqp_receivedRoutingKey=ats.test.binding, amqp_deliveryMode=PERSISTENT, amqp_contentType=text/plain, amqp_receivedExchange=ats.test.exchange, amqp_contentEncoding=UTF-8, amqp_deliveryTag=7, amqp_redelivered=true}]
    2012-04-01 23:43:14,038 [SimpleAsyncTaskExecutor-1] DEBUG org.springframework.integration.mail.MailSendingMessageHandler - org.springframework.integration.mail.MailSendingMessageHandler#0 received message: [Payload=org.springframework.mail.javamail.SmartMimeMessage@d8cd562][Headers={timestamp=1333294994038, id=022e2dee-64f4-4052-96a6-3339959145fd, amqp_receivedRoutingKey=ats.test.binding, amqp_deliveryMode=PERSISTENT, amqp_contentType=text/plain, amqp_receivedExchange=ats.test.exchange, amqp_contentEncoding=UTF-8, amqp_deliveryTag=7, amqp_redelivered=true}]
    2012-04-01 23:43:14,152 [task-scheduler-2] DEBUG org.springframework.integration.endpoint.PollingConsumer - Poll resulted in Message: null
    2012-04-01 23:43:14,152 [task-scheduler-2] DEBUG org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
    those log infos will repeat because the message in rabbitmq

  7. #7
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,029

    Default

    Please also post your configuration now you have changed the channel.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  8. #8

    Default

    here is my config file, thank you!
    Code:
    <bean id="mailSender" class="org.springframework.mail.javamail.JavaMailSenderImpl">
    		<property name="host" value="smtp.sina.cn"/>
    		<property name="username" value="xxxx"/>
    		<property name="password" value="xxxx"/>
    	</bean>
    
    	<bean id="mailTransformer" class="com.nanhua.trading.bus.mail.MailTransformer">
    		<property name="mailSender" ref="mailSender"/>
    	</bean>
    	<bean id="mailErrorHandler" class="com.nanhua.trading.bus.mail.MailErrorHandler"/>
    	
    	 
    	<int:channel  id="fromRabbit"/>
    	  
    	<amqp:inbound-channel-adapter channel="fromRabbit"
    		queue-names="ats.test.queue" connection-factory="connectionFactory" />
    	 
    	<int:channel id="emailChannel"/>
    	<int:transformer input-channel="fromRabbit" output-channel="emailChannel" 
                        ref="mailTransformer"/>
                    
    	<int-mail:outbound-channel-adapter channel="emailChannel" java-mail-properties="javaMailProperties"
                                  mail-sender="mailSender"/>
    	
    	
    	<util:properties id="javaMailProperties">
    	   
    	  <prop key="mail.smtp.auth">true</prop>
    	  
    	</util:properties>
    	 
    	<int:channel id="errorChannel">
         	<int:queue capacity="500"/>
     	</int:channel>
     	<int:poller default="true" fixed-delay="1000" max-messages-per-poll="1"/>
     	
    	<int-stream:stdout-channel-adapter channel="errorChannel" append-newline="true" />
    	
    	<!-- Infrastructure -->
    
    	<rabbit:connection-factory id="connectionFactory"
    		host="127.0.0.1" port="5672" />
    	<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
    	<rabbit:admin connection-factory="connectionFactory"
    		auto-startup="true" />
    	<rabbit:queue name="ats.test.queue" />
    	<rabbit:direct-exchange name="ats.test.exchange">
    		<rabbit:bindings>
    			<rabbit:binding queue="ats.test.queue" key="ats.test.binding" />
    		</rabbit:bindings>
    	</rabbit:direct-exchange>

  9. #9
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,029

    Default

    You need to add an error channel to the inbound adapter...

    Code:
    <amqp:inbound-channel-adapter channel="fromRabbit"
        error-channel="errors" 
        queue-names="ats.test.queue" connection-factory="connectionFactory" />
    
    <int:channel id="errors" />
    	
    <int:logging-channel-adapter channel="errors" level="ERROR" />
    In this case, we just log the exception - you can do what you want on the error flow.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  10. #10

    Default

    resloved, thank you very much!

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •