Hi,
I have a client application which creates messages to be aggregated using spring-integration (Version 2.1.0.RELEASE). A messaging gateway adds in the required header fields (code as seen below) and sends the manually SPLIT messages to a queue channel for aggregation. While unit testing, I noticed, that the first set of split messages get aggregated correctly and the Aggregator creates a single output message and puts it onto the output-channel. If I, however send the same set of three manually SPLIT messages again for aggregation (with changed expiration date / payload), the messages get stuck in the channel for aggregation. Inspecting the message headers on a wiretap and in more detail via the logs, suggests that the newly sent messages use the same ID as the earlier three messages which were sent for aggregation. The only time the whole setup starts working again is if I change the correlationId of the new messages.
Question is:
Once the aggregator aggregates the messages and creates a single message and puts it on the output-channel (all channels are QUEUE channels), shouldn't these split messages be flushed out from the aggregation input-channel and cleared for the next sets of messages ?
Here are snippets of code to demo my issue
Code:<gateway id = "processorGateway" service-interface = "com.xyz.gateway.ProcessorGateway" > <method name = "processPacket" request-channel = "aggregatorChannel" /> </gateway>Here is the Gateway interface definitionCode:<aggregator id = "msgAggregator" input-channel = "aggregatorChannel" output-channel = "decryptionChannel" discard-channel = "aggregatorDiscardChannel" send-partial-result-on-expiry = "false" send-timeout = "600000" ref = "messageAggregatorBean" method = "aggregate"> </aggregator> <beans:bean id = "messageAggregatorBean" class = "com.xyz.aggregator.PacketAggregator" />
Code:public interface ProcessorGateway { // requestChannel configured in XML file. Here only to show equivalent annotation public void processPacket(Object payload, @Header("correlationId") Object correlationId, @Header("sequenceNumber") Integer sequenceNumber, @Header("sequenceSize") Integer sequenceSize, @Header("expirationDate") Long expiryTime); }
Test code to dump manually split packets onto the aggregator channel
The second round of packet sends never shows up on the output-channel after aggregation. They just hang out on the aggregatorChannel waiting for something. I would have thought that once the Aggregator has aggregated messages, the split messages are flushed out from the input-channel. As I mentioned before, the IDs of the messages created remain identical, even if the payload or some of the message header values (like expiration date are changed.). The only time this second round works again is if I change the correlationId of the message headers.Code:public void testAggregationOfPackets() { // Created the messages and simulate a manual split. Packet 1 p1 = new Packet(); p1.setCorrelationId("98220363711"); p1.setSequenceNumber(1); p1.setSequenceSize(3); long currentTimeMillis = System.currentTimeMillis(); p1.setExpirationDate(currentTimeMillis + 60000); p1.setPayload("Part 1 of message"); // Packet 2 p2 = new Packet(); p2.setCorrelationId("98220363711"); p2.setSequenceNumber(2); p2.setSequenceSize(3); currentTimeMillis = System.currentTimeMillis(); p2.setExpirationDate(currentTimeMillis + 600000); p2.setPayload("Part 2 of message"); // Packet 3 p3 = new Packet(); p3.setCorrelationId("98220363711"); p3.setSequenceNumber(3); p3.setSequenceSize(3); currentTimeMillis = System.currentTimeMillis(); p3.setExpirationDate(currentTimeMillis + 600000); p3.setPayload("Part 3 of message"); // Order of insertion P3, P1, P2 // The messages should show up on the "aggregatorChannel" and output a single message on // on the "decryptionChannel" processorGateway.processPacket(p3.getPayload(),p3.getCorrelationId(), p3.getSequenceNumber(), p3.getSequenceSize(), p3.getExpirationDate()); processorGateway.processPacket(p1.getPayload(),p1.getCorrelationId(), p1.getSequenceNumber(), p1.getSequenceSize(), p1.getExpirationDate()); processorGateway.processPacket(p2.getPayload(),p2.getCorrelationId(), p2.getSequenceNumber(), p2.getSequenceSize(), p2.getExpirationDate()); Message<?> packet = aggregatorChannelTap.receive(1000); logger.debug("**** First packet on queue has ID " + packet.getHeaders().getId()); packet = aggregatorChannelTap.receive(1000); logger.debug("**** Second packet on queue has ID " + packet.getHeaders().getId()); packet = aggregatorChannelTap.receive(1000); logger.debug("**** Third packet on queue has ID " + packet.getHeaders().getId()); // Receive the message on the decryptionChannel. THIS WORKS GREAT packet = decryptionChannelTap.receive(5000); assertNotNull("Expecting a packet on vendor processing channel", packet); logger.info("Aggregated packet on queue has ID " + packet.getHeaders().getId()); // Send same messages this time in a different order (P2, P3, P1) processorGateway.processPacket(p2.getPayload(),p2.getCorrelationId(), p2.getSequenceNumber(), p2.getSequenceSize(), p2.getExpirationDate()); processorGateway.processPacket(p3.getPayload(),p3.getCorrelationId(), p3.getSequenceNumber(), p3.getSequenceSize(), p3.getExpirationDate()); processorGateway.processPacket(p1.getPayload(),p1.getCorrelationId(), p1.getSequenceNumber(), p1.getSequenceSize(), p1.getExpirationDate()); Message<?> packet = aggregatorChannelTap.receive(1000); logger.debug("**** Second round - First packet on queue has ID " + packet.getHeaders().getId()); packet = aggregatorChannelTap.receive(1000); logger.debug("**** Second round - Second packet on queue has ID " + packet.getHeaders().getId()); packet = aggregatorChannelTap.receive(1000); logger.debug("**** Second round - Third packet on queue has ID " + packet.getHeaders().getId()); // Receive the message on the decryptionChannel. MESSAGES NEVER SHOW UP HERE UNLESS CORRELATION ID IS CHANGED. packet = decryptionChannelTap.receive(5000); }
Please help me understand the mechanics of how the queues get flushed and how it treats already aggregated messages.


Reply With Quote