I'm trying to work out how I can do custom message correlation on the client side of a tcp connection. Below is the listings for a simple echo test that has each component connected via tcp connections:
1) Echo Client
2) Interceptor (listening on port 41100)
3) Echo Server (listening on port 41200)
The main sends a "Help me" message to the Echo Client which then sends the message via tcp connection to the Interceptor which forwards the message onto the Echo Server using another tcp connection. The Echo Server returns the same message as its response. The Interceptor receives this message and when it tries to locate the connection to return the response to a connection cannot be found:
Unable to find outbound socket for [Payload=[B@40671416][Headers={timestamp=1326622013300, id=8e3a17a8-2b8f-40c8-a6bb-8f0d4df8eafa, ip_address=127.0.0.1, ip_connection_seq=1, ip_hostname=localhost, ip_tcp_remote_port=41200, ip_connection_id=localhost:41200:f5c2e15e-7c09-4802-965f-293ba7028194}]] []
I've read the manual and understand the requirement to supply my own correlation logic on the client side but I don't see how I can achieve this. Is there a way to link the interceptors server connection factory with the interceptors client connection factory such that a connection on the server side can send and receive to one connection on the client side?
test.xml
Echo Service Gateway (EchoService.java)Code:<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int-ip="http://www.springframework.org/schema/integration/ip" xmlns:int="http://www.springframework.org/schema/integration" xsi:schemaLocation= "http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/integration/ip http://www.springframework.org/schema/integration/ip/spring-integration-ip-2.0.xsd"> <!-- CLIENT --> <int:gateway id="echoClientService" service-interface="EchoService" default-request-channel="echoClientRequestChannel" default-reply-channel="echoClientResponseChannel"/> <int:channel id="echoClientRequestChannel"/> <int:channel id="echoClientResponseChannel"/> <int-ip:tcp-connection-factory id="echoClient" type="client" port="41100" host="localhost" using-nio="true" single-use="false"/> <int-ip:tcp-outbound-channel-adapter id="echoClientOutbound" channel="echoClientRequestChannel" connection-factory="echoClient"/> <int-ip:tcp-inbound-channel-adapter id="echoClientInbound" channel="echoClientResponseChannel" connection-factory="echoClient"/> <!-- INTERCEPTOR --> <int-ip:tcp-connection-factory id="interceptorServer" type="server" port="41100" using-nio="true" single-use="false"/> <int-ip:tcp-inbound-channel-adapter id="interceptorServerInbound" channel="interceptorServerRequestChannel" connection-factory="interceptorServer"/> <int-ip:tcp-outbound-channel-adapter id="interceptorServerOutbound" channel="interceptorServerReplyChannel" connection-factory="interceptorServer"/> <int:channel id="interceptorServerRequestChannel" /> <int:channel id="interceptorServerReplyChannel" /> <int:service-activator id="interceptorRequests" input-channel="interceptorServerRequestChannel" output-channel="interceptorClientRequestChannel"> <bean class="TestInterceptor"/> </int:service-activator> <int:service-activator id="interceptorResponses" input-channel="interceptorClientResponseChannel" output-channel="interceptorServerReplyChannel"> <bean class="TestInterceptor"/> </int:service-activator> <int:channel id="interceptorClientRequestChannel"/> <int:channel id="interceptorClientResponseChannel"/> <int-ip:tcp-connection-factory id="interceptorClient" type="client" port="41200" host="localhost" using-nio="true" single-use="false"/> <int-ip:tcp-outbound-channel-adapter id="interceptorClientOutbound" channel="interceptorClientRequestChannel" connection-factory="interceptorClient"/> <int-ip:tcp-inbound-channel-adapter id="interceptorClientInbound" channel="interceptorClientResponseChannel" connection-factory="interceptorClient"/> <!-- ECHO SERVER --> <int-ip:tcp-connection-factory id="echoServer" type="server" port="41200" using-nio="true" single-use="false"/> <int-ip:tcp-inbound-channel-adapter id="echoServerInbound" channel="echoServerRequestChannel" connection-factory="echoServer"/> <int-ip:tcp-outbound-channel-adapter id="echoServerOutbound" channel="echoServerReplyChannel" connection-factory="echoServer"/> <int:channel id="echoServerRequestChannel" /> <int:channel id="echoServerReplyChannel" /> <int:service-activator input-channel="echoServerRequestChannel" output-channel="echoServerReplyChannel"> <bean class="TestEchoService"/> </int:service-activator> <int:poller id="defaultPoller" max-messages-per-poll="-1" default="true" fixed-rate="1000"/> </beans>
Echo Service Implementation (TestEchoService.java)Code:public interface EchoService { public String echo(String request); }
Interceptor (TestInterceptor.java)Code:import org.springframework.integration.Message; public class TestEchoService { public Message<byte[]> echo(Message<byte[]> request) { byte[] message = request.getPayload(); System.out.println(new String(message)); return request; } }
Test Main (test.java)Code:import org.springframework.integration.Message; public class TestInterceptor { public Message<byte[]> intercept(Message<byte[]> request) { byte[] message = request.getPayload(); System.out.println("Intercepted Message: " + new String(message)); return request; } }
Code:import org.springframework.context.support.ClassPathXmlApplicationContext; public class Test { ClassPathXmlApplicationContext applicationContext; public Test() { applicationContext = new ClassPathXmlApplicationContext(new String[] {"test.xml"}); EchoService echoService = (EchoService)applicationContext.getBean("echoClientService"); String echo = echoService.echo("Help me"); System.out.println(echo); } public static void main(String[] args) { Test application = new Test(); } }


Reply With Quote
