Page 1 of 2 12 LastLast
Results 1 to 10 of 11

Thread: Dynamic commit-interval

  1. #1

    Default Dynamic commit-interval

    Hi all,

    I have a spring batch applications with flat file as item reader.

    This flat contains two differents types of records.

    In the definition of my chunk i have to specify a dynamic commit interval.

    The application has to commit each time it reads a new type of record.

    Exemple :

    1, 'recordType1', 50
    1, 'recordType1', 51
    1, 'recordType1', 52
    COMMIT
    2, 'recordType2', 'foo'
    COMMIT
    1, 'recordType1', 53
    COMMIT
    2, 'recordType2', 'foo'
    COMMIT
    1, 'recordType1', 50
    1, 'recordType1', 50
    1, 'recordType1', 50
    1, 'recordType1', 50
    COMMIT
    2, 'recordType2', 'foo'
    2, 'recordType2', 'foo'
    2, 'recordType2', 'foo'
    2, 'recordType2', 'foo'
    COMMIT


    How can I do this ?

    Thanxs!

  2. #2
    Join Date
    Sep 2008
    Location
    Chicagoland, IL
    Posts
    338

    Default

    Take a look at the chunk-completion-policy. You can implement the CompletionPolicy interface and configure that to perform some form of logic to determine if the chunk is complete or not.
    Michael Minella
    Spring Batch Lead
    Author - Pro Spring Batch
    http://www.michaelminella.com
    Twitter: @MichaelMinella

  3. #3

    Default

    hi,

    thank you for replying, yes im always searching for it.

    Indeed, it works well with chunk-completion-policy... With a little business logic i can commit by bloc of same records, but i have a problem by using it...

    chunk-completion-policy supposes that the entire resource file is correct because it reads all the resource file before doing anything else.

    If the the resource file has for example three corrects blocs but the fourth is wrong then none records will be inserted in my DB.
    Or i want that the three first corrects blocs will be persisted ...

  4. #4
    Join Date
    Sep 2008
    Location
    Chicagoland, IL
    Posts
    338

    Default

    If you don't want the transaction to roll back, you can skip the exception via the skippable-exception-classes (section 5.1.5 here: http://static.springsource.org/sprin...igureStep.html).
    Michael Minella
    Spring Batch Lead
    Author - Pro Spring Batch
    http://www.michaelminella.com
    Twitter: @MichaelMinella

  5. #5

    Default

    Yes, I know, but I want transaction rollback on the entire bloc that got wrong record, then stopping application

    The previous right blocs must be fully commited

  6. #6
    Join Date
    Sep 2008
    Location
    Chicagoland, IL
    Posts
    338

    Default

    Can you post your job configuration?
    Michael Minella
    Spring Batch Lead
    Author - Pro Spring Batch
    http://www.michaelminella.com
    Twitter: @MichaelMinella

  7. #7

    Default

    Here you are :

    Something to say you : First i used the JPA dialect, then I remove it and implement my own writer (my writer do nothing!) In my processor I inject a Service (VenteService) that do some business logic and manage transaction



    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
    	xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
    	xmlns:util="http://www.springframework.org/schema/util"
    
    	xsi:schemaLocation="http://www.springframework.org/schema/beans 
    	http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
    	http://www.springframework.org/schema/batch
    	http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
    	http://www.springframework.org/schema/context 
            http://www.springframework.org/schema/context/spring-context-3.1.xsd 
            http://www.springframework.org/schema/tx 
            http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
            http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-2.0.xsd">
    
    	 <import resource="classpath:/applicationContext-dao.xml" />
    
    	<!-- Instruct Spring to perform declarative transaction management automatically 
    		on annotated classes. -->
    	<tx:annotation-driven transaction-manager="transactionManager" />
    
    	<bean id="venteReader" class="org.springframework.batch.item.file.FlatFileItemReader">
    		<property name="resource" value="classpath:prixvente_000001.txt" />
    
    		<property name="lineMapper">
    			<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
    				<property name="lineTokenizer">
    					<bean
    						class="org.springframework.batch.item.file.transform.PatternMatchingCompositeLineTokenizer">
    						<property name="tokenizers">
    							<map>
    								<entry key="1*" value-ref="line1Tokenizer" />
    								<entry key="2*" value-ref="line2Tokenizer" />
    							</map>
    						</property>
    					</bean>
    				</property>
    
    				<property name="fieldSetMapper">
    					<bean class="com.ubik.newapp.batch.VenteFieldSetMapper">
    					</bean>
    				</property>
    			</bean>
    		</property>
    	</bean>
    
    	<bean id="line1Tokenizer"
    		class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
    		<property name="names"
    			value="ligneType,numeroCondition, debutValidite, finValidite, prixValeur, miseAJour, numeroArticle, originePV, topPrixDeVenteConseille, prixVenteMin, codePrixDeVente" />
    		<property name="delimiter">
    			<util:constant
    				static-field="org.springframework.batch.item.file.transform.DelimitedLineTokenizer.DELIMITER_TAB" />
    		</property>
    	</bean>
    
    	<bean id="line2Tokenizer"
    		class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
    		<property name="names"
    			value="ligneType,numeroCondition,numeroEntiteMagasin,debutValiditeMag,finValiditeMag,miseAJour,extra1,extra2,extra3,extra4,extra5" />
    		<property name="delimiter">
    			<util:constant
    				static-field="org.springframework.batch.item.file.transform.DelimitedLineTokenizer.DELIMITER_TAB" />
    		</property>
    	</bean>
    
    <!-- 	<bean id="venteWriter" class="org.springframework.batch.item.database.JpaItemWriter"> -->
    <!-- 	<property name="entityManagerFactory" ref="entityManagerFactory" /> -->
    <!-- 	</bean> -->
    
    	<bean id="dataSource" destroy-method="close"
    		class="org.apache.commons.dbcp.BasicDataSource">
    		<property name="driverClassName" value="org.postgresql.Driver" />
    		<property name="url" value="jdbc:postgresql://127.0.0.1:5433/ubik" />
    		<property name="username" value="postgres" />
    		<property name="password" value="ubikubik" />
    	</bean>
        
    <!--     <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> -->
    <!--      <property name="dataSource" ref="dataSource"/> -->
    <!--   </bean> -->
    
    	<bean id="jobLauncher"
    		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    		<property name="jobRepository" ref="jobRepository" />
    	</bean>
    
    	<bean id="jobRepository"
    		class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
    <!--   		<property name="transactionManager" ref="transactionManager" /> -->
    <!--  		<property name="isolationLevelForCreate" value="PROPAGATION_REQUIRED" />  -->
    	</bean>
    
    	<!-- ========================= BUSINESS OBJECT DEFINITIONS ========================= -->
    
    	<!-- Activates various annotations to be detected in bean classes: Spring's 
    		@Required and @Autowired, as well as JSR 250's @Resource. -->
    	<context:annotation-config />
    
    	<context:component-scan base-package="com.ubik.newapp"
    		scoped-proxy="targetClass" />
    
    	<job id="importVentes" xmlns="http://www.springframework.org/schema/batch" >
    		<step id="readWriteVente">
    			<tasklet >
     				<chunk reader="venteReader" processor="venteProcessor" 
     					reader-transactional-queue="true" processor-transactional="true" writer="venteWriter" chunk-completion-policy="completionPolicy" >
                       
    				</chunk>
    			</tasklet>
    		</step>
    	</job>
    
    	<bean id="completionPolicy" class="com.ubik.newapp.batch.CompletionPolicy" />
    
     	<bean id="venteProcessor" class="com.ubik.newapp.batch.VenteProcessor" />
    
     	<bean id="venteWriter" class="com.ubik.newapp.batch.VenteWriter" />
    </beans>

    VenteService :
    Code:
    package com.ubik.newapp.service.third.impl;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.PlatformTransactionManager;
    import org.springframework.transaction.TransactionDefinition;
    import org.springframework.transaction.TransactionStatus;
    import org.springframework.transaction.support.DefaultTransactionDefinition;
    
    import com.ubik.newapp.dao.third.VenteDao;
    import com.ubik.newapp.dto.third.Vente;
    import com.ubik.newapp.service.third.VenteService;
    
    @Service
    public class VenteServiceImpl implements VenteService {
    
    	@Autowired
    	private VenteDao venteDao;
    	
    	@Autowired
        private PlatformTransactionManager transactionManager;
    	
    	private static final int LIGNE_TYPE_MAG = 1;
    	
    	private boolean commit;
    
    	public void create(Vente p) {
    		
    		//BLOC COMMIT WORKS WITH : 
    		
    		//ISOLATION_DEFAULT
    		//ISOLATION_READ_COMMITTED
    		//ISOLATION_READ_UNCOMMITTED
    		//ISOLATION_SERIALIZABLE
    		//PROPAGATION_MANDATORY
    		//PROPAGATION_REQUIRED
    		//PROPAGATION_SUPPORTS
    		//TIMEOUT_DEFAULT
    		TransactionStatus transaction = transactionManager.getTransaction(new DefaultTransactionDefinition(
                    TransactionDefinition.PROPAGATION_REQUIRED));
    		
    		venteDao.save(p);
    		if(commit && p.getLigneType() == LIGNE_TYPE_MAG){
    			transaction.flush();
    			transactionManager.commit(transaction);
    			commit = false;
    		}
    		else if(p.getLigneType()== LIGNE_TYPE_MAG){
    			commit = true;
    		}
    	}
    }

  8. #8
    Join Date
    Sep 2008
    Location
    Chicagoland, IL
    Posts
    338

    Default

    Ok...Questions:
    1. Why did you configure your reader as a transactional queue? That really should only be true if you're reading from a JMS queue or similar resource. A flat file should not be configured this way.
    2. Why is your processor transactional? What is it doing that prevents it from just participating in the chunk's transaction?
    Michael Minella
    Spring Batch Lead
    Author - Pro Spring Batch
    http://www.michaelminella.com
    Twitter: @MichaelMinella

  9. #9

    Default

    i can't

  10. #10

    Default

    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
    	xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
    	xmlns:util="http://www.springframework.org/schema/util"
    
    	xsi:schemaLocation="http://www.springframework.org/schema/beans 
    	http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
    	http://www.springframework.org/schema/batch
    	http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
    	http://www.springframework.org/schema/context 
            http://www.springframework.org/schema/context/spring-context-3.1.xsd 
            http://www.springframework.org/schema/tx 
            http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
            http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-2.0.xsd">
    
    	 <import resource="classpath:/applicationContext-dao.xml" />
    
    	<!-- Instruct Spring to perform declarative transaction management automatically 
    		on annotated classes. -->
    	<tx:annotation-driven transaction-manager="transactionManager" />
    
    	<bean id="venteReader" class="org.springframework.batch.item.file.FlatFileItemReader">
    		<property name="resource" value="classpath:prixvente_000001.txt" />
    
    		<property name="lineMapper">
    			<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
    				<property name="lineTokenizer">
    					<bean
    						class="org.springframework.batch.item.file.transform.PatternMatchingCompositeLineTokenizer">
    						<property name="tokenizers">
    							<map>
    								<entry key="1*" value-ref="line1Tokenizer" />
    								<entry key="2*" value-ref="line2Tokenizer" />
    							</map>
    						</property>
    					</bean>
    				</property>
    
    				<property name="fieldSetMapper">
    					<bean class="com.ubik.newapp.batch.VenteFieldSetMapper">
    					</bean>
    				</property>
    			</bean>
    		</property>
    	</bean>
    
    	<bean id="line1Tokenizer"
    		class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
    		<property name="names"
    			value="ligneType,numeroCondition, debutValidite, finValidite, prixValeur, miseAJour, numeroArticle, originePV, topPrixDeVenteConseille, prixVenteMin, codePrixDeVente" />
    		<property name="delimiter">
    			<util:constant
    				static-field="org.springframework.batch.item.file.transform.DelimitedLineTokenizer.DELIMITER_TAB" />
    		</property>
    	</bean>
    
    	<bean id="line2Tokenizer"
    		class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
    		<property name="names"
    			value="ligneType,numeroCondition,numeroEntiteMagasin,debutValiditeMag,finValiditeMag,miseAJour,extra1,extra2,extra3,extra4,extra5" />
    		<property name="delimiter">
    			<util:constant
    				static-field="org.springframework.batch.item.file.transform.DelimitedLineTokenizer.DELIMITER_TAB" />
    		</property>
    	</bean>
    
    <!-- 	<bean id="venteWriter" class="org.springframework.batch.item.database.JpaItemWriter"> -->
    <!-- 	<property name="entityManagerFactory" ref="entityManagerFactory" /> -->
    <!-- 	</bean> -->
    
    	<bean id="dataSource" destroy-method="close"
    		class="org.apache.commons.dbcp.BasicDataSource">
    		<property name="driverClassName" value="org.postgresql.Driver" />
    		<property name="url" value="jdbc:postgresql://127.0.0.1:5433/ubik" />
    		<property name="username" value="postgres" />
    		<property name="password" value="ubikubik" />
    	</bean>
        
    <!--     <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> -->
    <!--      <property name="dataSource" ref="dataSource"/> -->
    <!--   </bean> -->
    
    	<bean id="jobLauncher"
    		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    		<property name="jobRepository" ref="jobRepository" />
    	</bean>
    
    	<bean id="jobRepository"
    		class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
    <!--   		<property name="transactionManager" ref="transactionManager" /> -->
    <!--  		<property name="isolationLevelForCreate" value="PROPAGATION_REQUIRED" />  -->
    	</bean>
    
    	<!-- ========================= BUSINESS OBJECT DEFINITIONS ========================= -->
    
    	<!-- Activates various annotations to be detected in bean classes: Spring's 
    		@Required and @Autowired, as well as JSR 250's @Resource. -->
    	<context:annotation-config />
    
    	<context:component-scan base-package="com.ubik.newapp"
    		scoped-proxy="targetClass" />
    
    	<job id="importVentes" xmlns="http://www.springframework.org/schema/batch" >
    		<step id="readWriteVente">
    			<tasklet >
     				<chunk reader="venteReader" processor="venteProcessor" 
     					reader-transactional-queue="true" processor-transactional="true" writer="venteWriter" chunk-completion-policy="completionPolicy" >
                       
    				</chunk>
    			</tasklet>
    		</step>
    	</job>
    
    	<bean id="completionPolicy" class="com.ubik.newapp.batch.CompletionPolicy" />
    
     	<bean id="venteProcessor" class="com.ubik.newapp.batch.VenteProcessor" />
    
     	<bean id="venteWriter" class="com.ubik.newapp.batch.VenteWriter" />
    </beans>

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •