Like another post that did not get a solution, I am trying this again. I'm working on a partitioned job POC using Spring Batch and Spring Integration and have hit a problem that I'm hoping someone can help me with. While my issue is in the context of a Spring Batch job, I figured I would try posting it here since the real issue I'm having is with the use of Spring Integration.
I am launching the Job through an MBean using JConsole. When I run my job with the configuration below, the data is partitioned correctly, it's sent to the slaves fine and the slaves process their functionality successfully to completion. The issue I run into is that the job ends in a failure due to a NullPointerException in the MessageChannelPartitionHandler's handle method when it attempts to receive the replies. This is due to a timeout. The issue I'm having is that it seems like the messages are not getting past the aggregator part of the process. Actually, this only happens when the final aggregator is not running in the same JVM as the Splitter(MessageChannelPartitionHandler). Since ActiveMQ seems to be sending messages in a round robin fashion, my test works every other time. This seems to be an issue getting the message back to the replyChannel specified as inbound-master-replies-channel below.
I have two JVM processes, each with the same Spring context. I was hoping that the replyChannel would contain a replyToQueue back to the JVM that contains the Splitter. I have configured the aggregator to have a JDBC-backed message store, and that works fine. I currently have the Master, Slave, Aggregator and Splitter configured the same in each JVM. We are going for a homogeneous configuration to make configuration easier.
Some of the posts indicate that an inbound messaging gateway would work, but I cannot find a version of spring integration, or any examples, that allow for this.
Any ideas?
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Job context for the partitioned stock record report.
  Declares the beans, integration, integration/jms and batch namespaces and
  pulls shared infrastructure in from ../launch-context.xml.
  The schemaLocation pairs below are "namespace URI, XSD URL"; the three
  Spring Batch / Spring Integration XSD URLs had been truncated with "..."
  and are written out in full here.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xsi:schemaLocation="http://www.springframework.org/schema/batch
           http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
           http://www.springframework.org/schema/integration
           http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
           http://www.springframework.org/schema/integration/jms
           http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.0.xsd
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
  <description>Stock Record Report</description>
  <import resource="../launch-context.xml"/>
<!--
  Master job: a single partitioned step. The "partitioner" bean produces the
  partitions and the "partitionHandler" bean dispatches them.
  The element name had been mangled to "batchartition"; it is the
  batch-namespace "partition" element.
-->
<batch:job id="stockReportsJob">
  <batch:step id="step1.master">
    <batch:partition partitioner="partitioner" handler="partitionHandler"/>
  </batch:step>
</batch:job>
<!-- Project partitioner, given the posnSecDao bean. Class name repaired (a stray space had been inserted). -->
<bean id="partitioner" class="com.mycompany.report.stock.batch.CusipPartitioner">
  <property name="posnSecDao" ref="posnSecDao"/>
</bean>
<!--
  Partition handler for the master step.
  - stepName: the step ("step1") that is requested for each partition.
  - gridSize: 2.
  - replyChannel: inbound-master-replies-channel.
  - messagingOperations: a MessagingTemplate that sends to
    outbound-slave-requests-channel and uses a receiveTimeout of 100000.
  Both class names contained a stray space and are repaired here.
-->
<bean id="partitionHandler"
      class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
  <property name="stepName" value="step1"/>
  <property name="gridSize" value="2"/>
  <property name="replyChannel" ref="inbound-master-replies-channel"/>
  <property name="messagingOperations">
    <bean class="org.springframework.integration.core.MessagingTemplate">
      <property name="defaultChannel" ref="outbound-slave-requests-channel"/>
      <property name="receiveTimeout" value="100000"/>
    </bean>
  </property>
</bean>
<!--
  Gateway whose default request channel is outbound-slave-requests-channel.
  NOTE(review): no service-interface is declared; confirm that the Spring
  Integration version in use accepts a gateway without one.
-->
<int:gateway default-request-channel="outbound-slave-requests-channel" id="masterGateway"/>
<!--
  Aggregates slave results by delegating to the partitionHandler bean.
  Input comes from inbound-aggregator-request-channel, group state is kept in
  jdbcMessageGroupStore, and the aggregate is emitted on
  inbound-master-replies-channel.
  NOTE(review): the output channel is declared in this same context as a plain
  queue channel, so the aggregate appears to stay in whichever JVM runs this
  aggregator. That would match the reported timeout when the aggregating JVM
  is not the one waiting on the reply channel; confirm.
-->
<int:aggregator
    message-store="jdbcMessageGroupStore"
    ref="partitionHandler"
    input-channel="inbound-aggregator-request-channel"
    output-channel="inbound-master-replies-channel"/>
<!-- Master side: messages on outbound-slave-requests-channel are sent to the slaveQueue JMS destination. -->
<int:channel id="outbound-slave-requests-channel"/>
<int-jms:outbound-channel-adapter
    channel="outbound-slave-requests-channel"
    destination="slaveQueue"
    connection-factory="jmsConnectionFactory"/>
<!--
  Slave side: messages consumed from slaveQueue arrive on
  inbound-slave-requests-channel, are handled by stepExecutionRequestHandler,
  and the handler's output goes to outbound-aggregator-request-channel.
-->
<int:channel id="inbound-slave-requests-channel"/>
<int-jms:message-driven-channel-adapter
    channel="inbound-slave-requests-channel"
    destination="slaveQueue"
    connection-factory="jmsConnectionFactory"/>
<int:service-activator
    input-channel="inbound-slave-requests-channel"
    output-channel="outbound-aggregator-request-channel"
    ref="stepExecutionRequestHandler"/>
<!--
  Result hop over JMS: outbound-aggregator-request-channel is written to
  aggregatorQueue, and aggregatorQueue is consumed into
  inbound-aggregator-request-channel (the aggregator's input).
-->
<int:channel id="outbound-aggregator-request-channel"/>
<int-jms:outbound-channel-adapter
    channel="outbound-aggregator-request-channel"
    destination="aggregatorQueue"
    connection-factory="jmsConnectionFactory"/>

<int:channel id="inbound-aggregator-request-channel"/>
<int-jms:message-driven-channel-adapter
    channel="inbound-aggregator-request-channel"
    destination="aggregatorQueue"
    connection-factory="jmsConnectionFactory"/>
<!--
  Queue-backed channel used as the partitionHandler's replyChannel and as the
  aggregator's output channel. No JMS adapter is attached to it in this file.
-->
<int:channel id="inbound-master-replies-channel">
  <int:queue/>
</int:channel>
<!--
  Step requested for each partition: chunk-oriented, reading with
  positionItemReader and writing with stockReportWriter, committing every 100
  items. Written with the batch: prefix declared on the root element; the
  elements are in the same namespace as before.
-->
<batch:step id="step1">
  <batch:tasklet>
    <batch:chunk commit-interval="100"
                 reader="positionItemReader"
                 writer="stockReportWriter"/>
  </batch:tasklet>
</batch:step>
<!--
  Handler invoked by the slave-side service activator; wired with the
  jobExplorer and stepLocator beans. Class name repaired (stray space removed).
-->
<bean id="stepExecutionRequestHandler"
      class="org.springframework.batch.integration.partition.StepExecutionRequestHandler">
  <property name="jobExplorer" ref="jobExplorer"/>
  <property name="stepLocator" ref="stepLocator"/>
</bean>
<!-- Async task executor bean; not referenced elsewhere in this file. Class name repaired (stray space removed). -->
<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
<!-- Project DAO used by the partitioner, backed by dataSourceCache. Class name repaired (stray space after "com." removed). -->
<bean id="posnSecDao" class="com.mycompany.report.stock.dao.PositionSecurityDAO">
  <property name="dataSource" ref="dataSourceCache"/>
</bean>
<!--
  Step-scoped JDBC cursor reader over dataSourceCache. The query is built per
  partition from the step execution context entry "minCusip"; rows are mapped
  by the project's PositionRowMapper. Both class names contained stray spaces
  and are repaired here. The SQL text is left flush-left so its content is
  unchanged.
  NOTE(review): the context value is spliced into the SQL text as-is; if CUSIP
  is a character column the value may need quoting or a bound parameter;
  confirm.
-->
<bean id="positionItemReader" scope="step" autowire-candidate="false"
      class="org.springframework.batch.item.database.JdbcCursorItemReader">
  <property name="dataSource" ref="dataSourceCache"/>
  <property name="rowMapper">
    <bean class="com.mycompany.report.stock.batch.reader.PositionRowMapper"/>
  </property>
  <property name="sql">
<value>
SELECT top 2 ID,CUSIP
FROM POSN
WHERE CUSIP=#{stepExecutionContext[minCusip]}
</value>
  </property>
</bean>
<!--
  Step-scoped flat-file writer. The output path comes from the step execution
  context entry "fileName" with ".txt" appended; each item is written as a
  comma-delimited line of its "id" and "cusip" properties.
  All three class names contained stray spaces and are repaired here.
  A footer can be created somehow; the original pointer was the truncated link
  http://static.springsource.org/sprin...ASummaryFooter (presumably the
  summary-footer section of the Spring Batch reference; confirm).
