Results 1 to 3 of 3

Thread: Skip not working correctly with FlatFileItemReader using CompositeLineTokenizer

  1. #1

    Default Skip not working correctly with FlatFileItemReader using CompositeLineTokenizer

    Hi,

    We are using Spring Batch 1.1.4 in my project.I have configured skip listener for my ItemReader which extends MultiresourceItemReader and uses FlatFileItemReader as delegate to read the file. FlatFileItemReader uses PrefixMatchingCompositeLineTokenizer as tokenizer and fieldMapper as PassThroughFieldSetMapper. Whenever an skippable exception occurs while reading a line(For E.g NumberFormatException), once the skiplistener's callback method is invoked, it continues to read next line in the file rather than the next item. For E.g One item in my file is constituted by Lines H000,H001,D001,D002 and S001. Suppose if the exception is occuring in H001, it continues to read D001 rather than skipping the whole composite Line till S001.Please help me in resolving this.

    Please find the configuration below.

    Code:
    <bean id="oceanLoadJob" class="org.springframework.batch.core.job.SimpleJob">
        <property name="steps">
          <list>
            <bean id="loadStep" parent="simpleStep">
              <property name="itemReader" ref="oceanInvoiceReader"/>
              <property name="itemWriter" ref="oceanInvoiceWriter"/>
              <property name="listeners" ref="oceanLoadSkipListener"/>
    	      <property name="skipLimit" value="10" />
    	      <property name="skippableExceptionClasses" 
    	                value="java.lang.NumberFormatException,
    	                java.lang.NullPointerException,
    	                com.tgt.ecl.exception.TgtDataAccessException,
    	                org.springframework.batch.item.file.FlatFileParseException,
    	                org.springframework.batch.item.file.transform.FlatFileFormatException,
    	                org.springframework.batch.item.file.transform.IncorrectLineLengthException,
    	                org.springframework.batch.item.file.transform.IncorrectTokenCountException"/>
            </bean>
          </list>
        </property>
        <property name="restartable" value="true"/>
        <property name="jobRepository" ref="simpleJobRepository"/>
      </bean>
      
      <bean id="simpleStep" class="org.springframework.batch.core.step.item.StatefulRetryStepFactoryBean" >
    	<property name="transactionManager" ref="JTATxManager" />
    	<property name="jobRepository" ref="simpleJobRepository" />
    	<property name="itemReader" ref="oceanInvoiceReader" />
    	<property name="itemWriter" ref="oceanInvoiceWriter" />
    	<property name="commitInterval" value="1" />
    	<property name="listeners" ref="oceanLoadSkipListener"/>
    	<property name="skipLimit" value="10" />
    	<property name="skippableExceptionClasses" 
    	                value="java.lang.NumberFormatException,
    	                java.lang.NullPointerException"/>
      </bean>
      
      <bean id="oceanLoadSkipListener" class="com.tgt.ofp.invoices.oceaninvoice.listener.OceanLoadSkipListener">
         <property name="dao" ref="batchErrDAO"/>
      </bean>
          
      <bean id="oceanInvoiceReader" class="com.tgt.ofp.invoices.oceaninvoice.reader.OceanInvoiceReader">
        <property name="resources" value="${ofp.baseFileSystemPath}/input/invoices/oceaninvoice/*.txt"/>
        <property name="delegate">
        <bean class="org.springframework.batch.item.file.FlatFileItemReader">
           <property name="lineTokenizer" ref="oceanFileTokenizer"/>
           <property name="fieldSetMapper">
                  <bean class="org.springframework.batch.item.file.mapping.PassThroughFieldSetMapper" />
           </property>
        </bean>
        </property>
       </bean>
    
       
       <bean id="oceanInvoiceWriter" class="com.tgt.ofp.invoices.oceaninvoice.writer.OceanInvoiceWriter">
           <property name="dao" ref="oceanLoadDao"/>
           <property name="paymentDAO" ref="pymtDAO"/>
       </bean>
          
       <bean id="oceanFileTokenizer" class="org.springframework.batch.item.file.transform.PrefixMatchingCompositeLineTokenizer">
         <property name="tokenizers">
            <map>
              <entry key="H000" value-ref="oceanHeaderTokenizer"/>
              <entry key="H001" value-ref="oceanInvoiceTokenizer"/>
              <entry key="D001" value-ref="oceanContainerTokenizer"/>
              <entry key="D002" value-ref="oceanFeeTokenizer"/>
              <entry key="S001" value-ref="oceanSummaryTokenizer"/>
            </map>
          </property>
       </bean>
     
     
       <bean id="oceanHeaderTokenizer" class="org.springframework.batch.item.file.transform.FixedLengthTokenizer">
          <property name="names" value="LINE_HDR,LINE_FTR"/>
          <property name="columns" value="1-4,5-278"/>
       </bean>
       
       <bean id="oceanInvoiceTokenizer" class="org.springframework.batch.item.file.transform.FixedLengthTokenizer">
          <property name="names" value="LINE_HDR,BOL_I,INVC_D,CRGO_RCPT_D,INVC_A,CARR_SCAC_C,LOAD_BY_C,SRVC_CON_I,VES_N,VOYG_I,POEX_C,CTRY_C,POEN_C,ACTL_DEPTR_D,EST_POEN_D,PODL_C,LINE_FTR"/>
          <property name="columns" value="1-4,8-20,71-78,71-78,79-90,91-94,103-106,107-131,135-169,170-173,189-193,196-198,199-203,213-220,229-236,245-249,250-278"/>
       </bean>
       
       <bean id="oceanContainerTokenizer" class="org.springframework.batch.item.file.transform.FixedLengthTokenizer">
           <property name="names" value="LINE_HDR,EQPM_I,EQPM_WT_Q,EQPM_WT_UOM,EQPM_VOL_Q,EQPM_VOL_UOM,EQPM_TYPE_C,EQPM_CTN_Q,LINE_FTR"/>
           <property name="columns" value="1-4,38-48,49-60,61-61,62-73,74-74,75-78,79-87,88-278"/>
       </bean>
       
       <bean id="oceanFeeTokenizer" class="org.springframework.batch.item.file.transform.FixedLengthTokenizer">
           <property name="names" value="LINE_HDR,FRT_RATE_A,FRT_RATE_UOM_C,FEE_A,FRT_EXP_C,CHRG_Q,LINE_FTR"/>
           <property name="columns" value="1-4,45-56,57-58,59-70,83-85,86-94,95-278"/>
       </bean>
       
       <bean id="oceanSummaryTokenizer" class="org.springframework.batch.item.file.transform.FixedLengthTokenizer">
          <property name="names" value="LINE_HDR,TOT_INVC_WT_Q,TOT_COLL_FEE_A,TOT_INVC_VOL_Q,TOT_INVC_VOL_UOM,TOT_INVC_CTN_Q,LINE_FTR"/>
          <property name="columns" value="1-4,38-49,50-61,74-85,86-86,87-95,96-278"/>
       </bean>

  2. #2

    Default

    Here is the code for the writer.

    Code:
    public OceanInvoiceVO read() throws  Exception,UnexpectedInputException{
    	    logger.logp(Level.FINER,className,"read()","Entering method");
    		OceanInvoiceVO oceanInvoiceVO = null;
    		List<OceanEquipmentVO> equipmentList = new ArrayList<OceanEquipmentVO>();
    		List<OceanFeeInfoVO> feeList = new ArrayList<OceanFeeInfoVO>();
    	
    		for(FieldSet line = null; (line = (FieldSet)super.read()) != null;){
    			String lineIdentifier = line.readString(0);
    			if(lineIdentifier.equals("H000")){
    			  oceanInvoiceVO = new OceanInvoiceVO();
    			}
    			if(lineIdentifier.equals("H001")){
    				oceanInvoiceVO.setBillOflandingNumber(line.readString("BOL_I").trim());
    				oceanInvoiceVO.setInvoiceDate(DateUtil.getFormattedDate(line.readString("INVC_D").trim()));
    				oceanInvoiceVO.setCargoReceiptDate(DateUtil.getFormattedDate(line.readString("CRGO_RCPT_D").trim()));
    				oceanInvoiceVO.setInvoiceAmount(new BigDecimal(new BigInteger(line.readString("INVC_A").trim()),3));
    				oceanInvoiceVO.setCarrierSCACCode(line.readString("CARR_SCAC_C").trim());
    				oceanInvoiceVO.setLoadByCode(line.readString("LOAD_BY_C").trim());
    				oceanInvoiceVO.setServiceContactID(line.readString("SRVC_CON_I").trim());
    			    oceanInvoiceVO.setVesselName(line.readString("VES_N").trim());
    			    oceanInvoiceVO.setVoyageID(line.readString("VOYG_I").trim());
    			    oceanInvoiceVO.setPortOfExportCode(line.readString("POEX_C").trim());
    			    oceanInvoiceVO.setIsoCountryCode(line.readString("CTRY_C").trim());
    			    oceanInvoiceVO.setPortOfEntryCode(line.readString("POEN_C").trim());
    			    oceanInvoiceVO.setActualDepartureDate(DateUtil.getFormattedDate(line.readString("ACTL_DEPTR_D").trim()));
    			    oceanInvoiceVO.setEstPortOfEntryDate(DateUtil.getFormattedDate(line.readString("EST_POEN_D").trim()));
    			    oceanInvoiceVO.setPortOfDischarging(line.readString("PODL_C").trim());
    				logger.logp(Level.FINER, className,"read()", "End of Invoice Header record");
    			}
    			else if(lineIdentifier.equals("D001")){
    				OceanEquipmentVO oceanEquipmentVO = new OceanEquipmentVO();
    				oceanEquipmentVO.setEquipmentID(line.readString("EQPM_I").trim());
    				oceanEquipmentVO.setEquipmentWeightQuantity(new BigDecimal(new BigInteger(line.readString("EQPM_WT_Q").trim()),3));
    				oceanEquipmentVO.setEquipmentWeightQuantityMeasure(line.readString("EQPM_WT_UOM").trim());
    				oceanEquipmentVO.setEquipmentVolumeQuantity(new BigDecimal(new BigInteger(line.readString("EQPM_VOL_Q").trim()),3));
    				oceanEquipmentVO.setEquipmentVolumeUnitMeasure(line.readString("EQPM_VOL_UOM").trim());
    				oceanEquipmentVO.setEquipmentType(line.readString("EQPM_TYPE_C").trim());
    				oceanEquipmentVO.setIsoEquipmentTypeCode("4500");
    				oceanEquipmentVO.setCreateUserID("SYSTEM");
    				oceanEquipmentVO.setEquipmentCartonQuantity(new BigDecimal(new BigInteger(line.readString("EQPM_CTN_Q").trim()),3));
    				equipmentList.add(oceanEquipmentVO);
    				logger.logp(Level.FINER, className,"read()", "End of Invoice Container record");
    			}
    			else if(lineIdentifier.equals("D002")){
    			   OceanFeeInfoVO oceanFeeInfoVO = new OceanFeeInfoVO();
    			   oceanFeeInfoVO.setFreightRateAmount(new BigDecimal(new BigInteger(line.readString("FRT_RATE_A").trim()),3));
    			   oceanFeeInfoVO.setPriceBasis(line.readString("FRT_RATE_UOM_C").trim());
    			   oceanFeeInfoVO.setFeeAmount(new BigDecimal(new BigInteger(line.readString("FEE_A").trim()),3));
    			   oceanFeeInfoVO.setChargedQuantity(new BigDecimal(new BigInteger(line.readString("CHRG_Q").trim()),3));
    			   oceanFeeInfoVO.setFreightExpenseCode(line.readString("FRT_EXP_C").trim());
    			   oceanFeeInfoVO.setCreateUserID("SYSTEM");
    			   feeList.add(oceanFeeInfoVO);
    			   logger.logp(Level.FINER, className,"read()", "End of Invoice Fee record");
    			}
    			else if(lineIdentifier.equals("S001")){
    			  oceanInvoiceVO.setTotalInvoiceWeightQuantity(new BigDecimal(new BigInteger(line.readString("TOT_INVC_WT_Q").trim()),3));
    			  oceanInvoiceVO.setTotalInvoiceWeightUOMCode("KG");
    			  oceanInvoiceVO.setTotalCollectFeeAmount(new BigDecimal(new BigInteger(line.readString("TOT_COLL_FEE_A").trim()),3));
    			  oceanInvoiceVO.setTotalVolumeQuantity(new BigDecimal(new BigInteger(line.readString("TOT_INVC_VOL_Q").trim()),3));
    			  oceanInvoiceVO.setTotalVolumeUOMCode(line.readString("TOT_INVC_VOL_UOM").trim());
    			  oceanInvoiceVO.setTotalInvoiceCartonQuantity(new Long(line.readString("TOT_INVC_CTN_Q").trim()));
    			  oceanInvoiceVO.setInvoiceModeCode("SY");
    			  oceanInvoiceVO.setTransportationModeCode("11");
    			  oceanInvoiceVO.setInvoiceStatusCode("PN");
    			  oceanInvoiceVO.setInvoiceCurrencyCode("USD");
    			  oceanInvoiceVO.setVendorID(new Long(5000223));
    			  oceanInvoiceVO.setCreateUserID("SYSTEM");
    			  oceanInvoiceVO.setCommodityCode("A");
    			  oceanInvoiceVO.setInvoiceAuditStatusCode("PN");
    			  oceanInvoiceVO.setContainerList(equipmentList);
    			  oceanInvoiceVO.setFeeInfoList(feeList);
    			  logger.logp(Level.FINER, className,"read()", "End of Invoice Summary record");
    			  return oceanInvoiceVO;
    			}
    		}
    		logger.logp(Level.FINER, className,"read()", "End of read method");
    		return oceanInvoiceVO;
    	}

  3. #3
    Join Date
    Jun 2005
    Posts
    4,230

    Default

    Quote Originally Posted by deepakthakku View Post
    Here is the code for the writer.
    You mean "reader"? Don't you just have to code around the fact that the delegate might occasionally return an H001 record before an H000 (in the case of a restart)?

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •