Page 1 of 2 12 LastLast
Results 1 to 10 of 14

Thread: TCP receive message to different port than sent on

  1. #1
    Join Date
    Jan 2013
    Posts
    17

    Question TCP receive message to different port than sent on

    I have an application in which I need to get a person's credit report by sending demographic information to a remote server and receiving the report from that server on a different port. For example, I'd send a message containing the demographic info to port 9300 on the remote server and receive the credit report message on port 9305 of my local machine.

    I tried this configuration :

    Code:
    <!-- CLIENT SIDE: request leg, from the gateway to the outbound TCP adapter -->

    <!-- Serializer and helper beans referenced by the request leg -->
    <bean id="rawSerializer"
          class="org.springframework.integration.ip.tcp.serializer.ByteArrayRawSerializer"/>
    <bean id="isRequestUtils" class="com.util.ISRequestUtils"/>

    <!-- String-typed channels linking the endpoints below -->
    <int:channel id="initialRequest" datatype="java.lang.String"/>
    <int:channel id="txAdded" datatype="java.lang.String"/>
    <int:channel id="finalRequest" datatype="java.lang.String"/>
    <int:channel id="creditReport" datatype="java.lang.String"/>

    <!-- Entry point: requests go to initialRequest, replies are expected on creditReport -->
    <int:gateway id="requestHandler"
                 service-interface="com.util.ISRequestHandler2"
                 default-request-channel="initialRequest"
                 default-reply-channel="creditReport"/>

    <!-- Step 1: isRequestUtils.addTX, initialRequest to txAdded -->
    <int:transformer id="addTXcontrol"
                     input-channel="initialRequest"
                     output-channel="txAdded"
                     ref="isRequestUtils"
                     method="addTX"/>

    <!-- Step 2: isRequestUtils.addPrefix, txAdded to finalRequest -->
    <int:transformer id="addPrefix"
                     input-channel="txAdded"
                     output-channel="finalRequest"
                     ref="isRequestUtils"
                     method="addPrefix"/>

    <!-- Step 3: write the finished request to the remote server over requestCF -->
    <int-ip:tcp-outbound-channel-adapter id="message"
                                         channel="finalRequest"
                                         connection-factory="requestCF"/>

    <!-- Client connection to the remote request port; host and port come from property placeholders -->
    <int-ip:tcp-connection-factory id="requestCF"
                                   type="client"
                                   host="${infoserver.address}"
                                   port="${infoserver.request.port}"
                                   so-keep-alive="true"
                                   single-use="false"
                                   serializer="rawSerializer"/>
    
    
    	<!-- SERVER SIDE -->
    	
    	<bean id="byteArrayStxEtxSerializer" class="org.springframework.integration.ip.tcp.serializer.ByteArrayStxEtxSerializer">
    		<property name="maxMessageSize" value="102400"/>
    	</bean>
    	<bean id="isResponseUtils" class="com.util.ISResponseUtils"/>
    	
    	<int:channel
    		id="byteResponse"/>
    		
    	<int:channel
    		id="stringResponse"
    		datatype="java.lang.String"/>
    		
    	<int:channel
    		id="finalResponse"
    		datatype="java.lang.String"/>		
    				
    	<int-ip:tcp-connection-factory
    		id="isListener"
    		type="server"
    		host="localhost"
    		port="${infoserver.response.port}"
    		so-keep-alive="true"
    		single-use="false"
    		deserializer="byteArrayStxEtxSerializer"/>
    		
    	<int-ip:tcp-inbound-channel-adapter
    		id="inboundResponse"
    		channel="byteResponse"
    		connection-factory="isListener"/>
    
    	<int:transformer
    		id="bytes2String"
    		input-channel="byteResponse"
    		output-channel="stringResponse"
    		expression="new String(payload)"/>
    				
    	<int:service-activator
    		input-channel="stringResponse"
    		output-channel="creditReport"
    		ref="isResponseUtils"
    		method="showResponse"/>
    My main:

    Code:
    // Load the Spring Integration flow from the classpath configuration.
    ApplicationContext ctx = new ClassPathXmlApplicationContext("/spring-config/si-config.xml");

    // Look up the gateway proxy by its service interface.
    ISRequestHandler2 gateway = ctx.getBean(ISRequestHandler2.class);
    //final AbstractServerConnectionFactory isListener = context.getBean(AbstractServerConnectionFactory.class);

    // Send the request and print whatever the gateway returns.
    System.out.println(gateway.send("DEMOGRAPHIC_INFO"));

  2. #2
    Join Date
    Jan 2013
    Posts
    17

    Default

    This is the error I get:

    Code:
    [02-07-2013 11:28:09,760] ERROR TcpNetConnection:155 Exception sending meeeage: [Payload=[B@69d02b][Headers={timestamp=1360254485026, id=3933c780-3969-4030-8856-8eaf143cf25a, ip_tcp_remotePort=2381, ip_address=10.127.211.92, ip_hostname=VMCAP2ISDEVMED, ip_connectionId=VMCAP2ISDEVMED:2381:ba447e3a-2a1b-4fe0-a732-c93323be6232}]
    org.springframework.integration.support.channel.ChannelResolutionException: no output-channel or replyChannel header available
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:206)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:165)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:159)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:141)
    	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:115)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:102)
    	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
    	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
    	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:216)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:200)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:165)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:159)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:141)
    	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:115)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:102)
    	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
    	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
    	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:216)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:200)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:165)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:159)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:141)
    	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:115)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:102)
    	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
    	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
    	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
    	at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:92)
    	at org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter.onMessage(TcpReceivingChannelAdapter.java:74)
    	at org.springframework.integration.ip.tcp.connection.TcpNetConnection.run(TcpNetConnection.java:144)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    	at java.lang.Thread.run(Unknown Source)

  3. #3
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,022

    Default

    Right - if you want to use a gateway, with asynchronous TCP (or any) adapters, you need to preserve/restore the replyChannel header that the gateway saves in the outbound message headers. The sending thread is waiting for a response on this channel (which is a temporary channel created by the gateway).

    You can either do this with custom code (invoke a service to capture the header on the outbound side and use a <header-enricher /> to restore it on the inbound side).

    The tcp-client-server-multiplex sample (https://github.com/SpringSource/spri...rver-multiplex) uses another technique - sends a copy of the outbound message to an aggregator and sends the reply to the aggregator; the headers from each are merged (thus restoring the replyChannel), and then the request message is dropped from the aggregated result.

    Or, instead of using a request/reply gateway, you can do it all yourself - with the sending bean sending the request to a gateway method that returns void and exposing a method on that bean that is invoked using a <service-activator/>.

    Hope that helps.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  4. #4
    Join Date
    Jan 2013
    Posts
    17

    Default

    But if I specify the default-reply-channel of the gateway to be the output-channel of the service-activator, shouldn't the framework do that for me?

  5. #5
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,022

    Default

    No; it doesn't work that way - the gateway needs some way to correlate responses to requests - there might be many threads waiting for a response and the gateway needs to know which thread gets a particular response. The way it's done is to create a TemporaryChannel for each request and store it in the replyChannel header.

    The reply-channel on the gateway is simply a mechanism to allow you to explicitly send there from some component (or if you wish it to be a special channel type - e.g. <publish-subscribe/> so the reply can go to more than one destination), or if you want to, say, <wire-tap/> the reply-channel.

    Internally, the gateway bridges messages arriving on the reply-channel to the replyChannel header.

    Since the replyChannel is a LIVE java object - it can't be exchanged with external systems - the thread is waiting to receive the reply on it.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  6. #6
    Join Date
    Jan 2013
    Posts
    17

    Default

    I think I understand.

    Using my config file, to accomplish what I want, I need to set the replyChannel of messages dropped onto the creditReport channel to the replyChannel of the messages dropped onto the finalRequest channel, and then have messages dropped onto the creditReport channel go to a tcp-inbound-channel-adapter connected to the requestCF connection factory?

    Can this be done in just the config file?

  7. #7
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,022

    Default

    You need something in the data to correlate the response to the request - you may be able to do it without any code - the tcp-client-server-multiplex sample I mentioned in post #3 uses an aggregator - but it uses a simple correlation scheme. It depends on how easy it is to extract the correlation data.

    For your case, you might find it easier to do the correlation in your code that handles the reply - set up another method and invoke it as a <service-activator/>
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  8. #8
    Join Date
    Jan 2013
    Posts
    17

    Default

    I did look at the tcp multiplex example. But if the correlation data is in different places in the request and response payloads, I don't know if that will work. Also, the method I use to send the request is synchronized, would there be a need to correlate the response to the request?

    I'm using (or want to use) something like:
    Code:
    /**
     * Looks up the demographic info for the given person, sends it through the
     * "requestHandler" bean, and returns whatever that bean's send method returns.
     */
    public synchronized String getCreditReport(String personID) {
    	final String demoInfo = lookupDemoInfo(personID);

    	// Called for its side effect only; the returned instance is not used here.
    	SIContextManager.getInstance();
    	final RequestHandler gateway = (RequestHandler) SIContextManager.getBean("requestHandler");

    	return gateway.send(demoInfo);
    }

  9. #9
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,022

    Default

    You could do it that way, but you'd need a transformer/header-enricher on each leg to populate a header with the correlation data.

    "...would there be a need to correlate the response to the request?"
    Synchronizing won't help; the framework needs some way to figure out where to send the reply.

    For your application, I would recommend doing something like this...

    Code:
    /**
     * Sends a request through a void-returning gateway and blocks the caller
     * until {@link #handleReply(String)} delivers the matching reply or the
     * timeout elapses. Requests and replies are matched by person ID through
     * the {@code pendingReplies} map.
     */
    public class Foo {
    
    	/** How long getCreditReport waits for a reply before giving up. */
    	private static final long REPLY_TIMEOUT_SECONDS = 10;
    
    	private final Log logger = LogFactory.getLog(this.getClass().getName());
    
    	/** One queue per in-flight request, keyed by person ID. */
    	private final Map<String, BlockingQueue<String>> pendingReplies = new ConcurrentHashMap<String, BlockingQueue<String>>();
    
    	@Autowired
    	private RequestHandler requestHandler;
    
    	/**
    	 * Sends the request for the given person and waits for its reply.
    	 *
    	 * @param personID key used to match the reply to this request
    	 * @return the reply handed to {@link #handleReply(String)} for this person
    	 * @throws RuntimeException if no reply arrives within the timeout or the
    	 *         waiting thread is interrupted
    	 */
    	public String getCreditReport(String personID) {
    		BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
    		// Register before sending so a fast reply cannot arrive unmatched.
    		this.pendingReplies.put(personID, queue);
    		String reply = null;
    		try {
    			this.requestHandler.send(personID);
    			reply = queue.poll(REPLY_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    		}
    		catch (InterruptedException e) {
    			// Restore the flag; the null reply is reported below.
    			Thread.currentThread().interrupt();
    		}
    		finally {
    			// Always clean up, including when send() throws, so entries do not leak.
    			this.pendingReplies.remove(personID);
    		}
    		if (reply == null) {
    			throw new RuntimeException("No reply from credit service");
    		}
    		return reply;
    	}
    
    	/**
    	 * Delivers a reply to the caller waiting for it, if there still is one.
    	 *
    	 * @param reply the reply text; its person ID is obtained via lookup(reply)
    	 */
    	public void handleReply(String reply) {
    		String personID = lookup(reply);
    		// Claim the waiter in one step; null means nobody is waiting any more.
    		BlockingQueue<String> queue = this.pendingReplies.remove(personID);
    		if (queue == null) {
    			logger.error("Reply too late:" + reply);
    			return;
    		}
    		// The queue is unbounded, so offer() always succeeds and, unlike put(),
    		// cannot drop the reply when this thread is already interrupted.
    		queue.offer(reply);
    	}
    
    	/**
    	 * Placeholder: currently returns the reply unchanged. Replace with logic
    	 * that extracts the person ID from the reply.
    	 */
    	private String lookup(String reply) {
    		// find personID in reply
    		return reply;
    	}
    
    	/** Gateway interface; send returns nothing because the reply arrives via handleReply. */
    	public interface RequestHandler {
    		void send(String personID);
    	}
    }
    Code:
    	<!-- Enables annotation processing so the @Autowired gateway field in foo.Foo is injected -->
    	<context:annotation-config />
    
    	<bean id="client" class="foo.Foo" />
    
    	<!-- Proxy for the nested RequestHandler interface; requests go to channel "foo" -->
    	<int:gateway id="requestHandler" service-interface="foo.Foo$RequestHandler"
    		default-request-channel="foo"/>
    
    	<int:channel id="foo" />
    ...
    
    
    
    <!-- INBOUND -->
    
    ...
    	<!-- Replies arriving on "bar" are handed to Foo.handleReply -->
    	<int:service-activator input-channel="bar" 
    		ref="client" method="handleReply"/>
    Last edited by Gary Russell; Feb 11th, 2013 at 04:21 PM.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  10. #10
    Join Date
    Jan 2013
    Posts
    17

    Default

    Thanks for the suggestion. I tried doing something like that, but it wasn't working.

    So I went back to the tcp-multiplex example. I'm able to set the correlation_id in the headers for both the request and response, but am unsure how to tell the aggregator to match by that.

    I have this config:
    Code:
    <!-- CLIENT SIDE -->
    ...
    <!-- Set the correlation id header on the request using isRequestUtils.getCorrelationID -->
    <int:header-enricher id="addRequestCorrID"
                         input-channel="prefixAdded"
                         output-channel="finalRequest">
        <int:correlation-id ref="isRequestUtils" method="getCorrelationID"/>
    </int:header-enricher>

    <!-- Second subscriber on finalRequest (order 2): write the request to the remote server -->
    <int-ip:tcp-outbound-channel-adapter id="message"
                                         order="2"
                                         channel="finalRequest"
                                         connection-factory="requestCF"/>

    <!-- Client connection to the remote request port -->
    <int-ip:tcp-connection-factory id="requestCF"
                                   type="client"
                                   host="${infoserver.address}"
                                   port="${infoserver.request.port}"
                                   so-keep-alive="true"
                                   single-use="false"
                                   serializer="rawSerializer"/>

    <!-- First subscriber on finalRequest (order 1): forward a copy of the request to the aggregator -->
    <int:bridge order="1"
                input-channel="finalRequest"
                output-channel="toAggregator"/>

    <!-- Anything received back on the requestCF connection also goes to the aggregator -->
    <int-ip:tcp-inbound-channel-adapter id="fromServerSide"
                                        channel="toAggregator"
                                        connection-factory="requestCF"/>

    <int:channel id="toAggregator" datatype="java.lang.String"/>
    		
    	<!--
    	    Group messages by the correlationId header (the header written by the
    	    int:correlation-id enrichers) and release once two messages have arrived.
    	    The SpEL root object is the Message, so the header map is "headers",
    	    and the header name is case-sensitive.
    	-->
    	<int:aggregator
    		input-channel="toAggregator"
    		correlation-strategy-expression="headers['correlationId']"
    		release-strategy-expression="size() == 2"/>
    
    <!-- SERVER SIDE -->
    ...
    <!-- Convert the payload to a String -->
    <int:transformer id="bytes2String"
                     input-channel="byteResponse"
                     output-channel="stringResponse"
                     expression="new String(payload)"/>

    <!-- Set the correlation id header on the response using isResponseUtils.getCorrelationID -->
    <int:header-enricher id="addResponseCorrID"
                         input-channel="stringResponse"
                         output-channel="creditReport">
        <int:correlation-id ref="isResponseUtils" method="getCorrelationID"/>
    </int:header-enricher>

    <!-- First subscriber on creditReport (order 1): forward the response to the aggregator -->
    <int:bridge order="1"
                input-channel="creditReport"
                output-channel="toAggregator"/>

    <!-- Second subscriber on creditReport (order 2): isResponseUtils.persistResponse -->
    <int:service-activator order="2"
                           input-channel="creditReport"
                           ref="isResponseUtils"
                           method="persistResponse"/>

    <int:channel id="creditReport" datatype="java.lang.String"/>

Tags for this Thread

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •