Thread: udp-inbound-adapter missing sequence correlationID headers

  1. #1

    udp-inbound-adapter missing sequence correlationID headers

    Hi

    I'm trying to send files of unknown size across a network using UDP (using UDP is a requirement). At the moment I use a file:inbound-channel-adapter that polls a directory for files, followed by a file-to-bytes-transformer whose output I feed into a splitter. Because the files vary in size, I break the byte stream into chunks of a known size so they can be aggregated after the transfer.

    file-inbound-adapter--->file-to-bytes-transformer--->splitter--->channel--->udp-outbound-channel-adapter
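
The chunking step in this pipeline can be sketched in plain Java. This is only an illustration: the poster's SplitMessageHandler is not shown in the thread, so `ByteChunker`, `split` and `chunkSize` are hypothetical names, and no Spring Integration API is used here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; not the poster's SplitMessageHandler
// and not part of Spring Integration.
final class ByteChunker {

    private ByteChunker() {
    }

    // Splits data into consecutive chunks of at most chunkSize bytes.
    // The final chunk may be shorter; an empty input yields no chunks.
    static List<byte[]> split(byte[] data, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<byte[]> chunks = new ArrayList<>();
        for (int offset = 0; offset < data.length; offset += chunkSize) {
            int end = Math.min(data.length, offset + chunkSize);
            chunks.add(Arrays.copyOfRange(data, offset, end));
        }
        return chunks;
    }
}
```

A splitter method that returns such a list would presumably produce one message per chunk, which matches the sequenceNumber/sequenceSize headers seen in the log output in this post. The chunk size would need to stay within whatever datagram size the sender and receiver are configured for.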

    Looking at the output of the splitter through a wire-tap on the channel just before the udp-outbound-channel-adapter, I get the following payload/header info:

    [Payload=[B@b3e15f7][Headers={timestamp=1341442699646, id=39294f3a-5eac-4071-8fea-70b2ebca67d1, correlationId=6707d7c8-c225-43cf-b0a5-8f1bd259f307, sequenceSize=12, file_originalFile=/path/to/files/Attorney Questions.odt, file_name=Attorney Questions.odt, sequenceNumber=1}]
    08:58:19,648 INFO ework.integration.handler.LoggingHandler: 121 - [Payload=[B@44023756][Headers={timestamp=1341442699648, id=7640bb42-1d09-4b3f-9aab-5f90f0d4dd1d, correlationId=6707d7c8-c225-43cf-b0a5-8f1bd259f307, sequenceSize=12, file_originalFile=/path/to/files/Attorney Questions.odt, file_name=Attorney Questions.odt, sequenceNumber=2}]
    ...
    ...
    Payload=[B@120540c][Headers={timestamp=1341442699654, id=9782fffe-b5cf-497a-a44d-808ff9168d84, correlationId=6707d7c8-c225-43cf-b0a5-8f1bd259f307, sequenceSize=12, file_originalFile=/path/to/files/Attorney Questions.odt, file_name=Attorney Questions.odt, sequenceNumber=12}]

    On the receiving end I use a udp-inbound-channel-adapter:

    udp-inbound-channel-adapter ---> receiveUdpChannel

    Looking at the input to the receiveUdpChannel with a wire-tap, I get the following payload/header info. The number of messages matches the number that were sent; however, the information required for correlation has been stripped from the headers somewhere:

    Payload=[B@d5e92d7][Headers={timestamp=1341443822722, id=8da5a0cf-53c0-4ee3-9335-9ca97cc878aa, ip_address=127.0.0.1, ip_hostname=localhost}]

    This in turn leads to:
    Exception in thread "UDP-Incoming-Msg-Handler" org.springframework.integration.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.CorrelatingMessageHandler#0]
    ...
    Caused by: java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?

    I've included the context below. Any help is, as always, appreciated. I'm using STS Version: 2.9.2.RELEASE, Build Id: 201205071000.


    Code:
    <!-- Beans used as handlers  -->
    	<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
    	<bean id="writePayloadToFileHandler" class="org.springintegration.cds.WritePayloadToFileMessageHandler" />
    	<bean id="handleSplitter" class="org.springintegration.cds.SplitMessageHandler" />
    		
    		
    	<!-- polls for files in the specified directory using the polling settings below -->
    	<file:inbound-channel-adapter id="filesIn"
    		directory="file:/home/john/input"  />
    
    	<!-- Poller used to poll the specified directory -->
    	<integration:poller id="poller" default="true"
    		fixed-delay="5000" />
    
    	<!-- File object to byte stream transformer required to use the udp outbound channel adapter -->
    	<file:file-to-bytes-transformer
    		input-channel="filesIn" output-channel="bytesOut" />
    	
    	<!-- split byte[] into known sizes for aggregation after network transfer -->
    	<int:splitter input-channel="bytesOut" apply-sequence="true"
    		output-channel="sendUdp" ref="handleSplitter" id="splitterChannel" />
    
    
    	<!-- udp output channel adapter -->
    	<ip:udp-outbound-channel-adapter id="udpOut"
    									 host="127.0.0.1" 
    									 port="19876" 
    									 multicast="false" 
    									 check-length="false" 
    									 so-send-buffer-size="4096"
    									 channel="sendUdp" />
                                        
    	<!-- inbound udp adapter -->
    	<ip:udp-inbound-channel-adapter id="udpIn" port="19876" 
    		channel="receiveUdp" multicast="false" check-length="false"  receive-buffer-size="4096" />
    	 
    	<!-- aggregate messages into one message -->
    	<int:aggregator input-channel="receiveUdp"
    		release-strategy="releaseStrategy"
    		correlation-strategy="correlationStrategy" id="fileMessageAggregator"
    		output-channel="aggragatorOutChannel">
    	</int:aggregator>
    	
    	<!-- correlation strategy to use -->
    	<beans:bean id="correlationStrategy" class="org.springframework.integration.aggregator.HeaderAttributeCorrelationStrategy">
    		<beans:constructor-arg value="CONVERSATION_ID" />
    	</beans:bean>  
    	
    	<!-- release strategy -->
    	<beans:bean id="releaseStrategy" class="org.springframework.integration.aggregator.SequenceSizeReleaseStrategy">
    		<beans:constructor-arg value="false" />
    	</beans:bean>
        
        <!-- output channel of aggregator -->
    	<int:channel id="aggragatorOutChannel">
    			<int:interceptors>
    			<int:wire-tap channel="logger" />
    		</int:interceptors>
    	</int:channel>
    	
    	<!-- Convert payload to File -->	
    	 <integration:service-activator output-channel="filesOut"
                                       ref="writePayloadToFileHandler" input-channel="aggragatorOutChannel"/>                               
         
        <!-- write files to output directory -->                               
        <file:outbound-channel-adapter id="filesOut"
        	directory="file:/home/john/output" />
    Last edited by squidVicious; Jul 4th, 2012 at 06:42 PM. Reason: additional info

  2. #2
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,016

    Unlike HTTP, JMS, etc., UDP and TCP have no standard way to transmit a "message" with headers and a payload. Therefore, currently, these adapters transfer the payload only.

    If you wish to transfer correlation data, you need to include it in the payload somehow, then move it back to the headers on the receiving side.

    We do have an open JIRA to transfer an entire message over TCP, but doing the same with UDP garnered little interest; in addition, UDP has a very limited packet size.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware
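
For a byte[] payload like the one in the original question, one way to carry the correlation data inside the payload is to prepend a small fixed-length header to each chunk before sending and strip it off again on receipt. The sketch below is a minimal illustration in plain Java, not a Spring Integration API: `ChunkFrame`, its fields and the 24-byte layout are all hypothetical, and it assumes the correlation id is a UUID, as the log output in the first post suggests.

```java
import java.nio.ByteBuffer;
import java.util.UUID;

// Hypothetical wire format for illustration only:
// 16 bytes correlation UUID, 4 bytes sequence number,
// 4 bytes sequence size, then the chunk data.
final class ChunkFrame {

    static final int HEADER_BYTES = 16 + 4 + 4;

    final UUID correlationId;
    final int sequenceNumber;
    final int sequenceSize;
    final byte[] data;

    ChunkFrame(UUID correlationId, int sequenceNumber, int sequenceSize, byte[] data) {
        this.correlationId = correlationId;
        this.sequenceNumber = sequenceNumber;
        this.sequenceSize = sequenceSize;
        this.data = data;
    }

    // Sending side: builds the bytes that would go into one datagram.
    byte[] encode() {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_BYTES + data.length);
        buf.putLong(correlationId.getMostSignificantBits());
        buf.putLong(correlationId.getLeastSignificantBits());
        buf.putInt(sequenceNumber);
        buf.putInt(sequenceSize);
        buf.put(data);
        return buf.array();
    }

    // Receiving side: recovers the correlation data and the original chunk.
    static ChunkFrame decode(byte[] datagram) {
        if (datagram.length < HEADER_BYTES) {
            throw new IllegalArgumentException("datagram shorter than frame header");
        }
        ByteBuffer buf = ByteBuffer.wrap(datagram);
        long mostSignificant = buf.getLong();
        long leastSignificant = buf.getLong();
        int number = buf.getInt();
        int size = buf.getInt();
        byte[] chunk = new byte[buf.remaining()];
        buf.get(chunk);
        return new ChunkFrame(new UUID(mostSignificant, leastSignificant), number, size, chunk);
    }
}
```

On the receiving side the decoded fields would then be copied back into the message headers before the aggregator, and the chunk data restored as the payload. The 24 bytes of header count against the datagram size, so the chunk size used when splitting would need to be reduced accordingly.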

  3. #3

    Hi Gary

    Thanks for the reply. I was aware that TCP did not include header info; however, I got thrown a bit by the fact that there did seem to be header information coming through with the payload, i.e. [Headers={timestamp=1341443822722, id=8da5a0cf-53c0-4ee3-9335-9ca97cc878aa, ip_address=127.0.0.1, ip_hostname=localhost}]. I'm assuming then that this information is somehow being packaged with the payload in such a way that the wire-tap can split it out of the actual payload. Is there a preferred/recommended way of including the header details in the payload? Thanks again.

  4. #4

    It depends on the payload type; if it's a simple string, you could do something like:

    client:
    Code:
    	<int:transformer output-channel="foo" input-channel="bar"
    		expression="headers.correlationId + ':' + payload" />
    server:
    Code:
    	<int:chain input-channel="foo" output-channel="bar">
    		<int:header-enricher>
    			<int:correlation-id expression="payload.substring(0, payload.indexOf(':'))"/>
    		</int:header-enricher>
    		<int:transformer expression="payload.substring(payload.indexOf(':') + 1)"/>
    	</int:chain>
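
To make the string handling in those expressions concrete, here is a rough plain-Java equivalent of what they compute. `PrefixedPayload` and its method names are hypothetical and exist only for this illustration; the logic mirrors the concatenation and the `substring`/`indexOf` calls in the expressions above.

```java
// Hypothetical helper mirroring the SpEL expressions above; illustration only.
final class PrefixedPayload {

    private PrefixedPayload() {
    }

    // Client side: correlationId + ':' + payload
    static String wrap(String correlationId, String payload) {
        return correlationId + ':' + payload;
    }

    // Server side, header enricher: everything before the first ':'
    static String correlationId(String wire) {
        return wire.substring(0, wire.indexOf(':'));
    }

    // Server side, transformer: everything after the first ':'
    static String payload(String wire) {
        return wire.substring(wire.indexOf(':') + 1);
    }
}
```

Because `indexOf` finds the first colon, the payload itself may contain colons as long as the correlation id does not, which holds for a UUID string. If the received text contains no colon at all, `indexOf` returns -1 and the `substring(0, -1)` call throws, so a malformed datagram would surface as an exception instead of being silently accepted.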