-->
<bean id="stockReportWriter" scope="step"
      class="org.springframework.batch.item.file.FlatFileItemWriter">
  <property name="resource" value="file:#{stepExecutionContext[fileName]}.txt"/>
  <property name="lineAggregator">
    <bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
      <property name="delimiter" value=","/>
      <property name="fieldExtractor">
        <bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
          <!-- <property name="names" value="id,cusip,ac9,acct,qtyleglloc" />-->
          <property name="names" value="id,cusip"/>
        </bean>
      </property>
    </bean>
  </property>
</bean>
</beans>
Some of Launch-context.xml
<!-- Job launcher wired to the jobRepository bean. Class name repaired (stray space removed). -->
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
  <property name="jobRepository" ref="jobRepository"/>
</bean>
<!--
  Job operator wired (via p: attributes) to the jobLauncher, jobExplorer,
  jobRepository and jobRegistry beans. Class name repaired (stray space removed).
-->
<bean id="jobOperator"
      class="org.springframework.batch.core.launch.support.SimpleJobOperator"
      p:jobLauncher-ref="jobLauncher"
      p:jobExplorer-ref="jobExplorer"
      p:jobRepository-ref="jobRepository"
      p:jobRegistry-ref="jobRegistry"/>
<!-- Job explorer factory bean over the dataSource bean. Class name repaired (stray space removed). -->
<bean id="jobExplorer"
      class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean"
      p:dataSource-ref="dataSource"/>
<!-- Step locator referenced by stepExecutionRequestHandler. Class name repaired (stray space removed). -->
<bean id="stepLocator"
      class="org.springframework.batch.integration.partition.BeanFactoryStepLocator"/>
<!-- Job repository factory bean using the dataSource and transactionManager beans. Class name repaired (stray space removed). -->
<bean id="jobRepository"
      class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean"
      p:dataSource-ref="dataSource"
      p:transactionManager-ref="transactionManager"/>
<!-- JDBC transaction manager bound to the dataSource bean. Class name repaired (stray space removed). -->
<bean id="transactionManager"
      class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
  <property name="dataSource" ref="dataSource"/>
</bean>
<!-- Job registry shared by jobOperator and the registry post-processor. Class name repaired (stray space removed). -->
<bean id="jobRegistry"
      class="org.springframework.batch.core.configuration.support.MapJobRegistry"/>
<!-- Anonymous post-processor wired to the jobRegistry bean. Class name repaired (stray space removed). -->
<bean class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor">
  <property name="jobRegistry" ref="jobRegistry"/>
</bean>
<!-- ActiveMQ connection factory pointing at the broker on tcp://localhost:61616. Class name repaired (stray space removed). -->
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<!--
  ActiveMQ queue destinations. slaveQueue and aggregatorQueue are the
  destinations used by the JMS channel adapters in the job context;
  masterRepliesQueue is not referenced by any adapter shown there.
-->
<amq:queue physicalName="bos.reports.slave" id="slaveQueue"/>
<amq:queue physicalName="bos.reports.aggregator" id="aggregatorQueue"/>
<amq:queue physicalName="bos.reports.master" id="masterRepliesQueue"/>
<!-- JDBC message store over the dataSource bean; referenced as the aggregator's message-store. -->
<int-jdbc:message-store data-source="dataSource"
                        id="jdbcMessageGroupStore"/>
<!--
  JMX exporter that registers stockrecordMonitor under the object name
  bean:name=stockrecordMonitor1.
  This bean must not be lazily initialized if the exporting is to happen.
  Class name repaired (stray space removed).
-->
<bean id="exporter" class="org.springframework.jmx.export.MBeanExporter" lazy-init="false">
  <property name="beans">
    <map>
      <entry key="bean:name=stockrecordMonitor1" value-ref="stockrecordMonitor"/>
    </map>
  </property>
</bean>
<!-- Project MBean exposed by the exporter; configured with reportName "StockReportsJob". Class name repaired (stray space removed). -->
<bean id="stockrecordMonitor"
      class="com.mycompany.report.stock.batch.mbean.JMXStockReportMonitor">
  <property name="reportName" value="StockReportsJob"/>
</bean>
</beans>


<batch:partition partitioner="partitioner" handler="partitionHandler">
Reply With Quote
, but I think your issue is that, due to the error, the aggregator is not receiving all the Messages it expects. For example, the aggregator might expect 3 Messages based on sequenceSize = 3, but somewhere downstream the processing of one of the Messages results in an error, and therefore the resulting Message never reaches the aggregator.
