Thread: aggregator and partitionHandler

  1. #1
    Join Date
    Jun 2012
    Posts
    14

    aggregator and partitionHandler

    Hi guys,

    I'm working on a partitioned job POC using Spring Batch and Spring Integration and have hit a snag that I'm hoping someone can help me with. While my issue is in the context of a Spring Batch job, I figured I would post it here, since the real issue I'm having is with its use of Spring Integration.

    When I run my job with the configuration below, the data is partitioned correctly, it is sent to the slaves, and the slaves process each step successfully to completion. The job nevertheless ends in a failure due to a NullPointerException in the MessageChannelPartitionHandler's handle method when it attempts to receive the replies. I know that this is due to a timeout. The underlying problem seems to be that none of the messages get past the aggregator. I can see via jconsole that messages are put on and pulled off both the requestsQueue and the stagingQueue, but no messages make it to the repliesQueue. Any help is appreciated! I have seen this problem mentioned in Google results, but I have not found an answer.

    job.xml
    Code:
    <beans:bean id="partitionHandler" class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
    		<beans:property name="stepName" value="step1"/>
    		<beans:property name="gridSize" value="2"/>
    		<beans:property name="messagingOperations">
    			<beans:bean class="org.springframework.integration.core.MessagingTemplate">
    				<beans:property name="defaultChannel" ref="outbound-requests"/>
    				<beans:property name="receiveTimeout" value="100000"/>
    			</beans:bean>
    		</beans:property>
    	</beans:bean>
    
    	<int:channel id="outbound-requests"/>
    	<int-jms:outbound-channel-adapter connection-factory="connectionFactory" destination="requestsQueue" channel="outbound-requests"/>
    
    	<int:channel id="inbound-requests"/>
    	<int-jms:message-driven-channel-adapter connection-factory="connectionFactory" destination="requestsQueue" channel="inbound-requests"/>
    
    
    	<int:service-activator ref="stepExecutionRequestHandler" input-channel="inbound-requests" output-channel="outbound-staging"/>
    
    	<beans:bean id="stepExecutionRequestHandler" class="org.springframework.batch.integration.partition.StepExecutionRequestHandler">
    		<beans:property name="jobExplorer" ref="jobExplorer"/>
    		<beans:property name="stepLocator" ref="stepLocator"/>
    	</beans:bean>
    
    	<int:channel id="outbound-staging"/>
    	<int-jms:outbound-channel-adapter connection-factory="connectionFactory" destination="stagingQueue" channel="outbound-staging"/>
    
    	<int:channel id="inbound-staging"/>
    	<int-jms:message-driven-channel-adapter connection-factory="connectionFactory" destination="stagingQueue" channel="inbound-staging"/>
    	<int:channel id="channel2"></int:channel>
    	<int:aggregator ref="partitionHandler" input-channel="inbound-staging" output-channel="channel2" />
    	
    
    	<int-jms:outbound-channel-adapter connection-factory="connectionFactory" destination="repliesQueue" channel="channel2"/>
    
    	<int:channel id="inbound-replies">
    		<int:queue/>
    	</int:channel>
    	<int-jms:message-driven-channel-adapter connection-factory="connectionFactory" destination="repliesQueue" channel="inbound-replies"/>

  2. #2
    Join Date
    Jun 2012
    Posts
    14

    Any help? Is this a bug, or does it have something to do with the correlation ID?

  3. #3
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,146

    I suggest you run with debug logging and follow the messages through the system(s); it should become obvious. If you can't figure out what's happening, post the logs here (in a zip).
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  4. #4
    Join Date
    Jun 2012
    Posts
    14

    Thanks Gary,

    I have modified the code a bit based on another post in the forum. Here is a brief description of the setup:
    JVM1: my controller (calling 2 slaves)
    JVM2: ActiveMQ, running on localhost
    JVM3: slave 1
    JVM4: slave 2

    When I run the code, it only invokes JVM3 (twice) and never invokes my second slave. Why?
    Code:
    [JVM4 log]
    2012-06-27 15:42:41,590 INFO org.springframework.jms.listener.DefaultMessageListenerContainer#0-2 [org.springframework.jms.listener.DefaultMessageListenerContainer] - <Successfully refreshed JMS Connection>
    2012-06-27 15:43:01,569 DEBUG InactivityMonitor WriteCheck [org.apache.activemq.transport.InactivityMonitor] - <9992 ms elapsed since last write check.>
    2012-06-27 15:43:11,568 DEBUG InactivityMonitor WriteCheck [org.apache.activemq.transport.InactivityMonitor] - <9999 ms elapsed since last write check.>
    [JVM3 log]
    2012-06-27 15:42:36,977 DEBUG org.springframework.jms.listener.DefaultMessageListenerContainer#0-3 [org.apache.activemq.transport.WireFormatNegotiator] - <Sending: WireFormatInfo { version=6, properties={CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}>
    2012-06-27 15:42:37,014 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616 [org.apache.activemq.transport.WireFormatNegotiator] - <Received WireFormat: WireFormatInfo { version=6, properties={CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}>
    2012-06-27 15:42:37,014 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616 [org.apache.activemq.transport.WireFormatNegotiator] - <tcp://localhost/127.0.0.1:61616 before negotiation: OpenWireFormat{version=6, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false}>
    2012-06-27 15:42:37,016 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616 [org.apache.activemq.transport.WireFormatNegotiator] - <tcp://localhost/127.0.0.1:61616 after negotiation: OpenWireFormat{version=6, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false}>
    [Controller log]
    
    2012-06-27 15:43:24,627 DEBUG main [org.springframework.batch.integration.partition.MessageChannelPartitionHandler] - <Sending request: [Payload=StepExecutionRequest: [jobExecutionId=0, stepExecutionId=1, stepName=step1]][Headers={timestamp=1340808204627, id=cde843fd-0286-4d01-978b-6006f99e9b10, replyChannel=org.springframework.integration.channel.QueueChannel@4fac7f84, correlationId=0:step1, sequenceSize=2, sequenceNumber=0}]>
    2012-06-27 15:43:24,627 DEBUG main [org.springframework.batch.integration.partition.MessageChannelPartitionHandler] - <Sending request: [Payload=StepExecutionRequest: [jobExecutionId=0, stepExecutionId=2, stepName=step1]][Headers={timestamp=1340808204627, id=7247b7d9-fb37-4fcc-b3fa-ca8c46ddfd9a, replyChannel=org.springframework.integration.channel.QueueChannel@4fac7f84, correlationId=0:step1, sequenceSize=2, sequenceNumber=1}]>
    2012-06-27 15:43:24,629 DEBUG taskExecutor-1 [org.apache.activemq.transport.WireFormatNegotiator] - <Sending: WireFormatInfo { version=6, properties={CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}>
    2012-06-27 15:43:24,632 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616 [org.apache.activemq.transport.WireFormatNegotiator] - <Received WireFormat: WireFormatInfo { version=6, properties={CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}>
    2012-06-27 15:43:24,632 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616 [org.apache.activemq.transport.WireFormatNegotiator] - <tcp://localhost/127.0.0.1:61616 before negotiation: OpenWireFormat{version=6, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false}>
    2012-06-27 15:43:24,634 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616 [org.apache.activemq.transport.WireFormatNegotiator] - <tcp://localhost/127.0.0.1:61616 after negotiation: OpenWireFormat{version=6, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false}>
    2012-06-27 15:43:25,229 DEBUG taskExecutor-1 [org.apache.activemq.ActiveMQMessageConsumer] - <remove: ID:MW7EZEKHNX8CHR-56061-1340808204231-0:2:1:1, lastDeliveredSequenceId:0>
    2012-06-27 15:43:25,244 DEBUG taskExecutor-1 [org.apache.activemq.transport.tcp.TcpTransport] - <Stopping transport tcp://localhost/127.0.0.1:61616>
    2012-06-27 15:43:25,269 DEBUG taskExecutor-1 [org.apache.activemq.transport.WireFormatNegotiator] - <Sending: WireFormatInfo { version=6, properties={CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}>
    2012-06-27 15:43:25,274 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616 [org.apache.activemq.transport.WireFormatNegotiator] - <Received WireFormat: WireFormatInfo { version=6, properties={CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}>
    2012-06-27 15:43:25,274 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616 [org.apache.activemq.transport.WireFormatNegotiator] - <tcp://localhost/127.0.0.1:61616 before negotiation: OpenWireFormat{version=6, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false}>
    2012-06-27 15:43:25,274 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616 [org.apache.activemq.transport.WireFormatNegotiator] - <tcp://localhost/127.0.0.1:61616 after negotiation: OpenWireFormat{version=6, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false}>
    2012-06-27 15:43:25,754 DEBUG taskExecutor-1 [org.apache.activemq.ActiveMQMessageConsumer] - <remove: ID:MW7EZEKHNX8CHR-56061-1340808204231-0:3:1:1, lastDeliveredSequenceId:0>
    2012-06-27 15:43:25,769 DEBUG taskExecutor-1 [org.apache.activemq.transport.tcp.TcpTransport] - <Stopping transport tcp://localhost/127.0.0.1:61616>
    2012-06-27 15:43:25,792 DEBUG main [org.springframework.batch.integration.partition.MessageChannelPartitionHandler] - <Received replies: [Payload=[StepExecution: id=1, version=9, name=step1.master:partition1, status=COMPLETED, exitStatus=COMPLETED, readCount=6, filterCount=0, writeCount=6 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=7, rollbackCount=0, exitDescription=, StepExecution: id=2, version=10, name=step1.master:partition0, status=COMPLETED, exitStatus=COMPLETED, readCount=7, filterCount=0, writeCount=7 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=8, rollbackCount=0, exitDescription=]][Headers={timestamp=1340808205792, id=821af29b-b3a8-4aff-9d7f-0111a075abf4, correlationId=0:step1, replyChannel=org.springframework.integration.channel.QueueChannel@4fac7f84}]>
    2012-06-27 15:43:25,799 DEBUG main [org.springframework.batch.core.step.AbstractStep] - <Step execution success: id=0>
    2012-06-27 15:43:25,809 DEBUG main [org.springframework.batch.core.step.AbstractStep] - <Step execution complete: StepExecution: id=0, version=2, name=step1.master, status=COMPLETED, exitStatus=COMPLETED, readCount=13, filterCount=0, writeCount=13 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=15, rollbackCount=0>
    2012-06-27 15:43:25,814 DEBUG main [org.springframework.batch.core.job.flow.support.SimpleFlow] - <Completed state=geocodingJob.step1.master with status=COMPLETED>
    [My configuration]
    <beans:bean id="partitionHandler" class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
    		<beans:property name="stepName" value="step1"/>
    		<beans:property name="gridSize" value="2"/>
    		<beans:property name="messagingOperations">
    			<beans:bean class="org.springframework.integration.core.MessagingTemplate">
    				<beans:property name="defaultChannel" ref="requests"/>
    				<beans:property name="receiveTimeout" value="999999"/>
    			</beans:bean>
    		</beans:property>
    	</beans:bean>
    	
    
    
    	<task:executor id="taskExecutor"  />
    
    	<int:channel id="requests">
    		<int:queue  />
    	</int:channel>
    
    <int:channel id="staging" />
    
    
    	<int:service-activator ref="stepExecutionRequestHandler" input-channel="worker" />
    
        <int:aggregator ref="partitionHandler" input-channel="staging" />
    
    
    <int-jms:outbound-gateway request-channel="requests" request-destination-name="bos.job.slave.requests" reply-channel="staging" receive-timeout="30000">
    		<int:poller fixed-rate="200" task-executor="taskExecutor" />
    	</int-jms:outbound-gateway>
    
    <int-jms:inbound-gateway request-channel="worker" request-destination-name="bos.job.slave.requests"  concurrent-consumers="3" />
    
    
    <beans:bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    		<beans:property name="brokerURL" value="tcp://localhost:61616"/>
    	</beans:bean>
    
          
      
    	

  5. #5
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,146

    Given that you have concurrent consumers=3, it's perfectly possible for the two partitions to land on just one of the slaves; set it to 1, or use at least 4 partitions to ensure both slaves get some work.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware
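
    For readers following along, here is a minimal sketch of the two options described above. It assumes the worker-side inbound gateway and the partitionHandler bean from the configuration posted in #4; only the concurrent-consumers and gridSize values differ from that configuration. It has not been verified against this setup, and the number of partitions actually created still depends on how the job's partitioner treats the grid size.
    Code:
    <!-- Option 1 (worker side): a single consumer per worker JVM, so one worker
         cannot pick up both partition requests at the same time. -->
    <int-jms:inbound-gateway request-channel="worker" request-destination-name="bos.job.slave.requests" concurrent-consumers="1" />

    <!-- Option 2 (controller side): keep concurrent-consumers="3" on the workers,
         but ask for at least 4 partitions so that three consumers on one worker
         cannot absorb all of the requests. -->
    <beans:bean id="partitionHandler" class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
    		<beans:property name="stepName" value="step1"/>
    		<beans:property name="gridSize" value="4"/>
    		<beans:property name="messagingOperations">
    			<beans:bean class="org.springframework.integration.core.MessagingTemplate">
    				<beans:property name="defaultChannel" ref="requests"/>
    				<beans:property name="receiveTimeout" value="999999"/>
    			</beans:bean>
    		</beans:property>
    	</beans:bean>

    The two options are alternatives. Option 1 limits each worker to one partition at a time, while option 2 keeps per-worker concurrency and relies on having more partitions than a single worker has consumers.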

  6. #6
    Join Date
    Jun 2012
    Posts
    14

    Thanks Gary! It worked.
