Thread: FTP inbound adapter with poller sometimes stops

  1. #1
    Join Date
    Sep 2011
    Posts
    12

    Hi, we are using ftp:inbound-channel-adapter with a poller (Spring Integration 2.0.4.RELEASE):

    Code:
    <int-ftp:inbound-channel-adapter channel="epoSourceChannel" local-directory="${ftp.localFolder}"
    		session-factory="ftpSessionFactory" remote-directory="${ftp.remoteFolder}"
    		cache-sessions="false" filter="epoFilenameFilter"> 
    		<int:poller max-messages-per-poll="-1" cron="0 * 2-22 * * *" />
    </int-ftp:inbound-channel-adapter>
    The epoFilenameFilter bean is an implementation of FileListFilter. Processed files are not deleted from the FTP server, so we detect new files by their filenames (they contain timestamps, and the latest processed timestamp is stored). There are other integration elements for processing the downloaded files, but they shouldn't be important here. All of this runs under Tomcat 6, loaded from the web.xml of an SWF/JSF web application.
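
    For readers who want to picture the filename check, here is a minimal sketch of the idea only. The class name, the naming scheme epo_yyyyMMddHHmmss.xml and the method newerThan are assumptions made up for illustration; the real EpoFilenameFilter and its Spring Integration wiring are not shown in this thread.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class TimestampNameFilter {

    // Hypothetical naming scheme, e.g. epo_20120205175637.xml
    private static final Pattern NAME = Pattern.compile("epo_(\\d{14})\\.xml");
    private static final DateTimeFormatter STAMP = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    /** Returns the names whose embedded timestamp is strictly after lastProcessed. */
    static List<String> newerThan(List<String> names, LocalDateTime lastProcessed) {
        List<String> accepted = new ArrayList<>();
        for (String name : names) {
            Matcher m = NAME.matcher(name);
            if (!m.matches()) {
                continue; // not one of our files
            }
            try {
                LocalDateTime stamp = LocalDateTime.parse(m.group(1), STAMP);
                if (stamp.isAfter(lastProcessed)) {
                    accepted.add(name);
                }
            } catch (DateTimeParseException e) {
                // 14 digits that do not form a valid date and time: skip the file
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        LocalDateTime last = LocalDateTime.of(2012, 2, 5, 17, 56, 37);
        List<String> remote = Arrays.asList(
                "epo_20120205175636.xml", "epo_20120205175638.xml", "readme.txt");
        System.out.println(newerThan(remote, last));
    }
}
```

    The caller would then store the newest accepted timestamp as the new "latest processed" value, so the same file is not picked up again on the next poll.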

    Here is the problem. Sometimes (lately more often, about once a week) this ftp:inbound-channel-adapter, together with its poller, silently stops. It looks like this: every minute epoFilenameFilter logs information about the files it processes, and then at some point the log messages from the filter (and, of course, from the downstream elements in the processing chain) simply stop appearing. I've set the logging level to DEBUG and added a log statement at the very beginning of the filter method, so each filtering log message is preceded by a "Connected" message from the FTP session factory. The last messages were:

    Code:
    2012-02-05 18:17:00,053 DEBUG [task-scheduler-5] org.springframework.integration.ftp.session.DefaultFtpSessionFactory: Connected to server [ftp.server.com:21]
    2012-02-05 18:17:00,591 INFO  [task-scheduler-5] com.example.integration.epoimport.filter.EpoFilenameFilter: Filtering files...
    2012-02-05 18:17:00,593 INFO  [task-scheduler-5] com.example.integration.epoimport.filter.EpoFilenameFilter: Looking for ePO from 05.02.2012 17:56:37...
    2012-02-05 18:17:00,598 INFO  [task-scheduler-5] com.example.integration.epoimport.filter.EpoFilenameFilter: Number of new ePO: 0
    2012-02-05 18:18:00,061 DEBUG [task-scheduler-3] org.springframework.integration.ftp.session.DefaultFtpSessionFactory: Connected to server [ftp.server.com:21]
    After that, the integration beans do not log anything at all. From what I can see, it looks like either an infinite loop somewhere between session creation and the call to the filter method, or a simultaneous failure of both the FTP inbound adapter and the poller, but this may well be far from the truth.

    The moment is rather arbitrary and I can't relate it to any activity in the application (and it's not at 22 o'clock). For example, it happened at 16:00, 08:48 and 18:18. The logs contain no other errors, and the application continues to work as usual.

    Are there any suggestions as to what else to look at to find the real cause of the problem?
    Last edited by yozh-tema; Feb 6th, 2012 at 03:27 AM. Reason: spring integration version added

  2. #2
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,037

    I am guessing the thread (in this case task-scheduler-3) is hanging trying to read data from your server. You can confirm this by taking a thread dump using jstack or visualvm.
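
    If attaching jstack or VisualVM to the server is inconvenient, a similar dump can be taken from inside the JVM with the standard java.lang.management API. The sketch below is only an illustration: the class name SchedulerThreadDump is made up, and in a real application the dump method would have to be called from code running in the same JVM as the poller (the main method here just starts a sleeping stand-in thread so there is something to show).

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

final class SchedulerThreadDump {

    /** Returns name, state and stack of every live thread whose name starts with namePrefix. */
    static String dump(String namePrefix) {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        StringBuilder out = new StringBuilder();
        // false, false: do not collect locked monitor / synchronizer details
        for (ThreadInfo info : threads.dumpAllThreads(false, false)) {
            if (info == null || !info.getThreadName().startsWith(namePrefix)) {
                continue;
            }
            out.append('"').append(info.getThreadName()).append("\" ")
               .append(info.getThreadState()).append('\n');
            for (StackTraceElement frame : info.getStackTrace()) {
                out.append("    at ").append(frame).append('\n');
            }
        }
        return out.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for a stuck poller thread, named like the ones in the log above
        Thread stuck = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException ignored) {
                // demo thread, nothing to clean up
            }
        }, "task-scheduler-demo");
        stuck.setDaemon(true);
        stuck.start();
        Thread.sleep(200); // give the stand-in thread time to block
        System.out.print(dump("task-scheduler-"));
    }
}
```

    A thread that sits in a socket or stream read across several consecutive dumps is the one to look at.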

    We don't currently expose the setDataTimeout() and setDefaultTimeout() methods of the FTPClient, so the socket SO_TIMEOUT option is not set and TCP will wait indefinitely for data. We probably should allow these to be set, so please open an Improvement JIRA here: https://jira.springsource.org/browse/INT
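
    To illustrate what SO_TIMEOUT changes, here is a small self-contained sketch using only java.net. It does not use commons-net or Spring Integration; the class and method names are made up for the example. A local server accepts the connection and then stays silent, which is roughly what a stalled FTP server looks like to the client.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

final class SoTimeoutDemo {

    /**
     * Connects to a local server that never sends anything and reports whether
     * the blocking read gave up with a SocketTimeoutException.
     */
    static boolean readTimesOut(int soTimeoutMillis) {
        try (ServerSocket silentServer = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(silentServer.getInetAddress(), silentServer.getLocalPort());
             Socket accepted = silentServer.accept()) {
            // A value of 0 (the default) would make read() wait forever
            client.setSoTimeout(soTimeoutMillis);
            try {
                client.getInputStream().read(); // blocks: the server never writes
                return false;
            } catch (SocketTimeoutException expected) {
                return true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("read timed out: " + readTimesOut(500));
    }
}
```

    With a non-zero timeout the read fails after the given interval instead of parking the thread; with the default of 0 the same call would never return here.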
    Last edited by Gary Russell; Feb 6th, 2012 at 08:14 AM.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  3. #3
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,037

    If you want to see whether this solves your problem, you could try making your own session factory:

    Code:
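    import org.apache.commons.net.ftp.FTPClient;
    import org.springframework.integration.ftp.session.AbstractFtpSessionFactory;
    
    public class MyFtpSessionFactory extends AbstractFtpSessionFactory<FTPClient> {
    
    	@Override
    	protected FTPClient createClientInstance() {
    		FTPClient ftpClient = new FTPClient();
    		// Timeout (ms) for reads on the data connection
    		ftpClient.setDataTimeout(5000);
    		// Default SO_TIMEOUT (ms) used when connect() opens the control socket
    		ftpClient.setDefaultTimeout(5000);
    		return ftpClient;
    	}
    
    }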
    And use an instance of that instead of DefaultFtpSessionFactory.

    Of course, when these timeouts occur, you still won't get any data, but you should see log activity.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  4. #4
    Join Date
    Sep 2011
    Posts
    12

    Thank you Gary.
    I raised JIRA issue https://jira.springsource.org/browse/INT-2429 (I didn't find an FTP component there, though). I'll try using my own session factory.

  5. #5
    Join Date
    Jan 2008
    Location
    Mohnton, PA USA (that's near Philadelphia)
    Posts
    2,148

    There is actually another way of solving this that is built into the design. AbstractFtpSessionFactory defines two NOOP methods:
    Code:
    /**
    * Will handle additional initialization after client.connect() method was invoked, 
    * but before any action on the client has been taken 
    */
    protected void postProcessClientAfterConnect(T t) throws IOException {
    	// NOOP
    }
    /**
    * Will handle additional initialization before client.connect() method was invoked. 
    */
    protected void postProcessClientBeforeConnect(T client) throws IOException {
    	// NOOP
    }
    So all you need to do is extend DefaultFtpSessionFactory and provide an implementation of one or both of these methods. I guess for your case you only need the 'postProcessClientBeforeConnect' method.
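
    To show why the before-connect hook is the one that matters for timeouts, here is a self-contained sketch of the hook pattern. SketchClient, SketchSessionFactory and TimeoutSessionFactory are stand-ins invented for this illustration; they only mimic the order of calls described by the comments above and are not the Spring Integration or commons-net classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in for an FTP client: records calls instead of touching the network. */
class SketchClient {
    final List<String> calls = new ArrayList<>();

    void setDefaultTimeout(int millis) {
        calls.add("setDefaultTimeout(" + millis + ")");
    }

    void connect() {
        calls.add("connect");
    }
}

/** Stand-in base factory with two no-op hooks around connect(). */
abstract class SketchSessionFactory {

    protected void postProcessClientBeforeConnect(SketchClient client) {
        // no-op by default
    }

    protected void postProcessClientAfterConnect(SketchClient client) {
        // no-op by default
    }

    final SketchClient createClient() {
        SketchClient client = new SketchClient();
        postProcessClientBeforeConnect(client); // settings that must exist before the socket opens
        client.connect();
        postProcessClientAfterConnect(client);  // settings that need an open connection
        return client;
    }
}

/** Subclass that overrides only the before-connect hook to set a timeout. */
class TimeoutSessionFactory extends SketchSessionFactory {
    @Override
    protected void postProcessClientBeforeConnect(SketchClient client) {
        client.setDefaultTimeout(5000);
    }
}

final class HookOrderDemo {

    static List<String> run() {
        return new TimeoutSessionFactory().createClient().calls;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [setDefaultTimeout(5000), connect]
    }
}
```

    Because the timeout is applied before connect(), it also covers a server that accepts the connection but never sends its greeting.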

  6. #6
    Join Date
    Sep 2011
    Posts
    12

    Hi Oleg,
    thanks for pointing that out; the JIRA improvement I mentioned is then probably redundant. I think, however, that these hooks should be documented in the reference manual.

  7. #7
    Join Date
    Jan 2008
    Location
    Mohnton, PA USA (that's near Philadelphia)
    Posts
    2,148

    Yes Artem, that is correct, it should be documented. I'll change the JIRA to be a documentation issue.

  8. #8
    Join Date
    Sep 2011
    Posts
    12

    Hi Gary,
    it looks like you were right. The next time the problem happened I checked the stack traces (although this time the thread hung while connecting, not while reading data), and one of the threads showed:

    Code:
    "task-scheduler-4" prio=10 tid=0x00002aabbd39d400 nid=0x1335 in Object.wait() [0x000000004274f000..0x000000004274fda0]
       java.lang.Thread.State: WAITING (on object monitor)
            at java.lang.Object.wait(Native Method)
            at java.lang.Object.wait(Object.java:485)
            at org.apache.commons.net.telnet.TelnetInputStream.read(TelnetInputStream.java:339)
            - locked <0x00002aaadb012f80> (a [I)
            at org.apache.commons.net.telnet.TelnetInputStream.read(TelnetInputStream.java:466)
            at java.io.BufferedInputStream.read1(BufferedInputStream.java:256)
            at java.io.BufferedInputStream.read(BufferedInputStream.java:317)
            - locked <0x00002aaadb010ed8> (a java.io.BufferedInputStream)
            at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:264)
            at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:306)
            at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:158)
            - locked <0x00002aaac2c69910> (a java.io.InputStreamReader)
            at java.io.InputStreamReader.read(InputStreamReader.java:167)
            at java.io.BufferedReader.fill(BufferedReader.java:136)
            at java.io.BufferedReader.readLine(BufferedReader.java:299)
            - locked <0x00002aaac2c69910> (a java.io.InputStreamReader)
            at java.io.BufferedReader.readLine(BufferedReader.java:362)
            at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:264)
            at org.apache.commons.net.ftp.FTP._connectAction_(FTP.java:335)
            at org.apache.commons.net.ftp.FTPClient._connectAction_(FTPClient.java:550)
            at org.apache.commons.net.SocketClient.connect(SocketClient.java:163)
            at org.springframework.integration.ftp.session.AbstractFtpSessionFactory.createClient(AbstractFtpSessionFactory.java:148)
            at org.springframework.integration.ftp.session.AbstractFtpSessionFactory.getSession(AbstractFtpSessionFactory.java:128)
            at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:131)
            at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.receive(AbstractInboundFileSynchronizingMessageSource.java:144)
            at org.springframework.integration.endpoint.SourcePollingChannelAdapter.doPoll(SourcePollingChannelAdapter.java:89)
            at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146)
            at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:144)
            at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:207)
            at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
            at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:48)
            at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
            at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:202)
            at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:51)
            at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
            at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
            at java.util.concurrent.FutureTask.run(FutureTask.java:138)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:207)
            at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
            at java.lang.Thread.run(Thread.java:619)
    It seems that the default timeout settings of the commons-net FTP client aren't very friendly (at least on Linux). Anyway, hopefully setting timeouts will work.

  9. #9
    Join Date
    Jan 2008
    Location
    Mohnton, PA USA (that's near Philadelphia)
    Posts
    2,148

    Artem,

    Now that you know how to adjust the settings, it would be nice if at some point you could share what works for you. We may consider setting friendlier defaults on our end, so real user stories would definitely help.

  10. #10
    Join Date
    Sep 2011
    Posts
    12

    Hi Oleg,

    I've implemented the solution you described:

    Code:
        @Override
        protected void postProcessClientBeforeConnect(FTPClient client) throws IOException {
            super.postProcessClientBeforeConnect(client);
            
            client.setDataTimeout(timeout);
            client.setDefaultTimeout(timeout);
        }
    The timeout is set to 30 seconds. So far the problem has not recurred, and two days after deployment the logs show new entries:

    Code:
    2012-02-14 11:55:30,222 ERROR [task-scheduler-7] org.springframework.integration.handler.LoggingHandler: org.springframework.integration.MessagingException: Problem occurred while synchronizing remote to local directory
            at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:144)
            at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.receive(AbstractInboundFileSynchronizingMessageSource.java:144)
            at org.springframework.integration.endpoint.SourcePollingChannelAdapter.doPoll(SourcePollingChannelAdapter.java:89)
            at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146)
            at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:144)
            at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:207)
            at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
            at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:48)
            at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
            at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:202)
            at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:51)
            at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
            at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
            at java.util.concurrent.FutureTask.run(FutureTask.java:138)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:207)
            at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
            at java.lang.Thread.run(Thread.java:619)
    Caused by: java.net.SocketTimeoutException: Read timed out
            at java.net.SocketInputStream.socketRead0(Native Method)
            at java.net.SocketInputStream.read(SocketInputStream.java:129)
            at java.io.BufferedInputStream.fill(BufferedInputStream.java:218)
            at java.io.BufferedInputStream.read(BufferedInputStream.java:237)
            at java.io.FilterInputStream.read(FilterInputStream.java:66)
            at java.io.PushbackInputStream.read(PushbackInputStream.java:122)
            at org.apache.commons.net.io.FromNetASCIIInputStream.__read(FromNetASCIIInputStream.java:75)
            at org.apache.commons.net.io.FromNetASCIIInputStream.read(FromNetASCIIInputStream.java:170)
            at java.io.BufferedInputStream.fill(BufferedInputStream.java:218)
            at java.io.BufferedInputStream.read(BufferedInputStream.java:237)
            at org.apache.commons.net.telnet.TelnetInputStream.__read(TelnetInputStream.java:114)
            at org.apache.commons.net.telnet.TelnetInputStream.run(TelnetInputStream.java:535)
            ... 1 more
    Hence I think the problem is solved.

    I'm not sure what I would expect Spring Integration to do in this case: set its own default timeouts or leave things as they are. Daniel Savarese's reasoning in the mentioned commons-net issue is understandable: while 30 seconds is OK for our project, considering that we check FTP every minute, it might be inappropriate for others. On the other hand, I don't think infinite timeouts are what most users need, and as things stand almost everyone would have to extend DefaultFtpSessionFactory just to configure timeouts.

    Thank you again for your help.