
Thread: Spring Batch + Integration + Partitioning not working.

  1. #1
    Join Date
    Nov 2011
    Posts
    9


    This is similar to another post that never got a solution, so I am trying again. I'm working on a partitioned-job POC using Spring Batch and Spring Integration and have hit a problem that I'm hoping someone can help me with. Although my issue arises in the context of a Spring Batch job, I'm posting it here because the real problem is with my use of Spring Integration.
    I am launching the job through an MBean using JConsole. When I run the job with the configuration below, the data is partitioned correctly, the partitions are sent to the slaves, and the slaves run to completion successfully. However, the job ends in failure with a NullPointerException in MessageChannelPartitionHandler's handle method when it tries to receive the replies; the NPE is caused by that receive timing out. It seems the messages never get past the aggregator. Specifically, this only happens when the final aggregator is not running in the same JVM as the splitter (the MessageChannelPartitionHandler). Since ActiveMQ seems to deliver messages round-robin, my test works every other time. The problem appears to be getting the message back to the replyChannel specified as inbound-master-replies-channel below.
    I have two JVM processes, each with the same Spring context. I was hoping the replyChannel would carry a replyTo queue pointing back to the JVM that contains the splitter. I have configured the aggregator with a JDBC-backed message store, and that works fine. I currently have the master, slave, aggregator and splitter configured identically in each JVM; we are aiming for a homogeneous configuration to keep configuration simple.
    Some posts indicate that an inbound messaging gateway would work, but I cannot find a version of Spring Integration, or any examples, that allow for this.
    Any ideas?
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xsi:schemaLocation="http://www.springframework.org/schema/batch
    http://www.springframework.org/schem...-batch-2.1.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schem...ration-2.0.xsd
    http://www.springframework.org/schema/integration/jms
    http://www.springframework.org/schem...on-jms-2.0.xsd
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">

    <description>Stock Record Report</description>

    <import resource="../launch-context.xml"/>

    <batch:job id="stockReportsJob">
    <batch:step id="step1.master">
    <batch:partition partitioner="partitioner" handler="partitionHandler">
    </batch:partition>
    </batch:step>
    </batch:job>

    <bean id="partitioner" class="com.mycompany.report.stock.batch.CusipPartitioner">
    <property name="posnSecDao" ref="posnSecDao" />
    </bean>

    <bean id="partitionHandler" class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
    <property name="stepName" value="step1"/>
    <property name="gridSize" value="2"/>
    <property name="replyChannel" ref="inbound-master-replies-channel"/>
    <property name="messagingOperations">
    <bean class="org.springframework.integration.core.MessagingTemplate">
    <property name="defaultChannel"
    ref="outbound-slave-requests-channel"/>
    <property name="receiveTimeout" value="100000"/>
    </bean>
    </property>
    </bean>

    <int:gateway id="masterGateway"
    default-request-channel="outbound-slave-requests-channel"/>

    <int:aggregator ref="partitionHandler"
    input-channel="inbound-aggregator-request-channel"
    output-channel="inbound-master-replies-channel"
    message-store="jdbcMessageGroupStore"/>

    <int:channel id="outbound-slave-requests-channel"/>
    <int-jms:outbound-channel-adapter connection-factory="jmsConnectionFactory"
    destination="slaveQueue"
    channel="outbound-slave-requests-channel"/>
    <int:channel id="inbound-slave-requests-channel"/>
    <int-jms:message-driven-channel-adapter connection-factory="jmsConnectionFactory"
    destination="slaveQueue" channel="inbound-slave-requests-channel"/>
    <int:service-activator ref="stepExecutionRequestHandler"
    input-channel="inbound-slave-requests-channel"
    output-channel="outbound-aggregator-request-channel"/>

    <int:channel id="outbound-aggregator-request-channel"/>
    <int-jms:outbound-channel-adapter connection-factory="jmsConnectionFactory"
    destination="aggregatorQueue" channel="outbound-aggregator-request-channel"/>

    <int:channel id="inbound-aggregator-request-channel"/>
    <int-jms:message-driven-channel-adapter connection-factory="jmsConnectionFactory"
    destination="aggregatorQueue" channel="inbound-aggregator-request-channel"/>


    <int:channel id="inbound-master-replies-channel">
    <int:queue />
    </int:channel>

    <step id="step1" xmlns="http://www.springframework.org/schema/batch">
    <tasklet>
    <chunk reader="positionItemReader"
    writer="stockReportWriter" commit-interval="100" />
    </tasklet>
    </step>

    <bean id="stepExecutionRequestHandler" class="org.springframework.batch.integration.partition.StepExecutionRequestHandler">
    <property name="jobExplorer" ref="jobExplorer"/>
    <property name="stepLocator" ref="stepLocator"/>
    </bean>

    <bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />

    <bean id="posnSecDao" class="com.mycompany.report.stock.dao.PositionSecurityDAO">
    <property name="dataSource" ref="dataSourceCache" />
    </bean>

    <bean id="positionItemReader" scope="step" autowire-candidate="false" class="org.springframework.batch.item.database.JdbcCursorItemReader">
    <property name="dataSource" ref="dataSourceCache" />
    <property name="rowMapper">
    <bean class="com.mycompany.report.stock.batch.reader.PositionRowMapper" />
    </property>
    <property name="sql">
    <value>
    SELECT top 2 ID,CUSIP
    FROM POSN
    WHERE CUSIP=#{stepExecutionContext[minCusip]}
    </value>
    </property>
    </bean>

    <!-- A footer can be created somehow. See http://static.springsource.org/sprin...ASummaryFooter. -->
    <bean id="stockReportWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step">
    <property name="resource" value="file:#{stepExecutionContext[fileName]}.txt" />
    <property name="lineAggregator">
    <bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
    <property name="delimiter" value="," />
    <property name="fieldExtractor">
    <bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
    <!-- <property name="names" value="id,cusip,ac9,acct,qtyleglloc" />-->
    <property name="names" value="id,cusip" />
    </bean>
    </property>
    </bean>
    </property>
    </bean>

    </beans>

    Part of launch-context.xml:

    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
    </bean>

    <bean id="jobOperator" class="org.springframework.batch.core.launch.support.SimpleJobOperator"
    p:jobLauncher-ref="jobLauncher" p:jobExplorer-ref="jobExplorer"
    p:jobRepository-ref="jobRepository" p:jobRegistry-ref="jobRegistry" />

    <bean id="jobExplorer" class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean"
    p:dataSource-ref="dataSource" />

    <bean id="stepLocator" class="org.springframework.batch.integration.partition.BeanFactoryStepLocator" />

    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean"
    p:dataSource-ref="dataSource"
    p:transactionManager-ref="transactionManager" />

    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="dataSource" />
    </bean>

    <bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" />
    <bean
    class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor">
    <property name="jobRegistry" ref="jobRegistry"/>
    </bean>

    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>

    <amq:queue id="slaveQueue"
    physicalName="bos.reports.slave"/>
    <amq:queue id="aggregatorQueue"
    physicalName="bos.reports.aggregator"/>
    <amq:queue id="masterRepliesQueue"
    physicalName="bos.reports.master"/>

    <int-jdbc:message-store id="jdbcMessageGroupStore" data-source="dataSource"/>

    <!-- this bean must not be lazily initialized if the exporting is to happen -->
    <bean id="exporter" class="org.springframework.jmx.export.MBeanExporter" lazy-init="false">
    <property name="beans">
    <map>
    <entry key="bean:name=stockrecordMonitor1" value-ref="stockrecordMonitor"/>
    </map>
    </property>
    </bean>

    <bean id="stockrecordMonitor" class="com.mycompany.report.stock.batch.mbean.JMXStockReportMonitor">
    <property name="reportName" value="StockReportsJob"/>
    </bean>
    </beans>

  2. #2
    Join Date
    Jan 2008
    Location
    Mohnton, PA USA (that's near Philadelphia)
    Posts
    2,148


    Too much info to process, but I think your issue is that, because of an error, the aggregator is not receiving all the Messages it expects. For example, the aggregator might expect 3 Messages based on sequenceSize = 3, but somewhere downstream the processing of one of the Messages fails, so the resulting Message never reaches the aggregator.
    If so, I explained this use case recently at SpringOne. You may want to look at this sample from the talk: https://github.com/olegz/s12gx.2011/...egration/error
    Let me know if that is the issue; otherwise we'll dig deeper.

  3. #3
    Join Date
    Nov 2011
    Posts
    9


    Sorry about the information overload. I have debugged the CorrelatingMessageHandler, and it is always sending the response message. I think the problem is the temporary queue logic/configuration.

    I have two JVM's.

    1. JVM1 launches the job from JMX and invokes the MessageChannelPartitionHandler.
    2. MessageChannelPartitionHandler in JVM1 sends two slave processing messages to the slaveQueue.
    3. JVM1 pulls one message from the slaveQueue, processes it and puts its response on the aggregatorQueue.
    4. JVM2 pulls the other message from the slaveQueue, processes it and puts its response on the aggregatorQueue.
    5. JVM1 pulls one message from the aggregatorQueue and processes it. It determines that there is only one message in the INT_MESSAGE_GROUP table, that two are needed before further processing, and does nothing else.
    6. JVM2 pulls one message from the aggregatorQueue and processes it. It determines that there are two messages in the INT_MESSAGE_GROUP table and that slave processing is complete. It then sends the response to inbound-master-replies-channel, which seems to deliver the message back to JVM2 itself; the message never gets back to JVM1. I can see the call this.messagingTemplate.send((MessageChannel) replyChannel, (Message<?>) reply); in the sendReplyMessage method of Spring's CorrelatingMessageHandler.

    When JVM1 is the last aggregator, everything works. It is as if the replyTo queue resolves to the Spring config of whichever JVM performs the send. I would like the message to always go back to the JVM that initially put the replyTo queue in the message, because that is where the MessageChannelPartitionHandler is waiting for the response.

    Sorry, this is probably too much information as well.

  4. #4
    Join Date
    Jan 2008
    Location
    Mohnton, PA USA (that's near Philadelphia)
    Posts
    2,148


    From what I can tell, you have two aggregators in two different VMs connected to a MessageStore backed by the same resource (e.g., an RDBMS). That is why the second aggregator is able to see the first Message: they are part of the same Message Group.

    The thing is, an aggregator only checks 'canRelease' when a new Message arrives. This explains why the second aggregator works. It also explains why the first one doesn't: no further message was sent to it to trigger the 'canRelease' check. And if one had been, things would be even worse, since the same group would be released twice.

    I guess I want to better understand why you need to distribute aggregators. What are you trying to accomplish?

  5. #5
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,148


    You are using channel adapters, not gateways; there is no 'replyTo' channel.

    One technique I have used in this scenario is to set auto-startup="false" on the inbound adapters:

    Code:
    <int-jms:message-driven-channel-adapter id="agg_response_adapter" auto-startup="false"
    connection-factory="jmsConnectionFactory"
    destination="aggregatorQueue" channel="inbound-aggregator-request-channel"/>
    Then start it only on the master, by having the partition handler send an '@agg_response_adapter.start()' message to a <control-bus/>.
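    A minimal sketch of that technique, assuming Spring Integration 2.0's SpEL-based `<int:control-bus/>` and that the adapter attribute is spelled `auto-startup` in that schema. The channel name `controlChannel` is hypothetical, and how the partition handler actually sends the start command (for example, from a custom subclass) is left open. The adapter stands in for the unnamed `aggregatorQueue` adapter in the original post:

```xml
<!-- Declared identically in every JVM; it does not consume from aggregatorQueue until started -->
<int-jms:message-driven-channel-adapter id="agg_response_adapter"
    auto-startup="false"
    connection-factory="jmsConnectionFactory"
    destination="aggregatorQueue"
    channel="inbound-aggregator-request-channel"/>

<!-- Hypothetical control channel: the control bus evaluates each message payload as a SpEL expression -->
<int:channel id="controlChannel"/>
<int:control-bus input-channel="controlChannel"/>

<!-- On the master only, before the slave requests go out, send a message whose String payload is
     @agg_response_adapter.start()
     to controlChannel (for example with the partition handler's MessagingTemplate). -->
```

    The configuration stays homogeneous: every JVM carries the same beans, and only the JVM that launched the job ever starts consuming aggregator input, so only its aggregator can release the group.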

    BTW, when posting code and config, please surround it with [ code ]...[ /code ] tags (no spaces inside brackets).
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  6. #6
    Join Date
    Nov 2011
    Posts
    9


    What we are trying to achieve...

    We are performing a POC to see whether Spring Batch + Spring Integration can replace our homegrown batch processing framework. We are aiming for massive scalability: a pool of <n> JVMs that are all configured identically. We anticipate an initial pool of 32 JVMs on 16 different application servers to handle our batch processing needs. We would really prefer not to have a different configuration for each JVM, and we expect to add more servers and JVMs to the processing pool dynamically.

    As far as the aggregators are concerned...

    I believe the aggregators are working as we intend. Any JVM can pull a message off the aggregator queue, and the canRelease functionality works fine: it only indicates that the final response message can be sent back to the JVM that initiated the batch job after the final slave response is received.

    ReplyToQueue....

    I was trying to use a temporary replyTo queue that the aggregator processing the final slave response would use to send the aggregated response back to the JVM that initiated the batch job through JMX. I, too, believe this is the issue. I have read that we should use an inbound messaging gateway configured on the MessageChannelPartitionHandler, but I cannot find any examples of this.

    To sum it up...

    We could use help figuring out how to get the message from the final aggregator's JVM back to the JVM that initiated the processing, using a common Spring configuration. This is what I was trying to configure:

    1. JVM1 has a batch job launched from JMX.
    2. JVM1 partitions the business logic and sends <n> slave processing requests to a queue. In each slave message, the MessageChannelPartitionHandler puts its replyTo queue. Only JVM1 should be listening on that queue. I guess this could also be done with some kind of message selector; not sure.
    3. Any JVM in the pool can pick up the slave messages, process them, and put their responses on the aggregator queue.
    4. Any JVM in the pool can pick up the aggregator messages from the aggregator queue and process them.
    5. When one of the aggregators determines it has received the final message, it uses the replyTo queue from the message to send the final (aggregated) response message back to JVM1.

  7. #7
    Join Date
    Jun 2005
    Posts
    4,241


    Quote (originally posted by jsr):
    I have read that we should use an inbound messaging gateway configured on the MessageChannelPartitionHandler
    That sounds close, but not quite on the money.

    I can sum up the problem, as I understand it, by pointing out that if you have multiple consumers competing for step aggregation replies, that is too many consumers. Only one step instance launched the slaves, so only one consumer should listen for the replies.

    I like Gary's clever workaround, but it shouldn't be necessary if you simply use an anonymous reply queue for the responses: that way there is only ever one consumer listening on the reply queue, and it is (by construction) the master step. The integration tests in Spring Batch Integration use this configuration, and I believe it should work without any modifications (https://github.com/SpringSource/spri...ts-context.xml).
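    A minimal sketch of gateway-based wiring with an anonymous reply queue, assuming Spring Integration 2.0's JMS namespace and reusing the channel, queue and bean names from the original post. It stands in for the four JMS channel adapters there: the outbound gateway sets no `reply-destination`, so replies to its requests come back on a temporary queue that only the sending JVM consumes, and the aggregator only ever sees replies to its own requests (the shared `jdbcMessageGroupStore` is presumably no longer required). With `outbound-slave-requests-channel` left as a direct channel, requests are still sent one at a time:

```xml
<!-- Master side: no reply-destination, so each reply arrives on a temporary queue owned by this JVM -->
<int-jms:outbound-gateway request-channel="outbound-slave-requests-channel"
    request-destination="slaveQueue"
    reply-channel="inbound-aggregator-request-channel"
    connection-factory="jmsConnectionFactory"/>

<!-- Slave side (same in every JVM): the reply goes to the JMSReplyTo carried by each request -->
<int:channel id="inbound-slave-requests-channel"/>
<int-jms:inbound-gateway request-channel="inbound-slave-requests-channel"
    request-destination="slaveQueue"
    connection-factory="jmsConnectionFactory"/>

<!-- No output-channel: the returned StepExecution becomes the inbound gateway's reply -->
<int:service-activator ref="stepExecutionRequestHandler"
    input-channel="inbound-slave-requests-channel"/>

<!-- Runs only on replies to requests this JVM sent, so no cross-JVM message store is involved -->
<int:aggregator ref="partitionHandler"
    input-channel="inbound-aggregator-request-channel"
    output-channel="inbound-master-replies-channel"/>
```

    Because the outbound gateway stays in the master JVM for the whole request/reply exchange, the correlation headers that MessageChannelPartitionHandler set on each request are still available on the reply when it reaches the aggregator.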

  8. #8
    Join Date
    Nov 2011
    Posts
    9


    Dave, thank you! I looked at that config and tried to reproduce it in ours. I am now getting the following:

    Caused by: java.lang.IllegalArgumentException: no outputChannel or replyChannel header available
    at org.springframework.util.Assert.notNull(Assert.java:112)
    at org.springframework.integration.aggregator.CorrelatingMessageHandler.sendReplies(CorrelatingMessageHandler.java:334)
    at org.springframework.integration.aggregator.CorrelatingMessageHandler.completeGroup(CorrelatingMessageHandler.java:320)
    at org.springframework.integration.aggregator.CorrelatingMessageHandler.handleMessageInternal(CorrelatingMessageHandler.java:183)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)

    I am using spring-batch-integration-1.2.1.RELEASE.jar. Would that be the correct one? It seems the anonymous replyTo queue was not put in the message.

  9. #9
    Join Date
    Nov 2011
    Posts
    9


    To add a little more information: I see the following informational message before I get the above error.

    283391 [taskExecutor-1] INFO org.springframework.integration.MessageHeaders - removing non-serializable header: replyChannel

    I am guessing that has something to do with the replyChannel not being found. I am not sure how my configuration differs from the example.

  10. #10
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,148


    Did you change your JMS endpoints to gateways, like those in the test config file Dave pointed you to, instead of the channel adapters that were in your original post?

    The gateway waits for the reply, retains a reference to the reply channel, and includes it in the reply header so it is there for the aggregator when it needs it. With channel adapters, as you have seen, the reply channel can't be serialized, and it is lost because nothing on the master retains that state.

    In order to use gateways, there are a couple of things to note:

    1. You need some async handoff (as in the example, where the requests channel is a queue channel). Otherwise everything runs single-threaded and you'll process one partition at a time.
    2. You need to increase the timeout on the outbound gateway so that it doesn't time out while waiting for the worker to process the partition. Again, the waiting has to happen on the requesting thread, because that is the thread holding the reference to the reply channel.
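    A sketch of both adjustments applied to the master-side outbound gateway, assuming Spring Integration 2.0, where (as far as we know) a gateway consuming from a queue channel takes a nested `<int:poller>` that accepts a `task-executor` attribute. `taskExecutor` is the `SimpleAsyncTaskExecutor` already declared in the original post; the one-hour `receive-timeout` is a hypothetical value to be sized against the longest expected partition:

```xml
<!-- Queue channel: the partition handler's sends return immediately (the async handoff) -->
<int:channel id="outbound-slave-requests-channel">
    <int:queue/>
</int:channel>

<!-- receive-timeout is in milliseconds; 3600000 (one hour) is a hypothetical upper bound -->
<int-jms:outbound-gateway request-channel="outbound-slave-requests-channel"
    request-destination="slaveQueue"
    reply-channel="inbound-aggregator-request-channel"
    connection-factory="jmsConnectionFactory"
    receive-timeout="3600000">
    <!-- each poll is handed to taskExecutor, so several requests can wait for replies at once -->
    <int:poller fixed-delay="100" max-messages-per-poll="1" task-executor="taskExecutor"/>
</int-jms:outbound-gateway>
```

    The `receiveTimeout` on the partition handler's `MessagingTemplate` (100000 ms in the original post) bounds how long the master waits on `inbound-master-replies-channel` for the aggregated result, so it presumably needs raising in step with the gateway timeout.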


    HTH
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware
