Thread: Handling connectivity issues in web service outbound gateway (retry)

  1. #1
    Join Date
    May 2011
    Posts
    13

    Handling connectivity issues in web service outbound gateway (retry)

    Currently, we have a requirement to attempt a WS request, and in the case that the remote endpoint is down, queue the request for retry. This queue should also be backed by a database, so that if the application goes down, the requests will be retried once it is restarted.

    I am hoping that I can achieve this purely with Spring Integration. My thought was that I could use a WS Outbound Gateway, with a message-store-backed queue channel set as the error channel. The error channel would be polled, and the message would be sent to a retry gateway (via Service Activator) which would also hook up to the WS Outbound Gateway, and basically form a processing loop.

    The idea of the Service Activator and retry Gateway was taken from one of Oleg's SpringOne 2GX examples. The polling is happening properly, but I can't figure out how to "complete the loop" and actually get the retry to happen. Oddly, the poller seems to stop after the retry gateway is invoked.

    Here is what I have currently:

    HTML Code:
            <!-- Will replace this with a JDBC store later -->
            <bean id="testMessageStore" class="org.springframework.integration.store.SimpleMessageStore" />
    
            <int:channel id="wsRequestChannel" />
            <int:channel id="wsReplyChannel" />
            <int:channel id="wsErrorChannel">
                <int:queue message-store="testMessageStore" />
            </int:channel>
    
            <int:channel id="testChannel" />
            <int:chain input-channel="wsErrorChannel" auto-startup="true">
                <int:poller fixed-rate="5000">
                    <int:transactional transaction-manager="transactionManager" />
                </int:poller>
                <int:transformer expression="payload.getFailedMessage().getPayload() + ' '" />
                <!-- Everything seems to work fine up to here, but how do I get it to retry the WS? -->
                <int:service-activator ref="retryGateway" />
            </int:chain>
    
            <int:chain input-channel="wsRequestChannel" output-channel="wsReplyChannel">
                <int-ws:header-enricher>
                    <int-ws:soap-action value="theTargetSoapAction" />
                </int-ws:header-enricher>
                <int-ws:outbound-gateway id="testWsGateway" uri="https://someSOAPService" interceptor="wsSecurityInterceptor" />
            </int:chain>
    
            <int:gateway id="mainGateway" default-request-channel="wsRequestChannel" default-reply-channel="wsReplyChannel" error-channel="wsErrorChannel" service-interface="com.mypackage.IntegrationTestService">
                <int:method name="sendMessage" />
            </int:gateway>
            <int:gateway id="retryGateway" default-request-channel="wsRequestChannel" default-reply-channel="wsReplyChannel" error-channel="wsErrorChannel" />

    Can anyone help with this, or offer up suggestions for alternative approaches?
    Thanks!
    Last edited by ach; Jul 18th, 2012 at 08:03 AM.

  2. #2
    Join Date
    Jan 2009
    Location
    Ukraine, Kharkov
    Posts
    644

    Hello

    If you are going to use a persistent MessageStore for the <queue>, you just need to configure a <transactional> <poller> on the endpoint that polls it (here, the <chain> containing the <int-ws:outbound-gateway>). When the service throws an Exception, the transaction is rolled back and the failed Message is not removed from the MessageStore, so on the next poll it is sent to the service again. For a simple retry, a message flow like this is enough:
    HTML Code:
    <int:channel id="wsRequestChannel">
        <int:queue message-store="messageStore" />
    </int:channel>
    
    <int:chain input-channel="wsRequestChannel" output-channel="wsReplyChannel">
        <int:poller fixed-rate="5000">
            <int:transactional />
        </int:poller>
        <int-ws:header-enricher>
            <int-ws:soap-action value="theTargetSoapAction" />
        </int-ws:header-enricher>
        <int-ws:outbound-gateway id="testWsGateway" uri="https://someSOAPService" interceptor="wsSecurityInterceptor" />
    </int:chain>
    However, be careful about using a <gateway> as the message-flow entry point: see the "Important" paragraph in the Message Store documentation.
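
    The flow above depends on two supporting beans that are not shown. A minimal sketch, assuming a JDBC-backed store: the `dataSource` bean and the choice of `DataSourceTransactionManager` are assumptions for illustration, not part of the original configuration. An <int:transactional/> element without a transaction-manager attribute looks up a bean named "transactionManager".

    ```xml
    <!-- Assumption: a javax.sql.DataSource bean named "dataSource" is defined elsewhere -->
    <int-jdbc:message-store id="messageStore" data-source="dataSource" />

    <!-- Picked up by <int:transactional/> because of its bean name -->
    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource" />
    </bean>
    ```

    Using the same DataSource for the store and the transaction manager is what lets the rollback put the polled message back.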

    Take care,
    Artem Bilan

  3. #3
    Join Date
    May 2011
    Posts
    13

    Thanks for the reply. The reason why my approach may seem a bit convoluted, is that I would like for the first attempt at the web service call to be synchronous -- only when it fails should it be retried asynchronously. In this scenario, the poller rate would be something much slower: 5 mins, for example.
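
    A rough sketch of that intent, not a tested configuration: the synchronous path stays unpolled, and only the error queue gets a slow poller. Channel names follow the configuration in post #1; the nested <int:gateway> is one possible way to re-enter the WS flow and is an assumption here.

    ```xml
    <int:chain input-channel="wsErrorChannel">
        <!-- 300000 ms = 5 minutes between retry polls -->
        <int:poller fixed-rate="300000">
            <int:transactional transaction-manager="transactionManager" />
        </int:poller>
        <!-- Unwrap the original request from the exception payload on the error channel -->
        <int:transformer expression="payload.failedMessage.payload" />
        <!-- Send the request through the WS flow again; a failure here propagates to the poller -->
        <int:gateway request-channel="wsRequestChannel" />
    </int:chain>
    ```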

  4. #4
    Join Date
    Jan 2009
    Location
    Ukraine, Kharkov
    Posts
    644

    OK, let it be.
    But a <gateway> is still a bad idea for a <queue> with a persistent MessageStore.
    Nevertheless, how do you plan to process the WS response in the asynchronous retry sub-flow?

    By the way, what you're looking for looks like this: http://forum.springsource.org/showthread.php?111867

    Please give me some more time to figure out how it should work in your case...

  5. #5
    Join Date
    May 2011
    Posts
    13

    Originally posted by Cleric:
    "OK, let it be. But a <gateway> is still a bad idea for a <queue> with a persistent MessageStore."
    The use of a Gateway was mainly just because it's an easy way to start the message flow for my small proof-of-concept. I don't necessarily have to use it.

    Originally posted by Cleric:
    "Nevertheless, how do you plan to process the WS response in the asynchronous retry sub-flow?"
    Probably just via some service method (is this possible with a Service Activator?).
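
    As a sketch of what that could look like, assuming the reply channel is a queue channel: the handler bean, its class, and its method name are hypothetical placeholders.

    ```xml
    <!-- Hypothetical handler; class and method names are placeholders -->
    <bean id="wsResponseHandler" class="com.mypackage.WsResponseHandler" />

    <int:channel id="wsReplyChannel">
        <int:queue />
    </int:channel>

    <!-- A consumer on a queue channel needs its own poller -->
    <int:service-activator input-channel="wsReplyChannel" ref="wsResponseHandler" method="handleResponse">
        <int:poller fixed-rate="1000" />
    </int:service-activator>
    ```

    If something else also receives from the same queue channel, the two consumers would compete for replies, so only one of them should read from it.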

    Originally posted by Cleric:
    "By the way, what you're looking for looks like this: http://forum.springsource.org/showthread.php?111867"
    Thanks, I will take a look at that.
    Last edited by ach; Jul 18th, 2012 at 09:31 AM.

  6. #6
    Join Date
    Jan 2009
    Location
    Ukraine, Kharkov
    Posts
    644

    Hi, again

    So, I've built an approximate test for your use case:
    HTML Code:
    <int-jdbc:message-store id="messageStore" data-source="dataSource"/>
    
    	<channel id="input"/>
    
    	<channel id="output">
    		<queue/>
    	</channel>
    
    	<channel id="retryChannel">
    		<!--<queue/>-->
    		<queue message-store="messageStore"/>
    	</channel>
    
    	<gateway service-interface="org.springframework.integration.jdbc.RetryViaMessageStoreTests$TestGateway"
    			 error-channel="retryChannel">
    		<method name="send" request-channel="input"/>
    		<method name="receive" reply-channel="output"/>
    	</gateway>
    
    	<service-activator input-channel="input" output-channel="output" ref="service" method="serviceIt"/>
    
    	<beans:bean id="service" class="org.springframework.integration.jdbc.RetryViaMessageStoreTests$TestService"/>
    
    	<chain input-channel="retryChannel">
    		<poller fixed-rate="100">
    			<transactional/>
    		</poller>
    		<transformer expression="payload.failedMessage.payload"/>
    		<gateway request-channel="input"/>
    	</chain>
    Code:
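    package org.springframework.integration.jdbc;

    import static org.junit.Assert.assertEquals;

    import java.util.concurrent.atomic.AtomicInteger;

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

    // Loads the XML configuration above (by default RetryViaMessageStoreTests-context.xml next to this class)
    @ContextConfiguration
    @RunWith(SpringJUnit4ClassRunner.class)
    public class RetryViaMessageStoreTests {

        @Autowired
        private TestGateway gateway;

        @Test
        public void testIt() throws InterruptedException {
            // The first attempt is synchronous and fails; the error message is routed to 'retryChannel'
            gateway.send("test");
            // Blocks until a retry finally succeeds and the reply reaches 'output'
            assertEquals("TEST", gateway.receive());
        }

        private static interface TestGateway {

            void send(String s);

            String receive();
        }

        private static class TestService {

            private final AtomicInteger counter = new AtomicInteger();

            public String serviceIt(String s) {
                System.out.println(counter);
                // Fail the first 10 calls, succeed on the 11th
                if (10 == counter.getAndIncrement()) {
                    return s.toUpperCase();
                }
                throw new RuntimeException("intentional");
            }
        }
    }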
    Here you can replace my <service-activator> with your WS flow. If you switch the 'retryChannel' <queue> to a SimpleMessageStore, you will lose the message after the first retry attempt, because the in-memory store does not take part in the transaction and a rollback cannot put the polled message back. Your upstream invocation will then hang on 'gateway.receive()'.
    My test passes after 10 retries.
    Nevertheless, you should consider a truly robust solution based on Spring Retry, which should become possible once this issue is resolved: https://jira.springsource.org/browse/INT-343
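
    For orientation only, here is a sketch of what a Spring Retry based setup can look like. It assumes a later Spring Integration release (2.2 or newer) that provides the request handler advice chain, plus Spring Retry on the classpath. Neither is part of the configuration in this thread, and the attempt count and back-off period are arbitrary example values.

    ```xml
    <int-ws:outbound-gateway id="testWsGateway" uri="https://someSOAPService" interceptor="wsSecurityInterceptor">
        <int-ws:request-handler-advice-chain>
            <!-- Retries the WS call in-process before the exception leaves the gateway -->
            <bean class="org.springframework.integration.handler.advice.RequestHandlerRetryAdvice">
                <property name="retryTemplate" ref="retryTemplate" />
            </bean>
        </int-ws:request-handler-advice-chain>
    </int-ws:outbound-gateway>

    <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
        <property name="retryPolicy">
            <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                <!-- Example value: give up after 3 attempts -->
                <property name="maxAttempts" value="3" />
            </bean>
        </property>
        <property name="backOffPolicy">
            <bean class="org.springframework.retry.backoff.FixedBackOffPolicy">
                <!-- Example value: wait 5 seconds between attempts -->
                <property name="backOffPeriod" value="5000" />
            </bean>
        </property>
    </bean>
    ```

    This kind of retry lives in memory, so it does not survive an application restart; the message-store-backed queue above would still be needed for that requirement.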

    Good luck

  7. #7
    Join Date
    May 2011
    Posts
    13

    Thank you again for your help!

    I tried adapting your example, but for some reason the retry is only happening once... here is my configuration:

    HTML Code:
            <int-jdbc:message-store id="testMessageStore" data-source="commonDataSource" region="TestRegion" />
    
            <int:channel id="wsRequestChannel" />
            <int:channel id="wsReplyChannel">
                <int:queue/>
            </int:channel>
            <int:channel id="wsRetryChannel">
                <int:queue message-store="testMessageStore" />
            </int:channel>
    
            <int:gateway error-channel="wsRetryChannel" service-interface="com.myorg.myapp.ws.IntegrationTestService">
                <int:method name="sendMessage" request-channel="wsRequestChannel" />
                <int:method name="receiveMessage" reply-channel="wsReplyChannel" />
            </int:gateway>
    
            <int:chain input-channel="wsRetryChannel">
                <int:poller fixed-rate="5000">
                    <int:transactional />
                </int:poller>
                <int:transformer expression="payload.failedMessage.payload" />
                <int:gateway request-channel="wsRequestChannel" />
            </int:chain>
    
            <int:chain input-channel="wsRequestChannel" output-channel="wsReplyChannel">
                <int-ws:header-enricher>
                    <int-ws:soap-action value="mySoapAction" />
                </int-ws:header-enricher>
                <int-ws:outbound-gateway id="testWsGateway" uri="https://myServiceUrl" interceptor="wsSecurityInterceptor" />
            </int:chain>
    And here are the last log statements that I see after the first retry:

    Code:
    Caused by: org.springframework.ws.client.WebServiceTransportException: Not Found [404]
    	at org.springframework.ws.client.core.WebServiceTemplate.handleError(WebServiceTemplate.java:627)
    	at org.springframework.ws.client.core.WebServiceTemplate.doSendAndReceive(WebServiceTemplate.java:551)
    	at org.springframework.ws.client.core.WebServiceTemplate.sendAndReceive(WebServiceTemplate.java:502)
    	at org.springframework.integration.ws.SimpleWebServiceOutboundGateway.doHandle(SimpleWebServiceOutboundGateway.java:88)
    	at org.springframework.integration.ws.AbstractWebServiceOutboundGateway.handleRequestMessage(AbstractWebServiceOutboundGateway.java:176)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:97)
    	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
    	... 83 more
    2012-07-18 13:25:00 QueueChannel [DEBUG] preSend on channel 'wsErrorChannel', message: [Payload=org.springframework.integration.MessageHandlingException: error occurred in message handler [org.springframework.integration.ws.SimpleWebServiceOutboundGateway#882dfc]][Headers={timestamp=1342632300160, id=2f7ed54b-d912-400c-8f61-3a90241272ef, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@2b9562b6, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@2b9562b6}]
    2012-07-18 13:25:00 QueueChannel [DEBUG] postSend (sent=true) on channel 'wsErrorChannel', message: [Payload=org.springframework.integration.MessageHandlingException: error occurred in message handler [org.springframework.integration.ws.SimpleWebServiceOutboundGateway#882dfc]][Headers={timestamp=1342632300160, id=2f7ed54b-d912-400c-8f61-3a90241272ef, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@2b9562b6, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@2b9562b6}]
    The above is the same behavior I was seeing with the configuration that I posted initially.

  8. #8
    Join Date
    May 2011
    Posts
    13

    Please disregard my last post. There was an issue with my IDE not picking up the changes to the config file -- it was not picking up my change to the MessageStore. This is exactly what I was looking for, thanks!

  9. #9
    Join Date
    May 2011
    Posts
    13

    I'm interested in somehow packaging the above up in some kind of a reusable bundle, with certain parameters like dataSource/region (for the message store), soapAction/wsEndpointUri (for the WS gateway), etc. Initially I thought that I could implement my own Gateway that creates the various components programmatically, but it doesn't seem like the SI API is well suited for that (and the documentation doesn't cover it very well). Do you have any suggestion on the best way for me to achieve this?

  10. #10
    Join Date
    Jan 2009
    Location
    Ukraine, Kharkov
    Posts
    644

    Hello

    At first glance it seems like you need this one: https://jira.springsource.org/browse/INT-1685
    But since you mention
    "with certain parameters like dataSource/region (for the message store), soapAction/wsEndpointUri (for the WS gateway), etc",
    take a look at this sample: https://github.com/SpringSource/spri...ed/dynamic-ftp
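
    One possible reading of that suggestion is a template configuration whose variable parts are property placeholders, loaded once per target with its own set of properties. A rough sketch, in which the property names are hypothetical and the `dataSource` bean is assumed to exist:

    ```xml
    <!-- Resolves the ${...} placeholders below from the properties supplied to this context -->
    <context:property-placeholder />

    <!-- Hypothetical property names: retry.store.region, ws.soapAction, ws.endpointUri -->
    <int-jdbc:message-store id="messageStore" data-source="dataSource" region="${retry.store.region}" />

    <int:chain input-channel="wsRequestChannel" output-channel="wsReplyChannel">
        <int-ws:header-enricher>
            <int-ws:soap-action value="${ws.soapAction}" />
        </int-ws:header-enricher>
        <int-ws:outbound-gateway uri="${ws.endpointUri}" interceptor="wsSecurityInterceptor" />
    </int:chain>
    ```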

    Take care,
    Artem
