
Thread: Restarting a previously stopped FTP inbound-channel-adapter

  1. #1
    Join Date
    Nov 2011
    Posts
    9


    Hi folks,

    I have an application where
    • I want to receive messages from an FTP inbound-channel-adapter not continuously, but only at manually triggered points in time.
    • Additionally, I want to receive messages only for specific single files, waiting up to a defined time for each file to exist.
    • After receiving the message I want to stop the adapter, and later restart it with the same or another file.


    I used the following setup:

    Code:
    	<int-ftp:inbound-channel-adapter id="ftpInboundAdapter"
    			auto-startup="false"
    			session-factory="ftpSessionFactory"
    			cache-sessions="true"
    			channel="filterChannel"
    			local-directory="#{ systemProperties['java.io.tmpdir']}/localtest"
    			remote-directory="${ftp.directory}"
    			delete-remote-files="false"
    			auto-create-local-directory="true"
    			filter="singleFileFilter">
    		<int:poller fixed-rate="60000" max-messages-per-poll="-1"/>
    	</int-ftp:inbound-channel-adapter>
    	
    	<bean id="singleFileFilter" class="com.myapp.ftp.SingleFtpFileFilter" />
    	<int:channel id="filterChannel"/>
    	<int:filter input-channel="filterChannel" ref="singleFileFilter" output-channel="downloadChannel"/>
    	<int:channel id="downloadChannel">
    		<int:queue/>
    	</int:channel>
    SingleFtpFileFilter is implemented as FileListFilter and MessageSelector:

    Code:
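    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.commons.net.ftp.FTPFile;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.integration.Message;
    import org.springframework.integration.core.MessageSelector;
    import org.springframework.integration.file.filters.FileListFilter;
    
    public class SingleFtpFileFilter implements MessageSelector, FileListFilter<FTPFile> {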
    
    	/**
    	 * This class' logger.
    	 */
    	private final static Logger LOG = LoggerFactory.getLogger(SingleFtpFileFilter.class);
    	
    	/**
    	 * The file to download from the ftp server.
    	 */
    	private String fileToDownload;
    	
    	/**
    	 * Set the name of the file to download
    	 * @param fileToDownload name of the remote file
    	 */
    	public void setFileToDownload(String fileToDownload) {
    		this.fileToDownload = fileToDownload;
    	}	
    	
    	// ------------------------------------------------------------------------
    	// the remote file filter
    	// ------------------------------------------------------------------------
    
    	/**
    	 * The remote file filter.
    	 * @see org.springframework.integration.file.filters.FileListFilter#filterFiles(F[])
    	 */
    	@Override
    	public List<FTPFile> filterFiles(FTPFile[] files) {
    		List<FTPFile> filtered = new ArrayList<FTPFile>();
    		if (fileToDownload != null && fileToDownload.length() > 0) {
    			for (FTPFile file : files) {
    				if (file.getName().equals(fileToDownload)) {
    					LOG.debug("Accepted file {}", file.getName());
    					filtered.add(file);
    				}
    			}
    		}
    		return filtered;
    	}
    
    	// ------------------------------------------------------------------------
    	// the local file filter
    	// ------------------------------------------------------------------------
    	
    	@Override
    	public boolean accept(Message<?> message) {
    		if (fileToDownload != null && fileToDownload.length() > 0 &&
    				message != null && ((File) message.getPayload()).getName().equals(fileToDownload)) {
    			// Pass the message as a logging argument so the {} placeholder is filled in
    			LOG.debug("Accepted message {}", message);
    			return true;
    		}
    		return false;
    	}
    	
    }
    So far so good. The choreography is intended to be as follows:

    Code:
    		
    		// Start the test case - get inbound adapter as well as file filter
    		SourcePollingChannelAdapter adapter = ac.getBean("ftpInboundAdapter", SourcePollingChannelAdapter.class);
    		SingleFtpFileFilter filter = ac.getBean("singleFileFilter", SingleFtpFileFilter.class);
    
    		// Set file to download and start adapter
    		filter.setFileToDownload(fileToDownload);
    		adapter.start();
    
    		// receive message using timeout
    		Message<?> msg = ac.getBean("downloadChannel", PollableChannel.class).receive(3600000);
    		System.out.println("Received file message: " + msg);
    			
    		// stop adapter and reset filter
    		adapter.stop();
    		filter.setFileToDownload(null);
    This works correctly if I do it once. But restarting the adapter later (calling adapter.start() again and using the same choreography as above) has a lot of side effects:

    • More than one task-scheduler thread runs on the same session, so messages are sent more than once:
      Code:
      14:02:12.691 [task-scheduler-2] INFO  o.s.i.ftp.session.FtpSession - File have been successfully transfered to: INDICES/myFile_20111207.csv
      14:02:12.831 [task-scheduler-1] INFO  o.s.i.ftp.session.FtpSession - File have been successfully transfered to: INDICES/myFile_20111207.csv
    • Depending on the timing, I get exceptions and receive no message at all:
      Code:
       [task-scheduler-1] ERROR o.s.i.handler.LoggingHandler - org.springframework.integration.MessageDeliveryException: failed to send message to channel 'downloadChannel' within timeout: -1
      	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:292)
      	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:176)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:160)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:125)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:119)
      	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:101)
      	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
      	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:110)
      	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
      	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:61)
      	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
      	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
      	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
      	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
      	at org.springframework.integration.endpoint.SourcePollingChannelAdapter.doPoll(SourcePollingChannelAdapter.java:102)
      	at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146)
      	at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:144)
      	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:207)
      	at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
      	at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:48)
      	at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
      	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:202)
      	at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:51)
      	at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
      	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:206)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
      	at java.lang.Thread.run(Thread.java:662)
      Received file message: null
    • There is no way to clear the contents of the built-in AcceptOnceFileListFilter (except with an ugly reflection hack), so I cannot trigger a previously received download a second time.
    • There is no scope attribute that I could set to prototype on an inbound-channel-adapter.
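
    To illustrate the accept-once point in the list above: an accept-once filter remembers every name it has let through, so the same file is never passed on twice for the lifetime of the filter instance. The sketch below is a simplified, hypothetical stand-in (the class name AcceptOnceSketch is made up, and it works on plain file names), not the actual AcceptOnceFileListFilter source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for an accept-once filter; not the Spring Integration class.
final class AcceptOnceSketch {

    // Names that have already passed the filter; never cleared.
    private final Set<String> seen = new HashSet<String>();

    // Returns only the names that have not been accepted before.
    synchronized List<String> filter(List<String> names) {
        List<String> accepted = new ArrayList<String>();
        for (String name : names) {
            if (seen.add(name)) { // add() returns false for a name seen earlier
                accepted.add(name);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        AcceptOnceSketch filter = new AcceptOnceSketch();
        List<String> listing = Arrays.asList("myFile_20111207.csv");
        System.out.println(filter.filter(listing)); // first pass: the file is accepted
        System.out.println(filter.filter(listing)); // second pass: nothing is accepted
    }
}
```

    Because the set of seen names lives as long as the filter object, stopping and restarting whatever drives the filter does not make an already accepted name eligible again.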


    So how do I restart an inbound channel adapter from scratch, clearing all previously performed work?

    Thanks in advance
    Gunnar
    Last edited by Gunnar; Dec 12th, 2011 at 07:36 AM.

  2. #2
    Join Date
    Jan 2008
    Location
    Mohnton, PA USA (that's near Philadelphia)
    Posts
    2,148


    What version are you using? I am puzzled by the behavior of the task-scheduler. In fact, I believe someone reported something similar a while back, but I could never reproduce it, including just now. So it would be nice if you could also provide enough info/code (maybe in a zipped file) to reproduce it.

  3. #3
    Join Date
    Jan 2008
    Location
    Mohnton, PA USA (that's near Philadelphia)
    Posts
    2,148


    Also, could you please repeat your test but set max-messages-per-poll="1" (right now you have -1)? I think it would better satisfy your requirement, since you are looking for a single file anyway.

  4. #4
    Join Date
    Nov 2011
    Posts
    9


    Originally posted by oleg.zhurakousky:
    What version are you using? I am puzzled by the behavior of the task-scheduler. In fact, I believe someone reported something similar a while back, but I could never reproduce it, including just now. So it would be nice if you could also provide enough info/code (maybe in a zipped file) to reproduce it.
    I'm using the latest available releases:

    • Spring 3.0.6.RELEASE
    • Spring Integration 2.0.5.RELEASE
    • For spring-integration-ftp I used 2.0.5.RELEASE as well, but modified the XSD to make auto-startup configurable; the rest is unchanged.


    I will try to provide more info / code - hopefully tomorrow.

    Regards
    Gunnar

  5. #5
    Join Date
    Jan 2008
    Location
    Mohnton, PA USA (that's near Philadelphia)
    Posts
    2,148


    Thanks Gunnar

    That would be very helpful since as I said I do remember something similar was reported before, but was never followed up and I really want to get to the bottom of it.

  6. #6
    Join Date
    Nov 2011
    Posts
    9


    Hi Oleg,

    attached you will find my test case: ftp.inbound.test.zip

    Please take note of the following customized parts:
    • As I'm behind a rather special proxy, I introduced a ProxyFtpSessionFactory; the code is a modified DefaultFtpSessionFactory.
    • As already mentioned, I modified the XSD of spring-integration-ftp to allow auto-startup, the same as you have done in INT-2103 ("added 'auto-startup' to FTP inbound adapter").
    • I'm using Ivy for dependency management - please find dependency descriptions included in the build folder


    To reproduce the error, please adapt test.resource/user.properties so that it points to an FTP server, and define the files to download.
    You may have to run FtpInboundReceiveSample.testFTPInboundReceiveMultipleFiles() more than once, as the error seems to depend on timing. It is very likely that you will get an exception while downloading the third file:

    Code:
    09:53:15.830 [main] INFO  o.s.i.e.SourcePollingChannelAdapter - started ftpInboundAdapter
    09:53:16.033 [task-scheduler-2] DEBUG d.v.i.ftp.SingleFtpFileFilter - Accepted file yyyy_indices_20111128.csv
    09:53:16.033 [task-scheduler-1] DEBUG d.v.i.ftp.SingleFtpFileFilter - Accepted file yyyy_indices_20111128.csv
    09:53:16.236 [task-scheduler-2] INFO  o.s.i.ftp.session.FtpSession - File have been successfully transfered to: abc/def/yyyy_indices_20111128.csv
    09:53:16.236 [task-scheduler-1] INFO  o.s.i.ftp.session.FtpSession - File have been successfully transfered to: abc/def/yyyy_indices_20111128.csv
    
    
    09:53:16.236 [task-scheduler-1] DEBUG o.s.i.e.SourcePollingChannelAdapter - Poll resulted in Message: null
    09:53:16.236 [task-scheduler-2] DEBUG o.s.i.file.FileReadingMessageSource - Added to queue: [C:\WINDOWS\Profiles\xxx\LOCALS~1\Temp\localtest\yyyy_indices_20111128.csv]
    09:53:16.236 [task-scheduler-1] DEBUG o.s.i.e.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
    09:53:16.236 [task-scheduler-2] INFO  o.s.i.file.FileReadingMessageSource - Created message: [[Payload=C:\WINDOWS\Profiles\xxxx\LOCALS~1\Temp\localtest\yyyy_indices_20111128.csv][Headers={timestamp=1323766396236, id=3f061130-36d0-4f16-b3ce-2cae60e2caf4}]]
    09:53:16.236 [task-scheduler-2] DEBUG o.s.i.e.SourcePollingChannelAdapter - Poll resulted in Message: [Payload=C:\WINDOWS\Profiles\xxxx\LOCALS~1\Temp\localtest\yyyy_indices_20111128.csv][Headers={timestamp=1323766396236, id=3f061130-36d0-4f16-b3ce-2cae60e2caf4}]
    09:53:16.236 [task-scheduler-2] DEBUG o.s.i.channel.DirectChannel - preSend on channel 'filterChannel', message: [Payload=C:\WINDOWS\Profiles\xxxx\LOCALS~1\Temp\localtest\yyyy_indices_20111128.csv][Headers={timestamp=1323766396236, id=3f061130-36d0-4f16-b3ce-2cae60e2caf4}]
    09:53:16.236 [task-scheduler-2] DEBUG o.s.integration.filter.MessageFilter - org.springframework.integration.filter.MessageFilter@15e2075 received message: [Payload=C:\WINDOWS\Profiles\xxxx\LOCALS~1\Temp\localtest\yyyy_indices_20111128.csv][Headers={timestamp=1323766396236, id=3f061130-36d0-4f16-b3ce-2cae60e2caf4}]
    
    
    09:53:16.236 [task-scheduler-2] DEBUG d.v.i.ftp.SingleFtpFileFilter - Accepted message {}[Payload=C:\WINDOWS\Profiles\xxxx\LOCALS~1\Temp\localtest\yyyy_indices_20111128.csv][Headers={timestamp=1323766396236, id=3f061130-36d0-4f16-b3ce-2cae60e2caf4}]
    09:53:16.236 [task-scheduler-2] DEBUG o.s.integration.filter.MessageFilter - handler 'org.springframework.integration.filter.MessageFilter@15e2075' sending reply Message: [Payload=C:\WINDOWS\Profiles\xxxx\LOCALS~1\Temp\localtest\yyyy_indices_20111128.csv][Headers={timestamp=1323766396236, id=3f061130-36d0-4f16-b3ce-2cae60e2caf4}]
    09:53:16.236 [task-scheduler-2] DEBUG o.s.integration.channel.QueueChannel - preSend on channel 'downloadChannel', message: [Payload=C:\WINDOWS\Profiles\xxxx\LOCALS~1\Temp\localtest\yyyy_indices_20111128.csv][Headers={timestamp=1323766396236, id=3f061130-36d0-4f16-b3ce-2cae60e2caf4}]
    09:53:16.236 [task-scheduler-2] DEBUG o.s.integration.channel.QueueChannel - postSend (sent=false) on channel 'downloadChannel', message: [Payload=C:\WINDOWS\Profiles\xxxx\LOCALS~1\Temp\localtest\yyyy_indices_20111128.csv][Headers={timestamp=1323766396236, id=3f061130-36d0-4f16-b3ce-2cae60e2caf4}]
    09:53:16.236 [task-scheduler-2] DEBUG o.s.i.c.PublishSubscribeChannel - preSend on channel 'errorChannel', message: [Payload=org.springframework.integration.MessageDeliveryException: failed to send message to channel 'downloadChannel' within timeout: -1][Headers={timestamp=1323766396236, id=2c59c498-a554-4c28-b51d-6800cc128a1e}]
    09:53:16.236 [task-scheduler-2] DEBUG o.s.i.handler.LoggingHandler - (inner bean)#2 received message: [Payload=org.springframework.integration.MessageDeliveryException: failed to send message to channel 'downloadChannel' within timeout: -1][Headers={timestamp=1323766396236, id=2c59c498-a554-4c28-b51d-6800cc128a1e}]
    09:53:16.236 [task-scheduler-2] ERROR o.s.i.handler.LoggingHandler - org.springframework.integration.MessageDeliveryException: failed to send message to channel 'downloadChannel' within timeout: -1
    	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:292)
    	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:176)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:160)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:125)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:119)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:101)
    	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:110)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
    	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:61)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
    	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
    	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
    	at org.springframework.integration.endpoint.SourcePollingChannelAdapter.doPoll(SourcePollingChannelAdapter.java:102)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:144)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:207)
    	at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
    	at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:48)
    	at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:202)
    	at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:51)
    	at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
    	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98)
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:206)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    	at java.lang.Thread.run(Thread.java:662)
    
    09:53:16.236 [task-scheduler-2] DEBUG o.s.i.c.PublishSubscribeChannel - postSend (sent=true) on channel 'errorChannel', message: [Payload=org.springframework.integration.MessageDeliveryException: failed to send message to channel 'downloadChannel' within timeout: -1][Headers={timestamp=1323766396236, id=2c59c498-a554-4c28-b51d-6800cc128a1e}]
    Received file message: null
    Thanks in advance,
    Gunnar

  7. #7
    Join Date
    Jan 2008
    Location
    Mohnton, PA USA (that's near Philadelphia)
    Posts
    2,148


    Gunnar
    Here is part one. I'll follow up with more.
    When 'max-messages-per-poll' is set to -1, the task will run continuously until it produces no message, and then it will sleep until the next scheduled execution time (in your case that is every minute).
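
    The behavior just described can be modeled roughly as follows. This is only a sketch under stated assumptions: PollLoopSketch and runOnce are hypothetical names, and a plain Queue stands in for the message source; it is not Spring Integration code.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

// Hypothetical model of one scheduled poller run; not Spring Integration code.
final class PollLoopSketch {

    // Drains the source until it returns null or the per-poll limit is reached.
    // A negative limit means "no limit". Returns the number of messages produced.
    static int runOnce(Queue<String> source, int maxMessagesPerPoll) {
        int count = 0;
        while (maxMessagesPerPoll < 0 || count < maxMessagesPerPoll) {
            String message = source.poll(); // null means "nothing to receive"
            if (message == null) {
                break; // go back to sleep until the next trigger
            }
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        Queue<String> files = new ArrayDeque<String>(Arrays.asList("a.csv", "b.csv", "c.csv"));
        System.out.println(runOnce(files, 1));  // takes one message, then waits for the next trigger
        System.out.println(runOnce(files, -1)); // takes everything that is left
    }
}
```

    With a limit of 1 the task does a single receive per trigger, which keeps the window in which a stop() can hit an in-flight send much smaller than with -1.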
    So what happens in your case is this:
    1. At the beginning of your test you manually put the 'indices_test.csv' file in the temp directory, which results in this message:
    Message: [Payload=/Users/ozhurakousky/temp/ftptest/indices_test.csv][Headers={timestamp=1323787566011, id=db4c86bf-9172-46b0-b3fc-cb3a8a76ee3b}]
    This Message is generated as soon as the adapter scans the temp directory, which happens before it has had a chance to initiate remote communication with the FTP server. The message is sent to the downstream channel, where your <filter> filters it out, so it never ends up in 'downloadChannel'.

    Meanwhile, the adapter continues to spin and scans the temp directory again, but 'indices_test.csv' is now filtered out by the AcceptOnceFileListFilter, so no Message is produced from the temp directory. Only now does the adapter initiate communication with the FTP server to poll for more files.

    All of that happens on a separate thread, not the thread that runs the test and issues adapter.stop(), so stop() sometimes arrives while the other thread is in the process of producing/sending a Message. The stop command itself calls this.runningTask.cancel(true); the 'true' is the interrupt flag, which means that any task in progress will be interrupted. Since you are sending to a QueueChannel, which uses a LinkedBlockingQueue, queue.put(message) (line 80) results in an InterruptedException, which makes the overall send call return 'false', hence the exception that you see:
    10:28:17.259 [task-scheduler-1hread] DEBUG: org.springframework.integration.channel.QueueChannel - postSend (sent=false) on channel 'myErrorChannel', message: [Payload=org.springframework.integration.MessageDeliveryException: failed to send message to channel 'downloadChannel' within timeout: -1][Headers={timestamp=1323790097258, id=eb94dfa7-dd00-4cc6-b712-5992da1a8fb2}]
    10:28:17.261 [task-scheduler-1hread] ERROR: org.springframework.integration.channel.MessagePublishingErrorHandler - failure occurred in messaging task with message: [Payload=/Users/ozhurakousky/temp/ftptest/file2.pdf][Headers={timestamp=1323790097254, id=b5c69e5c-b1df-41b2-aaa4-5ebf9bf14ec5}]
    org.springframework.integration.MessageDeliveryException: failed to send message to channel 'downloadChannel' within timeout: -1
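
    The interaction between cancel(true) and an interruptible put() can be reproduced with plain java.util.concurrent classes. The following is a minimal sketch, not the adapter's code: CancelInterruptSketch and cancelWhileSending are hypothetical names, and a busy loop stands in for the poll work that is in flight when stop() is called.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo: cancelling a running task with interrupt=true makes a later put() fail.
final class CancelInterruptSketch {

    // Returns true if the message reached the queue, false if put() was interrupted.
    static boolean cancelWhileSending() {
        final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        final CountDownLatch started = new CountDownLatch(1);
        final CountDownLatch finished = new CountDownLatch(1);
        final AtomicBoolean sent = new AtomicBoolean(false);
        ExecutorService scheduler = Executors.newSingleThreadExecutor();
        try {
            Future<?> runningTask = scheduler.submit(new Runnable() {
                public void run() {
                    started.countDown();
                    // Stand-in for the poll work that is in progress when stop() is called.
                    while (!Thread.currentThread().isInterrupted()) {
                        Thread.yield();
                    }
                    try {
                        queue.put("file message"); // interruptible: sees the pending interrupt
                        sent.set(true);
                    } catch (InterruptedException e) {
                        sent.set(false);
                    } finally {
                        finished.countDown();
                    }
                }
            });
            started.await();          // make sure the task is really running
            runningTask.cancel(true); // true = interrupt the thread running the task
            finished.await();
            return sent.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("sent=" + cancelWhileSending()); // prints sent=false
    }
}
```

    The queue is unbounded, so put() never actually has to wait; it fails only because it checks the thread's interrupt status before inserting.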

  8. #8
    Join Date
    Nov 2011
    Posts
    9


    Hi Oleg,

    Cool! Is it a problem to call stop() at an improper time? If so, how can I avoid that?
    Looking forward to your final conclusion!

    Gunnar

  9. #9
    Join Date
    Jan 2008
    Location
    Mohnton, PA USA (that's near Philadelphia)
    Posts
    2,148


    Gunnar

    I am still working on it; there might actually be a bug on our end. The root of the problem is that stopping an adapter interrupts the LinkedBlockingQueue.put() call, which ideally you don't want to be interrupted.
    Anyway, I'll follow up.

  10. #10
    Join Date
    Jan 2008
    Location
    Mohnton, PA USA (that's near Philadelphia)
    Posts
    2,148


    Gunnar

    To make your test case work, all you need to do is add send-timeout="0" to the <filter>, since it is a sender to the QueueChannel.
    Here is what's happening. QueueChannel is backed by a LinkedBlockingQueue (LBQ). When a message arrives we invoke one of 3 methods:
    a) LBQ.offer(message) - non-blocking and non-interruptible; invoked if send-timeout is 0.
    b) LBQ.offer(message, timeout) - blocking and interruptible; invoked if the timeout is > 0.
    c) LBQ.put(message) - blocking and interruptible; invoked if the timeout is < 0.

    Since you don't provide an explicit timeout, we fall back on LBQ.put(message), which on occasion gets interrupted in the middle of its execution, hence the error you see.

    What I said above would make your test case work, since a timeout of 0 results in a call to LBQ.offer(message), and since that is non-interruptible, stopping the FTP adapter would not affect it.
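
    The three branches and their reaction to an interrupt can be tried out in isolation. The sketch below is a simplified stand-in for the send logic as described in this post, not the actual QueueChannel source; SendTimeoutSketch, send and sendWhileInterrupted are hypothetical names, and the interrupt is simulated on the calling thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the timeout-dependent send branches; not the QueueChannel source.
final class SendTimeoutSketch {

    // Chooses the queue method from the timeout; returns false if the send was interrupted.
    static boolean send(BlockingQueue<String> queue, String message, long timeoutMillis) {
        try {
            if (timeoutMillis > 0) {
                return queue.offer(message, timeoutMillis, TimeUnit.MILLISECONDS); // interruptible
            }
            if (timeoutMillis == 0) {
                return queue.offer(message); // non-blocking, ignores interrupts
            }
            queue.put(message); // interruptible
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    // Sends one message while the current thread has a pending interrupt.
    static boolean sendWhileInterrupted(long timeoutMillis) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        Thread.currentThread().interrupt(); // simulate the interrupt caused by stop()
        try {
            return send(queue, "file message", timeoutMillis);
        } finally {
            Thread.interrupted(); // clear the flag so the caller is unaffected
        }
    }

    public static void main(String[] args) {
        System.out.println("timeout -1: sent=" + sendWhileInterrupted(-1)); // sent=false
        System.out.println("timeout  0: sent=" + sendWhileInterrupted(0));  // sent=true
    }
}
```

    Only the timeout-0 branch delivers the message despite the pending interrupt, which matches the send-timeout="0" suggestion above.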

    Having said that, I think you are on the wrong track. I believe you are doing this because you need to download specific files on demand, and you are trying to make a polling adapter work as if it were an event-driven adapter. What you need is the FTP Outbound Gateway, which is not available in 2.0.5 but is available in 2.1.RC1.
    Here is the doc: http://static.springsource.org/sprin...tbound-gateway
    This way you'll be able to get a file on demand, in an event-driven style, which I think is what you really want.
    Could you please upgrade and give it a shot?
