I have the following setup using Spring Integration 2.0.0.RELEASE: service activator (indexerWorker) -> gateway (indexService) -> splitter (jobToItemsSplitter) -> transformer (itemToDocumentTransformer) -> aggregator (documentsForItemAggregator) -> splitter (documentToSolrCoreSplitter) -> router (coreRouter) -> service activator (solrWriter) -> aggregator (indexedDocumentsToIndexedItemResultAggregator) -> (response channel back to first service gateway).
I'm noticing that the gateway is creating a message id, which the jobToItemsSplitter is using as the correlationId in the message headers it sends out. The interesting thing is that the documentForItemAggregator leaves the correlationId in the header of the message it sends once it has gathered together the documents for an item. The problem arises the moment the documentToSolrCoreSplitter creates document bundles from the set sent from documentForItemAggregator. Since the correlationId has been left by the aggregator, the code around the documentToSolrCoreSplitter, creates a sequence details using the correlationId and a null for the sequence number and the sequence size. This will finally cause the code around indexerWorker (specifially MessageBuilder.popSequenceDetails) to error with a NullPointerException because the sequence number and total numbers are null.
Here's an example of the logs:
Now the (indexedDocumentsToIndexedItemResultAggregator) will use the correlation id sequenceSize and sequenceNumber fields out of the header, leaving the sequenceDetails in place. And here's the stack trace when the final response is sent back to the gateway and on to the service activator.Code:Message from Gateway (indexService): 2010/12/15 14:34:17,496 [jobItemExecutor-1] INFO org.springframework.integration.handler.LoggingHandler - [Payload=111][Headers={timestamp=1292445257495, id=804b86fa-c8c5-4b0e-b5a9-6df67500f025, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@4e9b7d, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@4e9b7d}] Message from Splitter (jobToItemsSplitter): 2010/12/15 14:34:17,780 [jobItemExecutor-1] INFO org.springframework.integration.handler.LoggingHandler - [Payload=com.peapod.indexer.worker.IndexedProduct@96ac47][Headers={timestamp=1292445257780, id=a51c6f75-1237-47f1-9a7c-c52cae1486e2, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@4e9b7d, correlationId=804b86fa-c8c5-4b0e-b5a9-6df67500f025, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@4e9b7d, sequenceSize=900, sequenceNumber=1}] Message from Aggregator (documentForItemAggregator): 2010/12/15 15:04:12,148 [docGeneratorExecutor-59] INFO org.springframework.integration.handler.LoggingHandler - [Payload=com.peapod.indexer.worker.IndexedProducts@17533df][Headers={timestamp=1292445567519, id=5bdc5a42-d251-4559-a5f2-fcfa30d9b86d, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@4e9b7d, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@4e9b7d, correlationId=804b86fa-c8c5-4b0e-b5a9-6df67500f025}] Message from Splitter (documentToSolrCoreSplitter): 2010/12/15 15:04:14,038 [docGeneratorExecutor-59] INFO org.springframework.integration.handler.LoggingHandler - [Payload=com.peapod.indexer.worker.IndexedProductBunch@86558][Headers={timestamp=1292447054038, id=9e99e91a-f75c-4972-9aa9-896a8613a1a5, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@4e9b7d, correlationId=5bdc5a42-d251-4559-a5f2-fcfa30d9b86d, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@4e9b7d, sequenceSize=1, sequenceDetails=[[804b86fa-c8c5-4b0e-b5a9-6df67500f025, null, null]], sequenceNumber=1}]
So my questions are:Code:Exception in thread "main" org.springframework.integration.MessageHandlingException: at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:76) at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:64) .... Caused by: java.lang.NullPointerException at org.springframework.integration.support.MessageBuilder.popSequenceDetails(MessageBuilder.java:210) at org.springframework.integration.aggregator.AbstractAggregatingMessageGroupProcessor.processMessageGroup(AbstractAggregatingMessageGroupProcessor.java:53) at org.springframework.integration.aggregator.CorrelatingMessageHandler.completeGroup(CorrelatingMessageHandler.java:306) at org.springframework.integration.aggregator.CorrelatingMessageHandler.handleMessageInternal(CorrelatingMessageHandler.java:179) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78) at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:110) at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97) at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:44) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128) at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288) at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:176) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:160) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:125) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:119) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:101) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78) at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:110) at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97) at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:44) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128) at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288) at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149) at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:220) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78) at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:110) at org.springframework.integration.dispatcher.UnicastingDispatcher.access$000(UnicastingDispatcher.java:51) at org.springframework.integration.dispatcher.UnicastingDispatcher$1.run(UnicastingDispatcher.java:92) ... 4 more
1. Should the code around the documentsForItemAggregator set the id of the message it sends to the correlationId used in the messages it received?
2. If the answer to question one is no, should the code around the documentToSolrCoreSplitter be checking to see if the sequenceNumber and sequenceSize are null prior to setting them in the sequenceDetails, and should it set the values to 1?
3. Could the code around the documentToSolrCoreSplitter simply check for the absence of a sequenceNumber and sequenceSize in the header of the message it receives and then simply use the correlationId (in the header of the received message) as the correlationId in the set of messages it sends out?
My apologies if the phrase "the code around" is too vague, it just means the Spring Integration framework code. I would like to know the best approach to fixing this because I'll need to add it myself until a proper fix can be released. (That's assuming that my use of the integration framework is not in error.)


Reply With Quote
