Hi,
I'm using the Remote Chunking strategy. Imagine that I have 150,000 records in the database. I've set both pageSize and commit-interval to 1000, and I've started one master node and 9 slave nodes. To my surprise, only 3 slave nodes are processing items while the others sit idle, listening to the queue but doing nothing. If I stop one of the working slaves, one of the idle slaves starts to process items, and so on. It seems that the other slaves only start to work once a previous slave has finished its chunk (that is, after it has processed 1000 records, as defined by commit-interval).
When I reduce commit-interval to a lower value, say 100, all the slaves work together. Why does this happen? I can't understand this behaviour. I really want all slaves working together as fast as they can, each reading and processing 1k or more records.
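One possible explanation, offered as an assumption rather than a confirmed diagnosis: with commit-interval="1000", the 150,000 records become only 150 chunk messages, and two settings limit how those messages spread across consumers. `ChunkMessageChannelItemWriter` has a `throttleLimit` property (believed to default to 6) that caps how many chunks the master sends before it waits for replies, and ActiveMQ prefetches up to 1000 queue messages per consumer by default, so a few consumers can hold every in-flight chunk. The fragment below is a sketch of how both could be adjusted on the `chunkWriter` and ActiveMQ connection factory beans from this post; the values 20 and 1 are hypothetical and would need tuning.

```xml
<!-- Master side: allow more chunks in flight before waiting for replies.
     The value 20 is a hypothetical choice, not a recommendation. -->
<bean id="chunkWriter" class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter" scope="step">
    <property name="messagingOperations" ref="messagingGateway" />
    <property name="replyChannel" ref="subscriptionReplies" />
    <property name="maxWaitTimeouts" value="1000" />
    <property name="throttleLimit" value="20" />
</bean>

<!-- Shared JMS config: let each consumer buffer only one message at a time,
     so pending chunks stay on the broker for idle slaves to pick up.
     Assumes the default prefetch is the cause; this is unverified here. -->
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="failover:(${activemq.url})?randomize=false" />
    <property name="prefetchPolicy">
        <bean class="org.apache.activemq.ActiveMQPrefetchPolicy">
            <property name="queuePrefetch" value="1" />
        </bean>
    </property>
</bean>
```

If this assumption holds, a smaller commit-interval would hide the problem simply because each chunk finishes quickly and the few in-flight messages cycle through the consumers faster.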
This is my configuration:
masterBatchContext.xml
Code:
<?xml version="1.0" encoding="UTF-8"?>
...
    <!-- Spring Batch Job Registry -->
    <bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" />

    <!-- Spring Batch Job Launcher -->
    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
    </bean>

    <!-- Spring Batch Job Repository -->
    <bean id="jobRepository"
          class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean"
          p:dataSource-ref="subscriptionDS"
          p:transactionManager-ref="transactionManager" />

    <!-- Spring Batch Job Registry Processor -->
    <bean class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor">
        <property name="jobRegistry" ref="jobRegistry" />
    </bean>

    <!-- Spring Batch Subscriptions Reader -->
    <bean id="subscriptionPageReader" class="org.springframework.batch.item.database.JdbcPagingItemReader" scope="step">
        <property name="dataSource" ref="subscriptionDS" />
        <property name="queryProvider">
            <bean class="org.springframework.batch.item.database.support.SqlServerPagingQueryProvider">
                <property name="selectClause" value="..." />
                <property name="fromClause" value="..." />
                <property name="whereClause" value="..." />
                <property name="sortKey" value="id" />
            </bean>
        </property>
        <property name="pageSize" value="1000" />
        <property name="rowMapper" ref="subscriptionRowMapper" />
    </bean>

    <bean id="subscriptionRowMapper" class="com.m4u.subscription.core.batch.reader.SubscriptionRowMapper" />

    <!-- Spring Batch Subscriptions Processor -->
    <bean id="subscriptionProcessor" class="com.m4u.subscription.core.batch.processor.SubscriptionProcessor">
        <property name="client" ref="subscriptionClient"/>
    </bean>

    <!-- Spring Batch Subscriptions Writer -->
    <bean id="subscriptionWriter" class="com.m4u.subscription.core.batch.writer.SubscriptionWriter" />

    <!-- Spring Batch Job [Subscription Renew Job] -->
    <job id="sBatchRenewJob" xmlns="http://www.springframework.org/schema/batch">
        <step id="subscriptionsLoad">
            <tasklet>
                <chunk reader="subscriptionPageReader" processor="subscriptionProcessor" writer="subscriptionWriter" commit-interval="1000" />
            </tasklet>
        </step>
    </job>

    <!-- Remote Chuncking Config -->
    <bean id="chunkHandler" class="org.springframework.batch.integration.chunk.RemoteChunkHandlerFactoryBean">
        <property name="chunkWriter" ref="chunkWriter" />
        <property name="step" ref="subscriptionsLoad" />
    </bean>

    <bean id="chunkWriter" class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter" scope="step">
        <property name="messagingOperations" ref="messagingGateway" />
        <property name="replyChannel" ref="subscriptionReplies" />
        <property name="maxWaitTimeouts" value="1000" />
    </bean>
</beans>

masterIntegrationContext.xml (JMS config for master node)
Code:
<?xml version="1.0" encoding="UTF-8"?>
...
    <import resource="classpath:/jms-context.xml" />

    <bean id="messagingGateway" class="org.springframework.integration.core.MessagingTemplate">
        <property name="defaultChannel" ref="subscriptionRequests" />
        <property name="receiveTimeout" value="1000" />
    </bean>

    <int:channel id="subscriptionRequests" />
    <int:channel id="incoming" />

    <int-jms:outbound-channel-adapter channel="subscriptionRequests" connection-factory="connectionFactory" destination-name="subscriptionRequests" />

    <int:transformer input-channel="incoming" output-channel="subscriptionReplies" ref="headerExtractor" method="extract" />

    <bean id="headerExtractor" class="org.springframework.batch.integration.chunk.JmsRedeliveredExtractor" />

    <int:channel id="subscriptionReplies">
        <!-- <int:queue message-store="messageStore" />-->
        <int:queue />
        <int:interceptors>
            <bean id="pollerInterceptor" class="org.springframework.batch.integration.chunk.MessageSourcePollerInterceptor">
                <property name="messageSource">
                    <bean class="org.springframework.integration.jms.JmsDestinationPollingSource">
                        <constructor-arg>
                            <ref bean="jmsTemplate" />
                        </constructor-arg>
                    </bean>
                </property>
                <property name="channel" ref="incoming" />
            </bean>
        </int:interceptors>
    </int:channel>
</beans>

slaveBatchContext.xml
Code:
<?xml version="1.0" encoding="UTF-8"?>
...
    <import resource="classpath:/slaveIntegrationContext.xml" />

    <bean id="chunkHandler" class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler">
        <property name="chunkProcessor">
            <bean class="org.springframework.batch.core.step.item.SimpleChunkProcessor">
                <property name="itemWriter" ref="writer" />
                <property name="itemProcessor" ref="processor" />
            </bean>
        </property>
    </bean>

    <bean id="writer" class="com.m4u.subscription.core.batch.writer.SubscriptionWriter" />

    <bean id="processor" class="com.m4u.subscription.core.batch.processor.SubscriptionProcessor">
        <property name="client" ref="subscriptionClient"/>
    </bean>
</beans>

slaveIntegrationContext.xml (JMS config for slave nodes)
Code:
<?xml version="1.0" encoding="UTF-8"?>
...
    <beans:import resource="classpath:/jms-context.xml" />

    <channel id="subscriptionRequests">
        <interceptors>
            <wire-tap channel="logChannel" />
        </interceptors>
    </channel>

    <channel id="subscriptionReplies">
        <queue />
        <interceptors>
            <wire-tap channel="logChannel" />
        </interceptors>
    </channel>

    <logging-channel-adapter auto-startup="true" id="logChannel" level="DEBUG" />

    <beans:bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
</beans:beans>

jms-context.xml (JMS config that is common to both master and slave nodes)
Code:
<?xml version="1.0" encoding="UTF-8"?>
...
    <context:property-placeholder location="classpath:subscriptionbatch.properties" ignore-unresolvable="true"/>

    <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="failover:(${activemq.url})?randomize=false" />
            </bean>
        </property>
    </bean>

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="defaultDestinationName" value="subscriptionReplies" />
        <property name="receiveTimeout" value="100" />
        <property name="sessionTransacted" value="true" />
    </bean>

    <jms:listener-container connection-factory="connectionFactory" transaction-manager="transactionManager" acknowledge="transacted">
        <jms:listener destination="subscriptionRequests" response-destination="subscriptionReplies" ref="chunkHandler" method="handleChunk" />
    </jms:listener-container>

    <bean id="subscriptionClient" class="com.m4u.subscription.core.jaxrs.client.SubscriptionJAXRSClient">
        <constructor-arg type="java.lang.String" value="${subscription.server.url}" />
    </bean>
</beans>

Any suggestions?


