Results 1 to 5 of 5

Thread: Need help using Remote Chunking

  1. #1

    Default Need help using Remote Chunking

    Hi, I'm trying for some days now to use Remote Chunking in my project, but my slaves are not receiving the message. Tried the example from the books Pro Spring Batch and Spring Batch In Action, don't know what I'm missing here. I still don't understand why there's no processor in the chunk tag, processor is only meant to be used in the slaves, in SimpleChunkProcessor? (don't pay attention to nspaces, i'll organize it later)
    Here is my slave main class:
    Code:
    public class SlaveProcessor
    {
        public static void main(String[] args) throws Exception
        {
            new ClassPathXmlApplicationContext("slaveBatchContext.xml");
            System.in.read();
        }
    }
    So i just use java -jar slave.jar, is that correct?
    My xml's:
    jms-context.xml (from Spring Source - Integration):
    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:p="http://www.springframework.org/schema/p"
    	xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd">
    
    	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    		<property name="connectionFactory" ref="connectionFactory" />
    		<property name="receiveTimeout" value="100" />
    		<property name="sessionTransacted" value="true" />
    	</bean>
    
    	<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    		<property name="brokerURL" value="tcp://localhost:61616" />
    	</bean>
    
    </beans>
    slaveIntegrationContext.xml
    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans:beans xmlns="http://www.springframework.org/schema/integration"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans"
    	xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
    	xsi:schemaLocation="
    	http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-2.5.xsd
        http://www.springframework.org/schema/tx
        http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
        ">
    
    	<beans:import resource="classpath:/jms-context.xml" />
    
    	<annotation-config />
    
    	<channel id="requests" />
    	<channel id="replies">
    		<queue />
    	</channel>
    
    	<beans:bean id="transactionManager"
    		class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
    
    	<tx:annotation-driven />
    
    	<service-activator input-channel="requests"
    		output-channel="replies" ref="chunkHandler" />
    </beans:beans>
    slaveBatchContext.xml
    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans"
    	xmlns:integration="http://www.springframework.org/schema/integration"
    	xmlns:batch="http://www.springframework.org/schema/batch"
    	xmlns:context="http://www.springframework.org/schema/context"
    	xmlns:task="http://www.springframework.org/schema/task" xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
    	xmlns:jms="http://www.springframework.org/schema/jms"
    	xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd
        http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.0.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
        ">
    
    	<import resource="classpath:/slaveIntegrationContext.xml" />
    
    	<bean id="chunkHandler"
    		class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler">
    		<property name="chunkProcessor">
    			<bean class="org.springframework.batch.core.step.item.SimpleChunkProcessor">
    				<property name="itemWriter" ref="writer" />
    				<property name="itemProcessor" ref="processor" />
    			</bean>
    		</property>
    	</bean>
    
    	<!-- The Slave ItemWriter that does the writing -->
    
    	<bean id="writer"
    		class="com.X.batch.writer.XWriter" />
    
    	<bean id="processor"
    		class="com.X.batch.processor.XProcessor" />
    
    </beans>
    masterIntegrationContext.xml
    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
    	xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
    	xmlns:jms="http://www.springframework.org/schema/jms" xmlns:task="http://www.springframework.org/schema/task"
    	xsi:schemaLocation="
        http://www.springframework.org/schema/beans 
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/jms
        http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd 
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
        http://www.springframework.org/schema/task 
        http://www.springframework.org/schema/task/spring-task-3.0.xsd
        ">
    
    	<import resource="classpath:/jms-context.xml" />
    
    	<bean id="messagingGateway" class="org.springframework.integration.core.MessagingTemplate">
    		<property name="defaultChannel" ref="requests" />
    		<property name="receiveTimeout" value="1000" />
    	</bean>
    
    	<int-jms:outbound-channel-adapter
    		channel="requests" connection-factory="connectionFactory"
    		destination-name="requests" />
    
    	<int:channel id="requests" />
    	<int:channel id="incoming" />
    
    	<int:transformer input-channel="incoming"
    		output-channel="replies" ref="headerExtractor" method="extract" />
    
    	<bean id="headerExtractor"
    		class="org.springframework.batch.integration.chunk.JmsRedeliveredExtractor" />
    
    	<int:channel id="replies">
    		<int:queue />
    		<int:interceptors>
    			<bean id="pollerInterceptor"
    				class="org.springframework.batch.integration.chunk.MessageSourcePollerInterceptor">
    				<property name="messageSource">
    					<bean
    						class="org.springframework.integration.jms.JmsDestinationPollingSource">
    						<constructor-arg>
    							<bean class="org.springframework.jms.core.JmsTemplate">
    								<property name="connectionFactory" ref="connectionFactory" />
    								<property name="defaultDestinationName" value="replies" />
    								<property name="receiveTimeout" value="100" />
    							</bean>
    						</constructor-arg>
    					</bean>
    				</property>
    				<property name="channel" ref="incoming" />
    			</bean>
    		</int:interceptors>
    	</int:channel>
    
    	<jms:listener-container connection-factory="connectionFactory"
    		transaction-manager="transactionManager" acknowledge="transacted">
    		<jms:listener destination="requests"
    			response-destination="replies" ref="chunkHandler" method="handleChunk" />
    	</jms:listener-container>
        
        <int:service-activator input-channel="requests"
            output-channel="replies" ref="chunkHandler" />
        
    </beans>

  2. #2

    Default

    masterBatchContext.xml
    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:p="http://www.springframework.org/schema/p"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    	xmlns:int="http://www.springframework.org/schema/integration"
    	xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
    	xmlns:jms="http://www.springframework.org/schema/jms" xmlns:task="http://www.springframework.org/schema/task"
    	xsi:schemaLocation="
    	http://www.springframework.org/schema/aop 
    	http://www.springframework.org/schema/aop/spring-aop.xsd
        http://www.springframework.org/schema/batch 
        http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
        http://www.springframework.org/schema/beans 
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.1.xsd
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/jms
        http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd 
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
        http://www.springframework.org/schema/task 
        http://www.springframework.org/schema/task/spring-task-3.0.xsd
        ">
    
    	<import resource="classpath:/masterIntegrationContext.xml" />
    
    	<!-- DS config -->
    	<context:property-placeholder location="classpath:jdbc.properties" />
    
    	<bean id="XDS" class="org.apache.commons.dbcp.BasicDataSource"
    		destroy-method="close" p:driverClassName="${jdbc.driverClassName}"
    		p:url="${jdbc.url}" p:username="${jdbc.username}" p:password="${jdbc.password}" />
    
    	<bean id="sessionFactory"
    		class="org.springframework.orm.hibernate4.LocalSessionFactoryBean">
    		<property name="dataSource" ref="XDS" />
    		<property name="configLocation">
    			<value>classpath:hibernate.cfg.xml</value>
    		</property>
    		<property name="hibernateProperties">
    			<props>
    				<prop key="hibernate.dialect">${hibernate.dialect}</prop>
    				<prop key="hibernate.show_sql">${hibernate.show_sql}</prop>
    				<prop key="hibernate.format_sql">${hibernate.format_sql}</prop>
    				<prop key="hibernate.cache.use_query_cache">false</prop>
    				<prop key="hibernate.cache.use_second_level_cache">false</prop>
    				<prop key="hibernate.cache.region.factory_class">${hibernate.cache.region.factory_class}</prop>
    			</props>
    		</property>
    	</bean>
    
    	<bean
    		class="org.springframework.dao.annotation.PersistenceExceptionTranslationPostProcessor" />
    
    	<bean class="org.springframework.orm.hibernate4.HibernateExceptionTranslator" />
    
    	<bean id="transactionManager"
    		class="org.springframework.orm.hibernate4.HibernateTransactionManager">
    		<property name="sessionFactory" ref="sessionFactory" />
    	</bean>
    
    	<!-- Spring Batch Job Registry -->
    	<bean id="jobRegistry"
    		class="org.springframework.batch.core.configuration.support.MapJobRegistry" />
    
    	<!-- Spring Batch Job Launcher -->
    	<bean id="jobLauncher"
    		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    		<property name="jobRepository" ref="jobRepository" />
    	</bean>
    
    	<!-- Spring Batch Job Repository -->
    	<bean id="jobRepository"
    		class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean"
    		p:dataSource-ref="XDS" p:transactionManager-ref="transactionManager" />
    
    	<!-- Spring Batch Job Registry Processor -->
    	<bean
    		class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor">
    		<property name="jobRegistry" ref="jobRegistry" />
    	</bean>
    
    	<bean id="XProcessor"
    		class="com.X.batch.processor.XProcessor" />
    		
    	<bean id="XReader"
    		class="org.springframework.batch.item.database.HibernateCursorItemReader">
    		<property name="sessionFactory" ref="sessionFactory" />
    		<property name="queryString"
    			value="QUERY OMITED" />
    	</bean>
    
    	<bean id="XWriter"
    		class="com.X.batch.writer.XWriter" />
    
    	<job id="sBatchRenewJob" xmlns="http://www.springframework.org/schema/batch">
    		<step id="XLoad">
    			<tasklet>
    				<chunk reader="XReader"
    					writer="XWriter" commit-interval="10000" />
    			</tasklet>
    		</step>
    	</job>
    
    	<!-- Remote Chuncking Config -->
    	<bean id="chunkHandler"
    		class="org.springframework.batch.integration.chunk.RemoteChunkHandlerFactoryBean">
    		<property name="chunkWriter" ref="chunkWriter" />
    		<property name="step" ref="XLoad" />
    	</bean>
    
    	<bean id="chunkWriter"
    		class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
    		scope="step">
    		<property name="messagingOperations" ref="messagingGateway" />
    		<property name="replyChannel" ref="replies" />
    		<property name="maxWaitTimeouts" value="3000" />
    	</bean>
    
    </beans>
    I'm using maven with 2 profiles to create my slave/master. main class for master:
    Code:
    <mainClass>org.springframework.batch.core.launch.support.CommandLineJobRunner</mainClass>
    So I run the master with java -jar masterBatchContext.xml sBatchRenewJob run.id=1, but the slaves doesnt receive the messages and also only the reader and writer is being called, so no data is processed. If i insert the processor in the chunk tag, only the master process the data.
    Thanks in advance, if you need any other info just ask.

  3. #3

    Default

    Thanks for your support, I solved it.

  4. #4

    Default

    Hi Traduz,
    Would be very interested in what you did to solve this issue. Basically I am trying to do the same and I find that the getObject method of the RemoteChunkHandlerFactoryBean is never getting called in my batch master though it seems to be getting called correctly in the spring batch integration sample.
    Any help appreciated.

    Regards,
    Anoop

  5. #5

    Default

    I'll take a look on my configs and let you know.

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •