
Thread: tcp-gateway, how to end tcp-communication

  1. #11
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,020

    Default

    @posta07

    Thanks for doing the research; my intention was that I only wanted to set SOLinger *if* a specific value had been set in the config (even 0). I intended to initialize the variable to -1 so the initialization code wouldn't set the option by default.
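For anyone following along, here is a minimal sketch of that intent using only `java.net.Socket`. It is not the actual factory code; the class name `SoLingerSketch` and both helper methods are hypothetical. A value of -1 means "not configured", so the option is left untouched; any value of 0 or more is applied as given.

```java
import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;

public class SoLingerSketch {

    /**
     * Hypothetical helper: apply SO_LINGER only when a value was configured.
     * A negative value (the -1 default) leaves the socket option alone.
     */
    static void applySoLinger(Socket socket, int soLinger) throws SocketException {
        if (soLinger >= 0) {
            socket.setSoLinger(true, soLinger);
        }
    }

    /**
     * Creates an unconnected socket, applies the setting, and reports what
     * getSoLinger() returns afterwards (-1 means the option is disabled).
     */
    static int lingerAfterApplying(int soLinger) {
        try (Socket socket = new Socket()) {
            applySoLinger(socket, soLinger);
            return socket.getSoLinger();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("not configured: " + lingerAfterApplying(-1));
        System.out.println("configured 0:   " + lingerAfterApplying(0));
    }
}
```

With linger enabled and a timeout of 0, closing the socket typically resets the connection (RST) rather than closing it in an orderly way (FIN), which is why the unconfigured case has to skip the call entirely.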

    Can you create a JIRA ticket here https://jira.springsource.org ??

    You'll have to create an account if you don't have one. I can enter the ticket but if you do it, you can track the progress.

    I will do my utmost to get the fix into 2.0.2 and, if you're comfortable with nightlies, you could get the fix even faster. In the meantime, I assume the workaround is ok for now?

    Again, many thanks for grinding this out.

    Gary

  2. #12
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,020

    Default

    FYI, the root cause was indeed fixed by setting the soLinger field in AbstractTcpConnectionFactory to -1. Of course, it still took the timeout before the FIN was sent, so that alone won't help you, but at least we're sending a FIN instead of a RST.

    However, when I unconditionally close the socket immediately after the send, a whole slew of the existing test cases fail - I need to get to the bottom of that and figure out if those use cases are legitimate. I may have to make your desired behavior an option (and suppress the error on the read, of course).

    BTW, because of the way NIO is processed, we get no error when the socket is "prematurely" closed because there's no thread hanging on a read. You may want to consider using NIO to suppress the error while you use the interceptor workaround.

    Here are my test cases for the FIN processing; they work when we don't set SOLinger...

    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xmlns:int="http://www.springframework.org/schema/integration"
    	xmlns:int-ip="http://www.springframework.org/schema/integration/ip"
    	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    		http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
    		http://www.springframework.org/schema/integration/ip http://www.springframework.org/schema/integration/ip/spring-integration-ip-2.0.xsd">
    
    	<bean id="tcpIpUtils" class="org.springframework.integration.ip.util.SocketTestUtils" />
    	
    	<int-ip:tcp-connection-factory id="inCFNet"
    		type="server"
    		port="#{tcpIpUtils.findAvailableServerSocket(9000)}"
    		so-timeout="1000"
    		single-use="true"
    		/>
    	
    	<int-ip:tcp-inbound-gateway request-channel="echo"
    		connection-factory="inCFNet" />
    	
    	<int-ip:tcp-connection-factory id="inCFNio"
    		type="server"
    		port="#{tcpIpUtils.findAvailableServerSocket(9100)}"
    		so-timeout="1000"
    		single-use="true"
    		using-nio="true"
    		/>
    	
    	<int-ip:tcp-inbound-gateway request-channel="echo"
    		connection-factory="inCFNio" />
    	
    	<int:service-activator input-channel="echo" ref="testService"/>
    	
    	<bean id="testService" class="org.springframework.integration.ip.tcp.TestService"/>
    
    </beans>
    Code:
    package org.springframework.integration.ip.tcp.connection;
    
    import static org.junit.Assert.assertEquals;
    import static org.junit.Assert.fail;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.Socket;
    
    import javax.net.SocketFactory;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    
    /**
     * @author Gary Russell
     * @since 2.0.2
     *
     */
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration
    public class SOLingerTests {
    
    	@Autowired
    	private AbstractServerConnectionFactory inCFNet;
    	
    	@Autowired
    	private AbstractServerConnectionFactory inCFNio;
    	
    	@Test
    	public void configOk() {}
    
    	@Test
    	public void finReceivedNet() {
    		finReceived(inCFNet);
    	}
    
    	@Test
    	public void finReceivedNio() {
    		finReceived(inCFNio);
    	}
    	
    	private void finReceived(AbstractServerConnectionFactory inCF) {
    		int port = inCF.getPort();
    		int n = 0;
    		while (!inCF.isListening()) {
    			try {
    				Thread.sleep(100);
    			} catch (InterruptedException e) {
    				Thread.currentThread().interrupt();
    				fail("Interrupted");
    			}
    			if (n++ > 100) {
    				fail("Failed to start");
    			}
    		}
    		try {
    			Socket socket = SocketFactory.getDefault().createSocket("localhost", port);
    			String test = "Test\r\n";
    			socket.getOutputStream().write(test.getBytes());
    			byte[] buff = new byte[test.length() + 5];
    			readFully(socket.getInputStream(), buff);
    			assertEquals("echo:" + test, new String(buff));
    			n = socket.getInputStream().read();
    			// we expect an orderly close
    			assertEquals(-1, n);
    		} catch (Exception e) {
    			e.printStackTrace();
    			fail("Unexpected Exception  " + e.getMessage());
    		}
    	
    	}
    	
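    	private void readFully(InputStream is, byte[] buff) throws IOException {
    		for (int i = 0; i < buff.length; i++) {
    			int b = is.read();
    			// read() returns -1 at end of stream; fail rather than store it as data
    			if (b < 0) {
    				throw new IOException("Unexpected end of stream after " + i + " bytes");
    			}
    			buff[i] = (byte) b;
    		}
    	}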
    	
    }

  3. #13
    Join Date
    Aug 2008
    Location
    Phoenix, AZ
    Posts
    76

    Default

    Yep, using NIO does not produce the error messages. This is more desirable, so I will go with that for now.

    Thanks again for your help and suggestions.

    BTW, this is off topic as it's not directly related to my problem, but I had a question about the interceptors.

    I was looking at the code, and I'm not clear about how multiple interceptors would get run.

    From AbstractConnectionFactory, in the wrapConnection method it looks like you loop through the interceptors, assign the interceptor as a listener of a connection, then replace the connection object with the interceptor. From there the loop continues, with registerListener now being called on the previous wrapper (which sets the original connection's listener, and so forth). What I noticed is that a connection can have only one listener. So if the wrapper keeps overwriting the connection's listener, won't there be only one interceptor associated with the connection? I'm probably missing something.

    Thanks!
    Enterprise Software Consultant
    http://www.christianposta.com/blog

  4. #14
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,020

    Default

    No; the interceptor only becomes the 'listener' in wrapConnection if there is no real listener for this connection (it is a send-only connection).

    For connections that can receive messages, we register a listener with the connection - this is usually a receiving channel adapter, or a gateway.

    We can also register a sender with a connection, this is the object that sends messages (either an outbound adapter or a gateway).

    Now, TcpConnectionInterceptor extends TcpConnection, TcpListener, and TcpSender.

    In the wrapConnection method, for each interceptor in the chain, the interceptor (variable 'wrapper') is given a reference to the connection.

    In the case of a gateway, there is always a listener and a sender. So, for each interceptor in the chain, the interceptor is given a reference to the connection and this interceptor becomes the new "connection"; if there are two interceptors, the first gets a reference to the real connection, the second gets a reference to the first interceptor. We finally return the second interceptor as "the" connection.

    Finally, in initializeConnection(), we register the listener (gateway) with "the" connection (which is our second interceptor in this case).

    registerListener() in AbstractTcpConnectionInterceptor looks like this...

    Code:
    	public void registerListener(TcpListener listener) {
    		this.theConnection.registerListener(this);
    		this.tcpListener = listener;
    	}
    This effectively chains the interceptors - interceptor 2's listener is the gateway, interceptor 1's listener is interceptor 2, the real connection's listener is interceptor 1. Now, when a new message is assembled, the message is passed up through the interceptor chain.
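To make the chaining concrete, here is a small self-contained sketch. It does not use the real Spring Integration types: `Listener`, `Connection`, `RealConnection` and `Interceptor` are simplified, hypothetical stand-ins, and only the body of `registerListener` mirrors the snippet above.

```java
import java.util.ArrayList;
import java.util.List;

public class InterceptorChainSketch {

    /** Hypothetical stand-in for a listener that receives assembled messages. */
    interface Listener {
        void onMessage(String message);
    }

    /** Hypothetical stand-in for a connection; it holds exactly one listener. */
    interface Connection {
        void registerListener(Listener listener);
    }

    static class RealConnection implements Connection {
        private Listener listener;

        @Override
        public void registerListener(Listener listener) {
            this.listener = listener;
        }

        /** Simulates a message having been assembled from the socket. */
        void deliver(String message) {
            listener.onMessage(message);
        }
    }

    /** An interceptor is both a connection (to whatever wraps it) and a listener. */
    static class Interceptor implements Connection, Listener {
        private final String name;
        private final Connection theConnection;
        private final List<String> trace;
        private Listener listener;

        Interceptor(String name, Connection theConnection, List<String> trace) {
            this.name = name;
            this.theConnection = theConnection;
            this.trace = trace;
        }

        @Override
        public void registerListener(Listener listener) {
            // Same shape as the snippet above: register ourselves one level
            // down, and remember the listener that sits one level up.
            this.theConnection.registerListener(this);
            this.listener = listener;
        }

        @Override
        public void onMessage(String message) {
            trace.add(name);
            listener.onMessage(message);
        }
    }

    /** Wraps a real connection in two interceptors and records the delivery path. */
    static List<String> run() {
        List<String> trace = new ArrayList<>();
        RealConnection real = new RealConnection();
        Connection connection = real;
        for (String name : new String[] {"interceptor1", "interceptor2"}) {
            connection = new Interceptor(name, connection, trace);
        }
        // The "gateway" registers with the outermost wrapper only.
        connection.registerListener(message -> trace.add("gateway:" + message));
        real.deliver("Test");
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Each connection still has exactly one listener; a single `registerListener` call on the outermost wrapper cascades down, so nothing gets overwritten.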

    If you want to see it in action, check out the source from git and take a look at InterceptedSharedConnectionTests where I nested two hello world interceptors. These interceptors do some handshaking during the initial send.

    Code:
    	<bean id="helloWorldInterceptors" class="org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactoryChain">
    		<property name="interceptors">
    			<array>
    				<bean class="org.springframework.integration.ip.tcp.connection.HelloWorldInterceptorFactory"/>
    				<bean class="org.springframework.integration.ip.tcp.connection.HelloWorldInterceptorFactory">
    					<constructor-arg value="Hi"/>
    					<constructor-arg value="planet!"/>
    				</bean>
    			</array>
    		</property>
    	</bean>
    On the client side, when we send a message, the second (outer) interceptor sees that negotiation is needed, so it holds on to the original message and sends 'Hi'. That is intercepted by the inner interceptor, which also needs to negotiate, so it holds on to the 'Hi' and sends 'Hello'. On the server side, the inner interceptor sees the 'Hello' and sends 'world!', which is received by the inner client interceptor; because its negotiation is now complete, it sends on the 'Hi'. On the server side, the inner interceptor is fully negotiated, so it passes the 'Hi' up the chain; the outer interceptor completes its negotiation and, eventually, the original payload is sent. From that point on, both interceptors are pass-throughs.

    Code:
    2010-12-22 11:29:06,564 DEBUG [main] DirectChannel: preSend on channel 'input', message: [Payload=Test][Headers={timestamp=1293035346564, id=bd9cb8e6-39b6-4486-a1ae-132dba8840f2}]
    ...
    2010-12-22 11:29:06,576 DEBUG [main] HelloWorldInterceptor: Sending Hi
    2010-12-22 11:29:06,576 DEBUG [main] HelloWorldInterceptor: Sending Hello
    ...
    2010-12-22 11:29:06,585 DEBUG [pool-1-thread-3] HelloWorldInterceptor: sending world!
    ...
    2010-12-22 11:29:06,587 DEBUG [pool-2-thread-3] HelloWorldInterceptor: received world!
    ...
    2010-12-22 11:29:06,619 DEBUG [pool-1-thread-2] HelloWorldInterceptor: sending planet!
    ...
    2010-12-22 11:29:06,661 DEBUG [pool-2-thread-5] HelloWorldInterceptor: received planet!
    ...
    2010-12-22 11:29:06,662 DEBUG [main] DirectChannel: postSend (sent=true) on channel 'input', message: [Payload=Test][Headers={timestamp=1293035346564, id=bd9cb8e6-39b6-4486-a1ae-132dba8840f2}]
    
    ...
    
    2010-12-22 11:29:06,702 DEBUG [main] QueueChannel: postReceive on channel 'replies', message: [Payload=Test][Headers={timestamp=1293035346702, id=cad2f313-16ba-48ee-9173-beef6dce86c5, ip_address=127.0.0.1, ip_connection_seq=3, ip_hostname=localhost, ip_tcp_remote_port=10100, ip_connection_id=localhost:10100:1022146175}]
    BTW, the interceptor chain was conceived for doing handshaking/negotiation such as this, but we did foresee that it might be used for other things, so I am pleased we were able to use it as a work-around for your problem.
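The client side of the handshake described above can be sketched in a few lines. This is not the HelloWorldInterceptor source: `Sender` and `NegotiatingSender` are hypothetical, the wire is just an in-memory list, the server's replies are fed in by hand, and only a single held message is supported.

```java
import java.util.ArrayList;
import java.util.List;

public class NegotiationSketch {

    /** Hypothetical stand-in for anything that can send toward the wire. */
    interface Sender {
        void send(String message);
    }

    /** Holds the first message until the expected reply to its greeting arrives. */
    static class NegotiatingSender implements Sender {
        private final String greeting;
        private final String expectedReply;
        private final Sender next;
        private boolean negotiated;
        private String pending;

        NegotiatingSender(String greeting, String expectedReply, Sender next) {
            this.greeting = greeting;
            this.expectedReply = expectedReply;
            this.next = next;
        }

        @Override
        public void send(String message) {
            if (negotiated) {
                next.send(message);   // pass-through once negotiated
            } else {
                pending = message;    // hold the message and greet first
                next.send(greeting);
            }
        }

        /** Called when the peer's reply reaches this level of the chain. */
        void onReply(String reply) {
            if (!negotiated && expectedReply.equals(reply)) {
                negotiated = true;
                next.send(pending);   // release the held message
                pending = null;
            }
        }
    }

    /** Returns what the client puts on the wire, in order. */
    static List<String> run() {
        List<String> wire = new ArrayList<>();
        NegotiatingSender inner = new NegotiatingSender("Hello", "world!", wire::add);
        NegotiatingSender outer = new NegotiatingSender("Hi", "planet!", inner);
        outer.send("Test");        // outer holds Test, inner holds Hi, Hello goes out
        inner.onReply("world!");   // inner is negotiated and releases Hi
        outer.onReply("planet!");  // outer is negotiated and releases Test
        return wire;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The order on the wire (Hello, Hi, then the payload) lines up with the "Sending Hi" / "Sending Hello" / "received world!" / "received planet!" sequence in the log above.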

  5. #15
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,020

    Default

    Hi Christian,

    I managed to get the fix into tonight's nightly build.

    It would be great if you have an opportunity to test it in your environment before we release 2.0.2.

    If you have time, you can change your pom to pull down Spring Integration 2.0.2.BUILD-SNAPSHOT.


    Thanks again for finding this and your help with tracking it down.

    Gary
