
Thread: Inbound FTP - Polling sub directories

  1. #1

    Inbound FTP - Polling sub directories

    I am trying to integrate/invoke Batch using Admin & Integration.

    The requirement is: we have a set of Batch jobs (reading/writing files) running, and we want to add the ability to read and write files from an FTP folder.

    The way I plan to do this: create sub-folders on the FTP server for each of these Batch jobs, then use the FTP adapter to read the files and put them in a local folder.

    I want a single FTP adapter that polls all the sub-directories instead of one adapter per sub-folder. Is this possible?

  2. #2
    Join Date
    Jan 2008
    Location
    Mohnton, PA USA (that's near Philadelphia)
    Posts
    2,148

    One of the core concepts behind an adapter/gateway is to bridge a "particular part/segment" of the remote system with the messaging system (e.g., SI in this case), not to be a generic template for doing remote operations. When I say "particular part/segment", we actually use the term "addressable" adapter/gateway, where an adapter is specific to a particular URL (in the case of HTTP) or directory (for FTP/SFTP), etc. In other words: no, the adapter will only scan a predefined directory.
    How many directories do you plan to have?

  3. #3
    Join Date
    Jan 2008
    Location
    Mohnton, PA USA (that's near Philadelphia)
    Posts
    2,148

    Also, you can watch this issue, which we will be addressing for 2.1: https://jira.springsource.org/browse/INT-1945. Although it talks about the outbound gateway, in reality it will end up being a major new feature in SI to allow this level of dynamics, possibly a Template approach, which is familiar to Spring users.

  4. #4
    Join Date
    Jan 2008
    Location
    Mohnton, PA USA (that's near Philadelphia)
    Posts
    2,148

    Also, you may want to look at
    org.springframework.integration.file.DirectoryScanner and its implementations (e.g., RecursiveLeafOnlyDirectoryScanner). For your use case this may be all you need.
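
    To illustrate what a recursive, leaf-only scan means, here is a minimal plain-Java sketch. It is not the Spring Integration class; it only mimics the idea (walk a local directory tree and return regular files, never the directories themselves) using java.nio. The class name LeafOnlyScanSketch and the jobA/jobB folder names are made up for the example. As far as I understand, the Spring scanner applies to a local directory read by a file adapter, not to the remote FTP listing.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical illustration only; not part of Spring Integration.
public class LeafOnlyScanSketch {

    /** Walks root recursively and returns regular files only (no directories), sorted by path. */
    public static List<Path> listLeafFiles(Path root) throws IOException {
        try (Stream<Path> paths = Files.walk(root)) {
            return paths.filter(Files::isRegularFile)
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // Build a small throwaway tree with one sub-folder per (made-up) batch job.
        Path root = Files.createTempDirectory("inbound");
        Files.createDirectories(root.resolve("jobA"));
        Files.createDirectories(root.resolve("jobB").resolve("nested"));
        Files.createFile(root.resolve("jobA").resolve("a.csv"));
        Files.createFile(root.resolve("jobB").resolve("nested").resolve("b.csv"));

        // Print each file relative to the scanned root.
        for (Path p : listLeafFiles(root)) {
            System.out.println(root.relativize(p));
        }
    }
}
```

    With a layout like this, one scan picks up the files of every job sub-folder, which is the behaviour the original question asks for on the local side.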

  5. #5

    Thanks for the responses/inputs. Somehow I did not receive the notifications.
    I have another issue to tackle: after posting, I read the documentation, which states that the resource adapters poll continuously (is my understanding correct?), and we do not want that either. Hence I am thinking of using a JMS listener gateway and, based on the message content, doing the FTP transfer against the specified sub-folder.
    Hope this is fine.

  6. #6
    Join Date
    Aug 2009
    Location
    Paris, France
    Posts
    32

    Hi guys,

    I also need a recursive FTP poller. I was thinking about subclassing FtpInboundFileSynchronizer and overriding the synchronizeToLocalDirectory method with a recursive synchronization. Do you think that is a good idea?

    Thanks

  7. #7
    Join Date
    Aug 2009
    Location
    Paris, France
    Posts
    32

    [SOLVED] - Yiiiiiiihaaa !

    Hi guys, here is my solution, just to share. It is probably not the best, but it seems to work:

    Code:
    package com.xxxxx.yyyyy.modules.ftp;
    
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Collection;
    
    import org.apache.commons.net.ftp.FTPFile;
    
    import org.springframework.integration.MessagingException;
    import org.springframework.integration.file.remote.session.Session;
    import org.springframework.integration.file.remote.session.SessionFactory;
    import org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer;
    import org.springframework.util.Assert;
    import org.springframework.util.ObjectUtils;
    
    /**
     * An implementation of {@link AbstractInboundFileSynchronizer} for FTP.
     * 
     * @author Iwein Fuld
     * @author Josh Long
     * @author Mark Fisher
     * @since 2.0
     */
    public class FtpInboundFileSynchronizer extends AbstractInboundFileSynchronizer<FTPFile> {
    
    	private SessionFactory sessionFactory;
    	private volatile String remoteDirectory;
    	// default to "/" so remote paths are valid even if the setter is never called
    	private String remoteFileSeparator = "/";
    	private boolean deleteRemoteFiles;
    	private boolean recursive;
    
    	/**
    	 * Create a synchronizer with the {@link SessionFactory} used to acquire
    	 * {@link Session} instances.
    	 * 
    	 * @param recursive
    	 */
    	public FtpInboundFileSynchronizer(SessionFactory sessionFactory, boolean recursive) {
    		super(sessionFactory);
    		this.sessionFactory = sessionFactory;
    		this.recursive = recursive;
    	}
    
    	@Override
    	public void setRemoteDirectory(String remoteDirectory) {
    		super.setRemoteDirectory(remoteDirectory);
    		this.remoteDirectory = remoteDirectory;
    	}
    
    	@Override
    	public void synchronizeToLocalDirectory(File localDirectory) {
    		synchronizeToLocalDirectory(localDirectory, this.remoteDirectory);
    	}
    
    	private void synchronizeToLocalDirectory(File localDirectory, String rDir) {
    		Session session = null;
    		try {
    			session = this.sessionFactory.getSession();
    			Assert.state(session != null, "failed to acquire a Session");
    			FTPFile[] files = session.<FTPFile> list(rDir);
    
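    			// guard against an empty/null listing before touching files.length
    			if (recursive && !ObjectUtils.isEmpty(files)) {
    				logger.info(files.length + " file(s) retrieved.");
    				for (FTPFile f : files) {
    					// skip the "." and ".." entries some servers return, to avoid endless recursion
    					if (f != null && f.isDirectory() && !".".equals(f.getName()) && !"..".equals(f.getName())) {
    						logger.info("directory found: " + f.getName());
    						synchronizeToLocalDirectory(localDirectory, rDir + this.remoteFileSeparator + f.getName());
    					}
    				}
    			}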
    
    			if (!ObjectUtils.isEmpty(files)) {
    				Collection<FTPFile> filteredFiles = this.filterFiles(files);
    				for (FTPFile file : filteredFiles) {
    					if (file != null) {
    						this.rcopyFileToLocalDirectory(rDir, file, localDirectory, session);
    					}
    				}
    			}
    		} catch (IOException e) {
    			throw new MessagingException("Problem occurred while synchronizing remote to local directory", e);
    		} finally {
    			if (session != null) {
    				try {
    					session.close();
    				} catch (Exception ignored) {
    					if (logger.isDebugEnabled()) {
    						logger.debug("failed to close Session", ignored);
    					}
    				}
    			}
    		}
    	}
    
    	@Override
    	public void setRemoteFileSeparator(String remoteFileSeparator) {
    		super.setRemoteFileSeparator(remoteFileSeparator);
    		Assert.notNull(remoteFileSeparator, "'remoteFileSeparator' must not be null");
    		this.remoteFileSeparator = remoteFileSeparator;
    	}
    
    	@Override
    	public void setDeleteRemoteFiles(boolean deleteRemoteFiles) {
    		super.setDeleteRemoteFiles(deleteRemoteFiles);
    		this.deleteRemoteFiles = deleteRemoteFiles;
    	}
    
    	private void rcopyFileToLocalDirectory(String remoteDirectoryPath, FTPFile remoteFile, File localDirectory, Session session) throws IOException {
    		String remoteFileName = this.getFilename(remoteFile);
    		String remoteFilePath = remoteDirectoryPath + remoteFileSeparator + remoteFileName;
    		if (!this.isFile(remoteFile)) {
    			if (logger.isDebugEnabled()) {
    				logger.debug("cannot copy, not a file: " + remoteFilePath);
    			}
    			return;
    		}
    		File localFile = new File(localDirectory, remoteFileName);
    		if (!localFile.exists()) {
    			String tempFileName = localFile.getAbsolutePath() + getTemporaryFileSuffix();// this.temporaryFileSuffix;
    			File tempFile = new File(tempFileName);
    			FileOutputStream fileOutputStream = new FileOutputStream(tempFile);
    			try {
    				session.read(remoteFilePath, fileOutputStream);
    			} catch (Exception e) {
    				if (e instanceof RuntimeException) {
    					throw (RuntimeException) e;
    				} else {
    					throw new MessagingException("Failure occurred while copying from remote to local directory", e);
    				}
    			} finally {
    				// always release the local file handle, even when the transfer fails
    				try {
    					fileOutputStream.close();
    				} catch (Exception ignored) {
    				}
    			}
    			if (tempFile.renameTo(localFile)) {
    				if (this.deleteRemoteFiles) {
    					session.remove(remoteFilePath);
    					if (logger.isDebugEnabled()) {
    						logger.debug("deleted " + remoteFilePath);
    					}
    				}
    			}
    		}
    	}
    
    	@Override
    	protected boolean isFile(FTPFile file) {
    		return file != null && file.isFile();
    	}
    
    	@Override
    	protected String getFilename(FTPFile file) {
    		return (file != null ? file.getName() : null);
    	}
    
    }
    I then use this class instead of the original one for my FTP adapter. Thanks for your help!
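
    One thing to be aware of with the class above: files from every remote sub-directory are copied into the same local directory, so two remote files with the same name in different folders collide (the second one is skipped by the localFile.exists() check). A possible way around that, sketched below in plain Java, is to mirror the remote sub-path under the local directory. The class and method names (LocalTargetSketch, localTarget) and the /data paths are made up for illustration; this is not part of the posted class or of Spring Integration.

```java
import java.io.File;
import java.util.regex.Pattern;

// Hypothetical helper: maps a remote file to a local file that keeps its sub-directory.
public class LocalTargetSketch {

    /**
     * Returns the local file for fileName found in remoteDir, keeping the part of
     * remoteDir that lies below remoteRoot as sub-directories of localRoot.
     * If remoteDir does not start with remoteRoot, the file goes directly into localRoot.
     */
    public static File localTarget(File localRoot, String remoteRoot, String remoteDir,
                                   String fileName, String separator) {
        String relative = remoteDir.startsWith(remoteRoot)
                ? remoteDir.substring(remoteRoot.length())
                : "";
        File dir = localRoot;
        // Split on the literal remote separator and rebuild the path with java.io.File.
        for (String part : relative.split(Pattern.quote(separator))) {
            if (!part.isEmpty()) {
                dir = new File(dir, part);
            }
        }
        return new File(dir, fileName);
    }

    public static void main(String[] args) {
        File local = new File("inbound");
        // Same file name in two remote folders now maps to two different local files.
        System.out.println(localTarget(local, "/data", "/data/jobA", "report.csv", "/"));
        System.out.println(localTarget(local, "/data", "/data/jobB", "report.csv", "/"));
    }
}
```

    If something like this were used inside rcopyFileToLocalDirectory, the parent directory of the returned file would need to be created (for example with mkdirs()) before the temporary file is opened.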

  8. #8
    Join Date
    Jan 2008
    Location
    Mohnton, PA USA (that's near Philadelphia)
    Posts
    2,148

    Can you just extend the original FtpInboundFileSynchronizer and override its public void synchronizeToLocalDirectory(File localDirectory) method?

  9. #9
    Join Date
    Apr 2009
    Posts
    10

    I stumbled into the same issue. The files on the SFTP server are stored in sub-directories (which group files that belong together).

    Where/how can I configure a custom SftpInboundFileSynchronizer?

  10. #10
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,040

    The namespace doesn't currently support injecting a custom synchronizer; you have to assemble the adapter using <bean/> (or @Bean) definitions.

    You need a SourcePollingChannelAdapter whose 'source' property is an SftpInboundFileSynchronizingMessageSource; your custom synchronizer is passed to the SftpInboundFileSynchronizingMessageSource as a <constructor-arg/>.

    For complete details about how to wire up the message source, see AbstractRemoteFileInboundChannelAdapterParser.
    Last edited by Gary Russell; Jan 17th, 2013 at 11:09 AM.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware
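
    A rough sketch of the <bean/> wiring described in the last post might look like the fragment below. Treat it as an assumption-laden outline rather than a verified configuration: the synchronizer class com.example.RecursiveSftpInboundFileSynchronizer, the bean ids sftpSessionFactory and filesChannel, the directory values and the 5-second trigger are all placeholders, and the exact properties the namespace sets should be checked against AbstractRemoteFileInboundChannelAdapterParser as suggested above.

```xml
<!-- Custom synchronizer (hypothetical class); takes the session factory, like the FTP example in this thread -->
<bean id="sftpSynchronizer" class="com.example.RecursiveSftpInboundFileSynchronizer">
    <constructor-arg ref="sftpSessionFactory"/>
    <property name="remoteDirectory" value="/inbound"/>
</bean>

<!-- Message source that receives the custom synchronizer as its constructor argument -->
<bean id="sftpMessageSource"
      class="org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizingMessageSource">
    <constructor-arg ref="sftpSynchronizer"/>
    <property name="localDirectory" value="/tmp/inbound"/>
    <property name="autoCreateLocalDirectory" value="true"/>
</bean>

<!-- Polling adapter that gets the message source in its 'source' property -->
<bean id="sftpInboundAdapter"
      class="org.springframework.integration.endpoint.SourcePollingChannelAdapter">
    <property name="source" ref="sftpMessageSource"/>
    <property name="outputChannel" ref="filesChannel"/>
    <property name="trigger">
        <bean class="org.springframework.scheduling.support.PeriodicTrigger">
            <constructor-arg value="5000"/>
        </bean>
    </property>
</bean>
```

    The same three pieces (synchronizer, message source, polling adapter) would apply to the FTP variant, with the FTP message source class in place of the SFTP one.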
