Page 2 of 2 FirstFirst 12
Results 11 to 15 of 15

Thread: File adapter in APPEND mode doesn't work.

  1. #11
    Join Date
    Oct 2012
    Posts
    10

    Default

    Maybe the actual configuration will help you pinpoint my problem.

    spring-context-common.xml
    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
    	xmlns:int-file="http://www.springframework.org/schema/integration/file"
        
    ....">
     
     <!-- Context-wide default poller (default="true"): fires at a fixed rate of 3000
          (milliseconds unless a time-unit is given) and handles at most 10 messages per poll. -->
     <int:poller id="defaultPoller"
                 default="true"
                 fixed-rate="3000"
                 max-messages-per-poll="10" />
     
    <!-- Channel adapters --> 
     
    <!-- Chains --> 
     <!-- Error flow: each message arriving on errorChannel is handed to
          ErrorService.processError, the result is converted to a String, and that
          String is written as a file under C:/BASTest/Exceptions. -->
     <int:chain input-channel="errorChannel">
       <int:service-activator method="processError" ref="ErrorService" />
       <int:object-to-string-transformer />
       <int-file:outbound-channel-adapter directory="file:C:/BASTest/Exceptions" />
     </int:chain>
     
    <!-- Spring Beans -->  	
     <!-- Target of the service-activator in the errorChannel chain. -->
     <bean class="org.bas.integration.common.ErrorService" id="ErrorService" />
     	 	
    </beans>
    spring-context-businessevents.xml
    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
    	xmlns:jms="http://www.springframework.org/schema/integration/jms"
    	xmlns:int-file="http://www.springframework.org/schema/integration/file"
    	xmlns:int-xml="http://www.springframework.org/schema/integration/xml"
        
    ...">
    
     <import resource="classpath:spring-context-common.xml" />
     
     
    <!-- Channels --> 
     <!-- Publish-subscribe channels: every subscriber receives each message. -->
     <int:publish-subscribe-channel id="BusinessEventsFilesPubSubChannel"/>
     <int:publish-subscribe-channel id="ToProcessPubSubChannel"/>

     <!-- Plain channels of the business-event flow. -->
     <int:channel id="BusinessEventsInChannel"/>
     <int:channel id="BusinessEventsPreExportChannel"/>
     <int:channel id="BusinessEventsExportChannel"/>
     <int:channel id="PostponedChannel"/>
     <int:channel id="EodInChannel"/>

     <!-- One channel per business-event class; targets of the header-value-router mappings. -->
     <int:channel id="FXBusinessEventChannel"/>
     <int:channel id="MMBusinessEventChannel"/>
     <int:channel id="SAPBusinessEventChannel"/>
     
    <!-- Channel adapters --> 
     <!-- Reads files from C:/BASTest/BusinessEventsIn and publishes them to
          BusinessEventsFilesPubSubChannel. No poller is declared here, so the
          context's default poller applies. Duplicate prevention is switched off.
          BusinessEventsEodService looks this adapter up by its id to stop and start it. -->
     <int-file:inbound-channel-adapter id="businessEventsIn"
         directory="file:C:/BASTest/BusinessEventsIn"
         channel="BusinessEventsFilesPubSubChannel"
         prevent-duplicates="false" />
     <!-- Keeps a copy of every incoming file. -->
     <!-- NOTE(review): this adapter shares BusinessEventsFilesPubSubChannel with a chain
          that uses delete-files="true"; presumably the copy must run first. Confirm the
          subscriber order, or pin it with an explicit order attribute. -->
     <int-file:outbound-channel-adapter id="businessEventsCopyOriginal"
         directory="file:C:/BASTest/BusinessEventsCopy"
         channel="BusinessEventsFilesPubSubChannel" />

     <!-- Processed events, one output folder per routed channel. -->
     <int-file:outbound-channel-adapter id="businessEventsMM"
         directory="file:C:/BASTest/BusinessEventsOut/Processed/MM"
         channel="MMBusinessEventChannel" />
     <int-file:outbound-channel-adapter id="businessEventsFX"
         directory="file:C:/BASTest/BusinessEventsOut/Processed/FX"
         channel="FXBusinessEventChannel" />
     <int-file:outbound-channel-adapter id="businessEventsSAP"
         directory="file:C:/BASTest/BusinessEventsOut/Processed/SAP"
         channel="SAPBusinessEventChannel" />
     
    <!-- Chains -->
     <!-- Intake: turns each incoming file into a String (deleting the file afterwards),
          passes it through BusinessEventUnmarshaller and forwards the result to
          BusinessEventsInChannel. -->
     <int:chain output-channel="BusinessEventsInChannel"
                input-channel="BusinessEventsFilesPubSubChannel">
       <int-file:file-to-string-transformer delete-files="true"/>
       <int:transformer ref="BusinessEventUnmarshaller"/>
     </int:chain>
     
     <!-- Processing: passes the event through BusinessEventMarshaller, then routes it
          on the value of the BusinessEventClass header to the matching channel. -->
     <int:chain input-channel="ToProcessPubSubChannel">
       <int:transformer ref="BusinessEventMarshaller"/>
       <int:header-value-router header-name="BusinessEventClass">
         <int:mapping channel="FXBusinessEventChannel"
                      value="org.bas.entities.businessevents.FXBusinessEvent"/>
         <int:mapping channel="MMBusinessEventChannel"
                      value="org.bas.entities.businessevents.MMBusinessEvent"/>
         <int:mapping channel="SAPBusinessEventChannel"
                      value="org.bas.entities.businessevents.SAPBusinessEvent"/>
       </int:header-value-router>
     </int:chain>
    
     <!-- Settlement copy: events for which SettlementFilter.accept passes are run through
          BusinessEventMarshaller and written to the ForSettlement folder. -->
     <int:chain input-channel="ToProcessPubSubChannel">
       <int:filter method="accept" ref="SettlementFilter"/>
       <int:transformer ref="BusinessEventMarshaller"/>
       <int-file:outbound-channel-adapter directory="file:C:/BASTest/BusinessEventsOut/ForSettlement"/>
     </int:chain>
     
     <!-- Postponed events: run through BusinessEventMarshaller and written to the
          Postponed folder. -->
     <int:chain input-channel="PostponedChannel">
       <int:transformer ref="BusinessEventMarshaller"/>
       <int-file:outbound-channel-adapter directory="file:C:/BASTest/BusinessEventsOut/Postponed"/>
     </int:chain>
     
     <!-- Export: reads each file as a String (deleting the file), passes it through the
          unmarshaller and then the marshaller, applies BusinessEventToFinanceFormat.xsl and
          appends the result (mode="APPEND") to a file in C:/BASTest/Export whose name comes
          from businessEventFileNameGenerator. -->
     <!-- NOTE(review): the unmarshal step is immediately followed by a marshal step;
          presumably this validates or normalises the payload. Confirm it is intended. -->
     <int:chain input-channel="BusinessEventsPreExportChannel">
       <int-file:file-to-string-transformer delete-files="true"/>
       <int:transformer ref="BusinessEventUnmarshaller"/>
       <int:transformer ref="BusinessEventMarshaller"/>
       <int-xml:xslt-transformer xsl-resource="BusinessEventToFinanceFormat.xsl"/>
       <int-file:outbound-channel-adapter
           mode="APPEND"
           directory="file:C:/BASTest/Export"
           filename-generator="businessEventFileNameGenerator"/>
     </int:chain>
     
    <!-- Routers -->
     <!-- Routes messages from BusinessEventsInChannel by calling route() on an inner
          BusinessEventProcessDateRouter bean; the target channels are decided in that class. -->
     <int:router input-channel="BusinessEventsInChannel" method="route">
       <bean class="org.bas.integration.businessevents.BusinessEventProcessDateRouter"/>
     </int:router>
     
    <!-- Services -->
     <!-- End-of-day trigger: messages on EodInChannel invoke BusinessEventsEodService.export. -->
     <int:service-activator
         input-channel="EodInChannel"
         method="export"
         ref="BusinessEventsEodService"/>
    
    <!-- Spring Beans --> 
     <!-- End-of-day service. Both constructor arguments are Strings, so each carries an
          explicit index: 0 = processed folder, 1 = postponed folder. -->
     <bean id="BusinessEventsEodService"
           class="org.bas.integration.businessevents.BusinessEventsEodService">
       <constructor-arg index="0" value="C:/BASTest/BusinessEventsOut/Processed"/>
       <constructor-arg index="1" value="C:/BASTest/BusinessEventsOut/Postponed"/>
     </bean>
     
     <!-- Supplies file names for the export chain's outbound adapter. -->
     <bean class="org.bas.integration.businessevents.FileNameGenerator"
           id="businessEventFileNameGenerator"/>

     <!-- NOTE(review): not referenced anywhere in the configuration shown here;
          confirm it is used elsewhere. -->
     <bean class="org.bas.integration.businessevents.HashFileCreator"
           id="HashFileCreator"/>

     <!-- Transformers referenced by the chains. -->
     <bean class="org.bas.integration.businessevents.BusinessEventMarshaller"
           id="BusinessEventMarshaller"/>
     <bean class="org.bas.integration.businessevents.BusinessEventUnmarshaller"
           id="BusinessEventUnmarshaller"/>

     <!-- Filter referenced by the settlement chain. -->
     <bean class="org.bas.integration.businessevents.SettlementFilter"
           id="SettlementFilter"/>
     	
    </beans>
    Main.java
    Code:
    package org.bas.integration;
    
    ...
    
    public class Main {

    	/** Location of the top-level Spring configuration loaded at start-up. */
    	private static final String CONTEXT_LOCATION =
    			"classpath:spring-context-businessevents.xml";

    	/**
    	 * Entry point: creates the application context from {@link #CONTEXT_LOCATION}
    	 * and registers a shutdown hook on it. Nothing else is done on the main thread.
    	 *
    	 * @param args command-line arguments (not used)
    	 * @throws Exception if the application context cannot be created
    	 */
    	@SuppressWarnings("resource")
    	public static void main(final String... args) throws Exception {
    		final AbstractApplicationContext appContext =
    				new ClassPathXmlApplicationContext(CONTEXT_LOCATION);
    		appContext.registerShutdownHook();

    		/* This works as expected
    		BusinessEventsEodService svc = (BusinessEventsEodService)
    				appContext.getBean("BusinessEventsEodService");
    		Map<String, String> headers = new HashMap<String, String>();
    		headers.put("ProductFamily", "MM");
    		headers.put("ProcessDate", "20130122");
    		EodMessage msg = new EodMessage();
    		msg.setProcessDate("20130122");
    		msg.setDailyMessageCount(2);
    		msg.setProductFamily("MM");
    		svc.export(MessageBuilder.withPayload(msg).build());
    		*/
    	}
    }
    BusinessEventsEodService.java
    Code:
    package org.bas.integration.businessevents;
     ....
    public class BusinessEventsEodService implements ApplicationContextAware {
    	private ApplicationContext context;
    	private String processedFolder;
    	private String postponedFolder;
    	
    	/**
    	 * Creates the service with the two folders it works on.
    	 *
    	 * @param processedFolder base folder; {@code export} appends the product family
    	 *                        to it to find the files to export
    	 * @param postponedFolder folder whose files {@code export} re-sends at the end
    	 */
    	public BusinessEventsEodService(String processedFolder, String postponedFolder) {
    		this.postponedFolder = postponedFolder;
    		this.processedFolder = processedFolder;
    	}
    	
    	/**
    	 * Runs the end-of-day export for the product family named in the EOD message.
    	 *
    	 * <p>The {@code businessEventsIn} adapter is stopped while the export runs. The
    	 * files in {@code processedFolder/<productFamily>} are counted against the
    	 * message's daily message count, sent to {@code BusinessEventsPreExportChannel}
    	 * with {@code ProductFamily} and {@code ProcessDate} headers, and the business
    	 * date is closed. The adapter is then restarted and the files in the postponed
    	 * folder are sent to {@code BusinessEventsFilesPubSubChannel}.
    	 *
    	 * <p>The adapter is restarted in a {@code finally} block, so a failed validation,
    	 * a count mismatch or an export error no longer leaves it stopped.
    	 *
    	 * @param message message whose payload describes the end-of-day request
    	 * @throws BasException if the number of files differs from the daily message count
    	 * @throws IllegalStateException if the product-family folder cannot be listed
    	 * @throws Exception anything thrown by validation or by closing the business date
    	 */
    	public void export(Message<EodMessage> message) throws Exception {
    		SourcePollingChannelAdapter inAdapter = (SourcePollingChannelAdapter) 
    				context.getBean("businessEventsIn");
    		EodMessage eodMsg = message.getPayload();
    		
    		inAdapter.stop();
    		try {
    			File inFolder = new File(String.format("%s/%s", processedFolder, 
    					eodMsg.getProductFamily()));
    			
    			validateEodMessage(eodMsg);
    			
    			// File.listFiles() returns null (not an empty array) when the path is not
    			// a directory or cannot be read; fail with a clear message instead of an NPE.
    			File[] receivedFiles = inFolder.listFiles();
    			if (receivedFiles == null) {
    				throw new IllegalStateException(
    						"Cannot list business event files in folder: " + inFolder);
    			}
    			
    			if (receivedFiles.length != eodMsg.getDailyMessageCount()) {
    				throw new BasException(
    						"Number of business events received is not the same as in EOD file", 
    						DateTimeUtils.getCurrentBusinessDate(eodMsg.getProductFamily() + "BusinessEvent"));
    			}
    	
    			Map<String, String> headers = new HashMap<String, String>();
    			headers.put("ProductFamily", eodMsg.getProductFamily());
    			headers.put("ProcessDate", eodMsg.getProcessDate());
    			generateMessagesFromFolder(inFolder, "BusinessEventsPreExportChannel", headers);
    			
    			DateTimeUtils.closeBusinessDate(
    					eodMsg.getProductFamily() + "BusinessEvent",
    					DateTimeUtils.toDate(eodMsg.getProcessDate(), "yyyyMMdd"));
    		} finally {
    			// Always resume intake, even when the export failed.
    			inAdapter.start();
    		}
    		
    		// As before, postponed events are replayed only after a successful export.
    		generateMessagesFromFolder(new File(postponedFolder), "BusinessEventsFilesPubSubChannel");
    	}
    
    	/**
    	 * Sends every file in {@code folder} as a message payload to the named channel,
    	 * copying {@code headers} onto each message. This is best effort: a failure to
    	 * send one file is logged and the remaining files are still sent.
    	 *
    	 * @param folder        folder whose files are sent
    	 * @param channelToSend bean name of the target {@code MessageChannel}
    	 * @param headers       headers copied onto every message
    	 */
    	public void generateMessagesFromFolder(File folder, String channelToSend, Map<String, String> headers) {
    		MessageChannel channel = (MessageChannel) context.getBean(channelToSend);
    		
    		// File.listFiles() returns null when the path is not a directory or cannot
    		// be read; log it and return, in keeping with this method's best-effort style.
    		File[] files = folder.listFiles();
    		if (files == null) {
    			Logger.getRootLogger().error("Cannot list files in folder: " + folder);
    			return;
    		}
    		
    		for (File file : files) {
    			Message<File> msg = MessageBuilder.withPayload(file)
    					.copyHeaders(headers)
    					.build();
    			
    			try {
    				channel.send(msg);
    			} catch (Exception e) {
    				// Pass the exception as the throwable argument so the stack trace is
    				// logged, and name the file so the failure can be traced.
    				Logger.getRootLogger().error(
    						"Failed to send " + file + " to " + channelToSend, e);
    			}
    		}
    	}
    	
    	/**
    	 * Sends every file in {@code folder} as a message payload to the named channel,
    	 * without extra headers. Any exception from sending propagates to the caller.
    	 *
    	 * @param folder        folder whose files are sent
    	 * @param channelToSend bean name of the target {@code MessageChannel}
    	 * @throws IllegalStateException if the folder cannot be listed
    	 */
    	public void generateMessagesFromFolder(File folder, String channelToSend) {
    		MessageChannel channel = (MessageChannel) context.getBean(channelToSend);
    		
    		// File.listFiles() returns null when the path is not a directory or cannot
    		// be read; report that explicitly instead of failing with a bare NPE.
    		File[] files = folder.listFiles();
    		if (files == null) {
    			throw new IllegalStateException("Cannot list files in folder: " + folder);
    		}
    		
    		for (File file : files) {
    			Message<File> msg = MessageBuilder.withPayload(file)
    					.build();
    			
    			channel.send(msg);
    		}
    	}
    
    ....
    	
    	/**
    	 * Stores the application context; the other methods of this class use it to
    	 * look up beans by name.
    	 *
    	 * @param applicationContext the context this bean runs in
    	 * @throws BeansException declared by the overridden method; not thrown here
    	 */
    	@Override
    	public void setApplicationContext(ApplicationContext applicationContext)
    			throws BeansException {
    		this.context = applicationContext;
    	}
    }

  2. #12
    Join Date
    Oct 2012
    Posts
    10

    Default

    One more addition:
    I seem to have localized the source of the problem. As you can see, the export method of BusinessEventsEodService executes the following code:

    Code:
    SourcePollingChannelAdapter inAdapter = (SourcePollingChannelAdapter) 
    				context.getBean("businessEventsIn");
    inAdapter.stop();
    When I comment those lines out, the file appender works as expected. I wonder what the problem could be with stopping the adapter and, as I assume, the default poller. And why does it work when export is invoked explicitly from the Main method?

  3. #13
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,012

    Default

    Stopping the adapter does not stop the default poller, just the adapter.

    I cannot imagine why stopping the adapter would cause this problem - especially because that adapter is polling a different directory.

    As I said before, running the main() is a different environment; the problem is because some other class has the file open.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  4. #14
    Join Date
    Oct 2012
    Posts
    10

    Default

    Quote Originally Posted by Gary Russell View Post
    Stopping the adapter does not stop the default poller, just the adapter.

    I cannot imagine why stopping the adapter would cause this problem - especially because that adapter is polling a different directory.

    As I said before, running the main() is a different environment; the problem is because some other class has the file open.
    Hi Gary,
    Thanks for your answer. I now think I understand why this happens, and it has nothing to do with file locking. By invoking stop on the adapter, the main method terminates and notifies the other threads to interrupt themselves. That's why the file locking throws an InterruptedException. Calling the service activator from Main doesn't have this problem for an obvious reason: Main is still executing.
    Could you please tell me whether there is some rationale behind that line of thought? Could you also advise on the best way to implement Main (compared to how it is done right now)? And what can I do, instead of stopping the adapter, to stop incoming messages from being processed while the EOD logic is executing?

    Thanks in advance!

  5. #15
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,012

    Default

    Why don't you just stop your main method from terminating?
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •