Inbound FTP - Polling sub directories
I am trying to integrate/invoke Batch using Admin & Integration.
The requirement is: We have a set of Batch jobs (read/write files) running and want to add the ability to read and write files from FTP folder.
The way I plan to do this is: Create sub folders in the FTP for each of these Batch Jobs. Using FTP Adapter, read the files and put it in local folder.
I want to have a single FTP Adapter that polls all the sub directories instead of having an adapter for each sub-folder. Is this possible?
[SOLVED] - Yiiiiiiihaaa !
Hi guys, here is my solution, just to share, probably not the best, but it seem to work:
Code:
package com.xxxxx.yyyyy.modules.ftp;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import org.apache.commons.net.ftp.FTPFile;
import org.springframework.integration.MessagingException;
import org.springframework.integration.file.remote.session.Session;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
/**
* An implementation of {@link AbstractInboundFileSynchronizer} for FTP.
*
* @author Iwein Fuld
* @author Josh Long
* @author Mark Fisher
* @since 2.0
*/
public class FtpInboundFileSynchronizer extends AbstractInboundFileSynchronizer<FTPFile> {
private SessionFactory sessionFactory;
private volatile String remoteDirectory;
private String remoteFileSeparator;
private boolean deleteRemoteFiles;
private boolean recursive;
/**
* Create a synchronizer with the {@link SessionFactory} used to acquire
* {@link Session} instances.
*
* @param recursive
*/
public FtpInboundFileSynchronizer(SessionFactory sessionFactory, boolean recursive) {
super(sessionFactory);
this.sessionFactory = sessionFactory;
this.recursive = recursive;
}
@Override
public void setRemoteDirectory(String remoteDirectory) {
super.setRemoteDirectory(remoteDirectory);
this.remoteDirectory = remoteDirectory;
}
@Override
public void synchronizeToLocalDirectory(File localDirectory) {
synchronizeToLocalDirectory(localDirectory, this.remoteDirectory);
}
private void synchronizeToLocalDirectory(File localDirectory, String rDir) {
Session session = null;
try {
session = this.sessionFactory.getSession();
Assert.state(session != null, "failed to acquire a Session");
FTPFile[] files = session.<FTPFile> list(rDir);
if (recursive) {
logger.info(files.length + " file(s) retrieved.");
for (FTPFile file : files) {
FTPFile f = (FTPFile) file;
if (f.isDirectory()) {
logger.info("directory found: " + f.getName());
synchronizeToLocalDirectory(localDirectory, rDir + this.remoteFileSeparator + f.getName());
}
}
}
if (!ObjectUtils.isEmpty(files)) {
Collection<FTPFile> filteredFiles = this.filterFiles(files);
for (FTPFile file : filteredFiles) {
if (file != null) {
this.rcopyFileToLocalDirectory(rDir, file, localDirectory, session);
}
}
}
} catch (IOException e) {
throw new MessagingException("Problem occurred while synchronizing remote to local directory", e);
} finally {
if (session != null) {
try {
session.close();
} catch (Exception ignored) {
if (logger.isDebugEnabled()) {
logger.debug("failed to close Session", ignored);
}
}
}
}
}
@Override
public void setRemoteFileSeparator(String remoteFileSeparator) {
super.setRemoteFileSeparator(remoteFileSeparator);
Assert.notNull(remoteFileSeparator, "'remoteFileSeparator' must not be null");
this.remoteFileSeparator = remoteFileSeparator;
}
@Override
public void setDeleteRemoteFiles(boolean deleteRemoteFiles) {
super.setDeleteRemoteFiles(deleteRemoteFiles);
this.deleteRemoteFiles = deleteRemoteFiles;
}
private void rcopyFileToLocalDirectory(String remoteDirectoryPath, FTPFile remoteFile, File localDirectory, Session session) throws IOException {
String remoteFileName = this.getFilename(remoteFile);
String remoteFilePath = remoteDirectoryPath + remoteFileSeparator + remoteFileName;
if (!this.isFile(remoteFile)) {
if (logger.isDebugEnabled()) {
logger.debug("cannot copy, not a file: " + remoteFilePath);
}
return;
}
File localFile = new File(localDirectory, remoteFileName);
if (!localFile.exists()) {
String tempFileName = localFile.getAbsolutePath() + getTemporaryFileSuffix();// this.temporaryFileSuffix;
File tempFile = new File(tempFileName);
InputStream inputStream = null;
FileOutputStream fileOutputStream = new FileOutputStream(tempFile);
try {
session.read(remoteFilePath, fileOutputStream);
} catch (Exception e) {
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
} else {
throw new MessagingException("Failure occurred while copying from remote to local directory", e);
}
} finally {
try {
if (inputStream != null) {
inputStream.close();
}
} catch (Exception ignored1) {
}
try {
fileOutputStream.close();
} catch (Exception ignored2) {
}
}
if (tempFile.renameTo(localFile)) {
if (this.deleteRemoteFiles) {
session.remove(remoteFilePath);
if (logger.isDebugEnabled()) {
logger.debug("deleted " + remoteFilePath);
}
}
}
}
}
@Override
protected boolean isFile(FTPFile file) {
return file != null && file.isFile();
}
@Override
protected String getFilename(FTPFile file) {
return (file != null ? file.getName() : null);
}
}
And then i use this class instead of the original one for my FTP adapter. Thanks for your help!