I am new to Spring Integration, and I try to make an Remote-Paritioning POC with JMS, base on the VanillaIntegrationTest of the spring-batch-integration unit test. Here is my resulting config, can anyone tell me if I am on the right track?
(Spring Batch 2.1.6, Spring Batch Integration 1.2.0, Spring Integration 2.0.3)
Basically I am breaking the original requests channel in orginal VanillaIntegrationTest (similar for staging and replies channel) into 1 outbound and 1 inbound channel (both are direct channel). Outbound one is connected to a JMS outbound-channel-adapter which send to the requests-JMS-Queue. Then a JMS message-driven-channel-adapter receive data from JMS and connects to the inbound channel.Code:<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" (bunch of xml namespaces deleted) > <!-- Active-MQ JMS queues partitionRequestsQueue, partitionStagingQueue and partitionRepliesQueue defined in another file --> <!-- jobLauncher defined in another file too, using default sync executor --> <batch:job id="integrationPartitionJob" > <batch:step id="integrationPartitionStep-master" > <batch:partition step="integrationPartitionStep1" partitioner="integrationPartitioner" handler="partitionHandler" /> </batch:step> </batch:job> <bean id="integrationPartitioner" class="my.SamplePartitioner" scope="step"/> <batch:step id="integrationPartitionStep1" > <batch:tasklet> <bean id="integrationPartitionTasklet" class="my.PartitionTasklet" scope="step"> <property name="partitionedValue" value="#{stepExecutionContext[partitionedValue]}"/> </bean> </batch:tasklet> </batch:step> <!-- Partition Handler: send the partitions to requests channel --> <bean id="partitionHandler" class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler" > <!-- scope="step" --> <property name="messagingOperations" ref="outboundRequestMessagingTemplate" /> <property name="replyChannel" ref="inboundPartitionReplies" /> <property name="stepName" value="integrationPartitionStep1" /> <property name="gridSize" value="2" /> </bean> <!-- Outbound Request channel, and bind the channel to JMS partitionRequestsQueue --> <integration:channel id="outboundPartitionRequests" /> <bean id="outboundRequestMessagingTemplate" class="org.springframework.integration.core.MessagingTemplate"> <property name="defaultChannel" ref="outboundPartitionRequests" /> <property name="receiveTimeout" value="1000" /> </bean> <int-jms:outbound-channel-adapter connection-factory="batch.jmsFactory" destination="partitionRequestsQueue" channel="outboundPartitionRequests" /> <!-- Inbound Request Channel, from JMS partitionRequestsQueue, handle by stepExecutionRequestHandler, output to outboundStaging --> <integration:channel id="inboundPartitionRequests" /> <int-jms:message-driven-channel-adapter id="reqQueueAdapter" connection-factory="batch.jmsFactory" destination="partitionRequestsQueue" channel="inboundPartitionRequests"/> <bean id="partitionStepExecutionRequestHandler" class="org.springframework.batch.integration.partition.StepExecutionRequestHandler" p:jobExplorer-ref="jobExplorer" p:stepLocator-ref="stepLocator"/> <integration:service-activator ref="partitionStepExecutionRequestHandler" input-channel="inboundPartitionRequests" output-channel="outboundPartitionStaging"> </integration:service-activator> <!-- Outbound Staging channel, and bind the channel to JMS partitionStagingQueue --> <integration:channel id="outboundPartitionStaging" /> <bean id="outboundStagingMessagingTemplate" class="org.springframework.integration.core.MessagingTemplate"> <property name="defaultChannel" ref="outboundPartitionStaging" /> <property name="receiveTimeout" value="1000" /> </bean> <int-jms:outbound-channel-adapter connection-factory="batch.jmsFactory" destination="partitionStagingQueue" channel="outboundPartitionStaging" /> <!-- Inbound Staging Channel, from JMS partitionStagingQueue, handle by partitionHandler (as aggregator), output to outboundPartitionReplies --> <integration:channel id="inboundPartitionStaging" /> <int-jms:message-driven-channel-adapter id="stagingQueueAdapter" connection-factory="batch.jmsFactory" destination="partitionStagingQueue" channel="inboundPartitionStaging"/> <integration:aggregator ref="partitionHandler" timeout="10000" input-channel="inboundPartitionStaging" output-channel="outboundPartitionReplies" > </integration:aggregator> <!-- Outbound Replies channel, and bind the channel to JMS partitionRepliesQueue --> <integration:channel id="outboundPartitionReplies" /> <bean id="outboundRepliesMessagingTemplate" class="org.springframework.integration.core.MessagingTemplate"> <property name="defaultChannel" ref="outboundPartitionReplies" /> <property name="receiveTimeout" value="1000" /> </bean> <int-jms:outbound-channel-adapter connection-factory="batch.jmsFactory" destination="partitionRepliesQueue" channel="outboundPartitionReplies" /> <!-- Inbound Replies Channel, from JMS partitionRepliesQueue --> <integration:channel id="inboundPartitionReplies"> <integration:queue/> </integration:channel> <int-jms:message-driven-channel-adapter id="repliesQueueAdapter" connection-factory="batch.jmsFactory" destination="partitionRepliesQueue" channel="inboundPartitionReplies"/> </beans>
Is it what supposed to be done?
Have made a unit test to run it, most of the time it is fine and jobLauncher return a COMPLETED jobExecution. However, it failed once a while, with NPE happened in MessageChannelPartitionHandler:
Is there anything wrong in my config causing this?Code:java.lang.NullPointerException at org.springframework.batch.integration.partition.MessageChannelPartitionHandler.handle(MessageChannelPartitionHandler.java:135) at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:104) at ....
=========================
I have several extra questions which I feel a bit hard to find out, I wish someone can give me some guide to find the answer:
1) Can the same set of channels (and hence, JMS queue) be shared across concurrent job executions which needs partitioning? It seems to me that my existing config, when 'receiving' from those queue, may get mixed data from other execution (for which may be from totally different Jobs). Is there some configs I have missed?
2) I am a bit confused on how the partition be distributed remotely. For example, I am having a JobFactory that will create a separate Child App Ctx for each job launch which contains the job and step. Assume in server1, it somehow partitioned the work and put into the JMS queue. There is no job running in other servers, which means there is no channel connecting to the JMS queue, hence, no other server will pick up the partitioned work from JMS to work on. Therefore, does it mean that my approach of putting all the above partition-related artifact in same file be a wrong way?
I hope I am not asking stupid question...![]()


Reply With Quote