Results 1 to 10 of 10

Thread: Using FTP Inbound Channel Adapter Programmatically

  1. #1

    Default Using FTP Inbound Channel Adapter Programmatically

    Good evening.
    We have a requirement to use ftp inbound channel adapter programmatically.
    In other words it is required to read remote directories from database and set up and run a separate ftp inbound channel adaper for each remote directory.

    First I tried decaratively that worked fine (which is as follows).

    ftp.properties:
    ---------------
    host=127.0.0.1
    username=admin
    password=password
    ===============================
    ftp-inbound-context.xml:
    -----------------------
    <contextroperty-placeholder location="ftp.properties"/>
    <context:component-scan base-package="com.test.si.ftp"/>

    <int-ftp:inbound-channel-adapter id="ftpInbound"
    channel="ftpChannel"
    session-factory="ftpClientFactory"
    filename-regex=".*\.txt$"
    auto-create-local-directory="true"
    delete-remote-files="false"
    remote-directory="/"
    local-directory="/var/output/">
    <intoller fixed-rate="1000"/>
    </int-ftp:inbound-channel-adapter>

    <int:channel id="ftpChannel">
    <int:queue/>
    </int:channel>
    ====================================
    FtpConfiguration.java:
    ----------------------
    @Configuration
    public class FtpConfiguration {

    @Value("${host}")
    private String host;

    @Value("${username}")
    private String username;

    @Value("${password}")
    private String password;

    @Bean
    public DefaultFtpSessionFactory ftpClientFactory() {
    DefaultFtpSessionFactory ftpSessionFactory =
    new DefaultFtpSessionFactory();
    ftpSessionFactory.setHost(host);
    ftpSessionFactory.setPort(2121);
    ftpSessionFactory.setUsername(username);
    ftpSessionFactory.setPassword(password);
    ftpSessionFactory.setClientMode(
    FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE);
    return ftpSessionFactory;
    }
    }
    ==================================================
    Test class:
    -----------
    ApplicationContext context =
    new ClassPathXmlApplicationContext("ftp-inbound-context.xml");
    PollableChannel ftpChannel = context.getBean("ftpChannel", PollableChannel.class);

    Message<?> message = ftpChannel.receive();
    System.out.println("message: " + message);
    ================================================== ===

    The above said ftp inbound channel adapter usage is programmatically done as follows (which didn't work):

    FTPService.java:
    ----------------
    public class FTPService implements Runnable {

    @Override
    public void run() {
    while(true) {
    String host = "127.0.0.1";
    String username = "admin";
    String password = "password";

    DefaultFtpSessionFactory ftpSessionFactory = new DefaultFtpSessionFactory();
    ftpSessionFactory.setHost(host);
    ftpSessionFactory.setPort(2121);
    ftpSessionFactory.setUsername(username);
    ftpSessionFactory.setPassword(password);
    ftpSessionFactory.setClientMode(FTPClient.PASSIVE_ LOCAL_DATA_CONNECTION_MODE);

    FtpInboundFileSynchronizer ftpInboundFileSynchronizer = new FtpInboundFileSynchronizer(ftpSessionFactory);
    ftpInboundFileSynchronizer.setDeleteRemoteFiles(fa lse);
    Pattern pattern = Pattern.compile(".*\\.txt$");
    FtpRegexPatternFileListFilter ftpRegexPatternFileListFilter = new FtpRegexPatternFileListFilter(pattern);
    ftpInboundFileSynchronizer.setFilter(ftpRegexPatte rnFileListFilter);
    ftpInboundFileSynchronizer.setRemoteDirectory("/");

    FtpInboundFileSynchronizingMessageSource ftpInboundFileSynchronizingMessageSource = new FtpInboundFileSynchronizingMessageSource(ftpInboun dFileSynchronizer);
    ftpInboundFileSynchronizingMessageSource.setAutoCr eateLocalDirectory(true);
    ftpInboundFileSynchronizingMessageSource.setAutoSt artup(true);
    ftpInboundFileSynchronizingMessageSource.setLocalD irectory(new File("/var/output"));
    ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
    ftpInboundFileSynchronizingMessageSource.setTaskSc heduler(threadPoolTaskScheduler);
    PollableChannel ftpChannel = new QueueChannel(100);
    ftpInboundFileSynchronizingMessageSource.setOutput Channel(ftpChannel);

    MessageHandler messageHandler = new DelayHandler(1000L);
    ConsumerEndpointFactoryBean factoryBean = new ConsumerEndpointFactoryBean();
    factoryBean.setHandler(messageHandler);
    factoryBean.setInputChannel(ftpChannel);
    factoryBean.setInputChannelName("ftpChannel");
    factoryBean.start();
    Message<?> message = ftpChannel.receive();
    System.out.println("message: " + message);
    }
    }

    public static void main(String[] args) {
    Thread t = new Thread(new FTPService());
    t.start();
    }
    }
    =============================
    Could you please let me know what I am missing while running programmatically.
    I appreciate and thank you for your feedback.

    Thanks
    Venkat

  2. #2
    Join Date
    Jan 2008
    Location
    Mohnton, PA USA (that's near Philadelphia)
    Posts
    2,148

    Default

    I am not sure I understand what other benefits you get from doing it programmatically. I mean we can show you what needs to be done but it would be nice to see if you are getting something extra with it. Am I missing something?

  3. #3

    Default

    Good morning Oleg.
    One of the requirements is to read remote directories from database and create ftp inbound channel adapter for each remote directory. If it is done declaratively then I have to set up ftp inbound channel adapter for each remote directory manually. I know it is much more easier to do delcaratively because it requires least effort. Please let me know if your thoughts if I could fulfill this requirement without doing programmatically. I appreciate and thank you for your feedback.

    Thanks
    Venkat

  4. #4
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,148

    Default

    The dynamic-ftp 'advanced' sample shows how to create a flow-snippet context programmatically, and pass variables into placeholders using the Spring 3.1 environment abstraction.

    The sample is for the outbound side; we simply resolve to a channel to send a message into the flow snippet.

    On the inbound side, one technique would be to make the flow snippet containing the adapter a child context of the main context; that way, the various inbound channel adapters can set the channel to a common channel in the parent context.

    Hope that helps.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  5. #5

    Default

    Good morning Gary.
    I will check out dynamic-ftp sample. Thanks for your help.

    Venkat

  6. #6
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,148

    Default

    If you get something working and would like to contribute it back to the samples; let us know.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  7. #7

    Default

    Gary before even I get started on dynamic-ftp sample I had to work on figuring out on the following problem.
    Assuming that I run ftp-inbound-channel adapter declaratively (for reference I am copying it here again):
    -------------------------------------------------
    <int-ftp:inbound-channel-adapter id="ftpInbound"
    channel="ftpChannel"
    session-factory="ftpClientFactory"
    filename-regex=".*\.txt$"
    auto-create-local-directory="true"
    delete-remote-files="false"
    remote-directory="/"
    local-directory="/var/output/">
    <intoller fixed-rate="1000"/>
    </int-ftp:inbound-channel-adapter>

    <int:channel id="ftpChannel">
    <int:queue/>
    </int:channel>
    -------------------------------------------------
    Assume that this process on NodeA. How do I make this process High Available.
    If I run this process on two Nodes, NodeA and NodeB, then both processes will try to poll for a file and try to download to respective local node directories. Could you please let me know your thoughts on how to make this process Highly Available?.

    Thanks
    Venkat

  8. #8
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,148

    Default

    Please use [ code ] ... [ /code ] tags (no spaces in brackets) around code and config.

    This demonstration https://github.com/garyrussell/sprin...ration-ha-demo (from SpringOne last year) shows a technique of how to have multiple instances of an application up and running, but only one at a time consuming input data. It uses a cluster controller, and hearbeats, to keep track that the master instance is running ok; if one of the other instances detects the master is off line, one of them is elected the new master. The inbound adapters are stopped/started as necessary by the cluster controller using a <control-bus/>. There's a link to the SpringOne presentation in the github readme.

    Active/Passive just needs the cluster controller; Active/Active needs the master to distribute the work to other instances, via an AMQP or JMS queue or similar.

    The global cluster state needs to be maintained somewhere - the demo uses either a database table row, or redis.

    Hope that helps.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  9. #9

    Default

    Gary thanks for the info.
    I was going the demo project to understand spring-integration-cluster. I just wanted to find what value should I provide for key = integration.cluster.inbound.adapter in test-data-source-context.xml if I wanted to test against ftp inbound channel adapter. For amqp, redis is used is there an option to make use of mysql.

    Thanks
    Venkat

  10. #10
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,148

    Default

    That's the id of the inbound-adapter you need to control (e.g. for <ftp:inbound-adapter id="myAdapter" .../> you would use 'myAdapter').

    With the demo you have amqp and redis or jms and jdbc.

    If you don't need active/active, then just use the jms/jdbc option and remove the jms part.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •