Hi, is there any way to send another message (different from the original) before TcpNetConnection closes the connection? I'm connecting to a Linux service and I need to send a message after receiving the response and before the connection is closed.
See http://static.springsource.org/sprin...p-interceptors
You can add an interceptor and intercept the onMessage() method and send the data either before or after calling super.onMessage() (which will send the reply to the sender).
When the intercepted method returns, the connection will be closed.
Gary P. Russell
Spring Integration Team
SpringSource, a division of VMware
Hi Gary, I implemented an interceptor factory class and also an interceptor with an onMessage method (based on AbstractTcpConnectionInterceptor):
public boolean onMessage(Message<?> message) {
    if (this.tcpListener == null) {
        throw new NoListenerException("No listener registered for message reception");
    }
    boolean b = this.tcpListener.onMessage(message);
    try {
        this.theConnection.send(MessageBuilder.withPayload("XXXXXX").build());
    } catch (Exception e) {
        log.debug(e);
        // TODO Auto-generated catch block
        throw new NoListenerException("ERROR");
    }
    return b;
}
XXXXXX is the message. The problem is that one of these sometimes appears in the log:
16:33:06,072|ERROR||org.springframework.integration.ip.tcp.TcpOutboundGateway|Cannot correlate response - no pending reply
or
16:33:06,072|DEBUG||com.booleancore.ws.ConexionInterceptor|java.net.SocketException: Broken pipe
I am using a tcp-connection-factory of type client and a tcp-outbound-gateway.
As I said, I need to send a message through the gateway (because I need the response), but I also need to send a second message (which gets no response) before the connection is closed.
Please help me, thanks!
I am not sure what's going on for you, but I just wrote this test case and all works as expected for me...
Code:
@Test
public void testInterceptSendAck() throws Exception {
    final int port = SocketUtils.findAvailableServerSocket();
    AbstractConnectionFactory ccf = new TcpNetClientConnectionFactory("localhost", port);
    final CountDownLatch latch = new CountDownLatch(1);
    final AtomicBoolean done = new AtomicBoolean();
    final AtomicReference<Object> ack = new AtomicReference<Object>();
    final CountDownLatch ackLatch = new CountDownLatch(1);
    Executors.newSingleThreadExecutor().execute(new Runnable() {
        public void run() {
            try {
                ServerSocket server = ServerSocketFactory.getDefault().createServerSocket(port, 100);
                latch.countDown();
                while (true) {
                    Socket socket = server.accept();
                    ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
                    ois.readObject();
                    ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
                    oos.writeObject("bar");
                    ois = new ObjectInputStream(socket.getInputStream());
                    ack.set(ois.readObject());
                    ackLatch.countDown();
                }
            }
            catch (Exception e) {
                if (!done.get()) {
                    e.printStackTrace();
                }
            }
        }
    });
    ccf.setSerializer(new DefaultSerializer());
    ccf.setDeserializer(new DefaultDeserializer());
    ccf.setSoTimeout(10000);
    ccf.setSingleUse(true);
    TcpConnectionInterceptorFactoryChain interceptorFactoryChain = new TcpConnectionInterceptorFactoryChain();
    interceptorFactoryChain.setInterceptors(new TcpConnectionInterceptorFactory[] {new TcpConnectionInterceptorFactory() {
        public TcpConnectionInterceptor getInterceptor() {
            return new AbstractTcpConnectionInterceptor() {
                @Override
                public boolean onMessage(Message<?> message) {
                    boolean b = super.onMessage(message);
                    try {
                        this.send(MessageBuilder.withPayload("baz").build());
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                    return b;
                }
            };
        }
    }});
    ccf.setInterceptorFactoryChain(interceptorFactoryChain);
    ccf.afterPropertiesSet();
    ccf.start();
    assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
    TcpOutboundGateway gateway = new TcpOutboundGateway();
    gateway.setConnectionFactory(ccf);
    QueueChannel replyChannel = new QueueChannel();
    gateway.setRequiresReply(true);
    gateway.setOutputChannel(replyChannel);
    gateway.afterPropertiesSet();
    gateway.handleMessage(MessageBuilder.withPayload("foo").build());
    assertTrue(ackLatch.await(10000, TimeUnit.MILLISECONDS));
    Message<?> reply = replyChannel.receive(10000);
    assertNotNull(reply);
    assertEquals("bar", reply.getPayload());
    assertEquals("baz", ack.get());
}
Here's what I see in the log...
Code:
2012-12-17 07:37:30,542 [main] DEBUG: org.springframework.integration.ip.tcp.connection.TcpNetConnection - Message sent [Payload=foo][Headers={timestamp=1355747850530, id=cd45d182-0858-4901-ab40-2a3063adee04}]
2012-12-17 07:37:30,546 [pool-2-thread-1] DEBUG: org.springframework.integration.ip.tcp.connection.TcpNetConnection - Message received [Payload=bar][Headers={timestamp=1355747850546, id=a08c45da-3817-4876-9975-6ffeb2f5cbf6, ip_tcp_remotePort=59336, ip_address=127.0.0.1, ip_hostname=localhost, ip_connectionId=localhost:59336:558c1d07-67f9-498c-9329-c3c1b3ec6135}]
2012-12-17 07:37:30,547 [main] DEBUG: org.springframework.integration.ip.tcp.TcpOutboundGateway - Respose [Payload=bar][Headers={timestamp=1355747850546, id=a08c45da-3817-4876-9975-6ffeb2f5cbf6, ip_tcp_remotePort=59336, ip_address=127.0.0.1, ip_hostname=localhost, ip_connectionId=localhost:59336:558c1d07-67f9-498c-9329-c3c1b3ec6135}]
2012-12-17 07:37:30,547 [pool-2-thread-1] DEBUG: org.springframework.integration.ip.tcp.connection.TcpNetConnection - Message sent [Payload=baz][Headers={timestamp=1355747850546, id=bb2047f3-13d3-414f-a2e8-25b974ee773c}]
2012-12-17 07:37:30,547 [pool-2-thread-1] DEBUG: org.springframework.integration.ip.tcp.connection.TcpNetConnection - Closing single use socket after inbound message localhost:59336:558c1d07-67f9-498c-9329-c3c1b3ec6135
I suggest you compare your log with this.
I would also suggest you subclass AbstractTcpConnectionInterceptor rather than "basing" your interceptor on it.
Thanks Gary, your example is perfect! But the weird thing is that, in both my testing and production, it works one time and fails the next, then works again, then fails again, and so on. I don't know the reason. When it fails, the log shows:
|ERROR||org.springframework.integration.ip.tcp.TcpOutboundGateway|Cannot correlate response - no pending reply
When it works, that error does not appear. The Linux service only returns values in response to a request; the other message, sent before the connection is closed, gets no response (which is correct). But I don't understand why it works one time and not the next.
That's weird; it implies a reply is being received before the request was sent out.
Can you post a DEBUG log showing a good and bad event?
The correct one is:
Code:
16:29:49,365|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory|Opening new socket connection to 192.168.1.2:1025
16:29:49,374|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetConnection|Reading...
16:29:49,375|DEBUG||com.mylib.ws.ByteToStringSerializer|Bytes disponibles para lectura:0
16:29:49,375|DEBUG||org.springframework.integration.ip.tcp.TcpOutboundGateway|Added 192.168.1.2:1025:5578e9c1-adb1-4651-9542-c81342439656
16:29:49,376|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetConnection|Message sent [Payload=XXXXX][Headers={timestamp=1355779789363, id=2703f41e-8542-4ebf-87f3-15b00d079485, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@2d0d719, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@2d0d719}]
16:29:49,919|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetConnection|Message received [Payload=[B@1bcbb330][Headers={timestamp=1355779789919, id=6622103b-53d4-4d73-9428-ebcf11ad4fb0, ip_tcp_remotePort=1025, ip_address=192.168.1.2, ip_hostname=192.168.1.2, ip_connectionId=192.168.1.2:1025:5578e9c1-adb1-4651-9542-c81342439656}]
16:29:49,919|DEBUG||com.mylib.ConexionInterceptor|INIT
16:29:49,919|DEBUG||com.mylib.ConexionInterceptor|BEFORE
16:29:49,920|DEBUG||com.mylib.ConexionInterceptor|ENTER
16:29:49,920|DEBUG||org.springframework.integration.ip.tcp.TcpOutboundGateway|Respose [Payload=[B@1bcbb330][Headers={timestamp=1355779789919, id=6622103b-53d4-4d73-9428-ebcf11ad4fb0, ip_tcp_remotePort=1025, ip_address=192.168.1.2, ip_hostname=192.168.1.2, ip_connectionId=192.168.1.2:1025:5578e9c1-adb1-4651-9542-c81342439656}]
16:29:49,921|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetConnection|Message sent [Payload=YYYY][Headers={timestamp=1355779789920, id=24ac770b-c398-46ad-9280-e2cee805ad24}]
16:29:49,921|DEBUG||org.springframework.integration.ip.tcp.TcpOutboundGateway|handler 'org.springframework.integration.ip.tcp.TcpOutboundGateway#0' sending reply Message: [Payload=[B@1bcbb330][Headers={timestamp=1355779789921, id=40fa7230-ce9d-4da2-9752-a8194f9c1ec6, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@2d0d719, ip_address=192.168.1.2, ip_tcp_remotePort=1025, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@2d0d719, ip_hostname=192.168.1.2, ip_connectionId=192.168.1.2:1025:5578e9c1-adb1-4651-9542-c81342439656}]
16:29:49,921|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetConnection|Closing single use socket after inbound message 192.168.1.2:1025:5578e9c1-adb1-4651-9542-c81342439656
16:29:49,921|DEBUG||org.springframework.beans.factory.support.DefaultListableBeanFactory|Returning cached instance of singleton bean 'integrationConversionService'
16:29:49,922|DEBUG||org.springframework.integration.channel.QueueChannel|preSend on channel 'reply', message: [Payload=RXXXXXX][Headers={timestamp=1355779789922, id=eb6d2237-59b8-460b-9bca-d363dc5d8846, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@2d0d719, ip_tcp_remotePort=1025, ip_address=192.168.1.2, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@2d0d719, ip_hostname=192.168.1.2, ip_connectionId=10.10.183.114:1025:5578e9c1-adb1-4651-9542-c81342439656}]
And the bad one is:
Code:
16:29:52,904|DEBUG||org.springframework.integration.channel.DirectChannel|preSend on channel 'input', message: [Payload=XXXXX][Headers={timestamp=1355779792904, id=e90e1f6d-910b-462d-b566-e5f72b15bc7f, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@3f731338, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@3f731338}]
16:29:52,904|DEBUG||org.springframework.integration.ip.tcp.TcpSendingMessageHandler|org.springframework.integration.ip.tcp.TcpSendingMessageHandler#0 received message: [Payload=XXXXX][Headers={timestamp=1355779792904, id=e90e1f6d-910b-462d-b566-e5f72b15bc7f, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@3f731338, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@3f731338}]
16:29:52,904|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory|Opening new socket connection to 192.168.1.2:1025
16:29:52,905|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetConnection|Reading...
16:29:52,905|DEBUG||org.springframework.integration.ip.tcp.TcpSendingMessageHandler|Got Connection 192.168.1.2:1025:b2a886c1-568f-48dd-b70d-209191d34bff
16:29:52,905|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetConnection|Message sent [Payload=XXXXX][Headers={timestamp=1355779792904, id=e90e1f6d-910b-462d-b566-e5f72b15bc7f, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@3f731338, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@3f731338}]
16:29:52,906|DEBUG||org.springframework.integration.channel.DirectChannel|postSend (sent=true) on channel 'input', message: [Payload=XXXXX][Headers={timestamp=1355779792904, id=e90e1f6d-910b-462d-b566-e5f72b15bc7f, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@3f731338, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@3f731338}]
16:29:52,952|TRACE||org.springframework.integration.channel.QueueChannel|postReceive on channel 'reply', message is null
16:29:52,952|DEBUG||org.springframework.integration.endpoint.PollingConsumer|Received no Message during the poll, returning 'false'
16:29:52,963|TRACE||org.springframework.integration.channel.QueueChannel|preReceive on channel 'reply'
16:29:53,049|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetConnection|Message received [Payload=[B@3179aaa6][Headers={timestamp=1355779793049, id=ab69790f-5854-433b-9729-e762b97fa48c, ip_tcp_remotePort=1025, ip_address=192.168.1.2, ip_hostname=192.168.1.2, ip_connectionId=192.168.1.2:1025:b2a886c1-568f-48dd-b70d-209191d34bff}]
16:29:53,049|DEBUG||com.mylib.ConexionInterceptor|INIT
16:29:53,049|DEBUG||com.mylib.ConexionInterceptor|BEFORE
16:29:53,049|ERROR||org.springframework.integration.ip.tcp.TcpOutboundGateway|Cannot correlate response - no pending reply
16:29:53,049|DEBUG||com.mylib.ConexionInterceptor|ENTER
16:29:53,050|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetConnection|Message sent [Payload=YYYY][Headers={timestamp=1355779793049, id=fe4e694c-06e1-44b8-aa22-718ffa249754}]
16:29:53,050|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetConnection|Closing single use socket after inbound message 192.168.1.2:1025:b2a886c1-568f-48dd-b70d-209191d34bff
16:29:53,908|TRACE||org.springframework.integration.core.MessagingTemplate|failed to receive message from channel 'org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@3f731338' within timeout: 1000
The interceptor class is a copy of AbstractTcpConnectionInterceptor:
Code:
public class ConexionInterceptor implements TcpConnectionInterceptor {
    protected final Logger log = Logger.getLogger(getClass());
    ....
    public boolean onMessage(Message<?> message) {
        log.debug("INIT");
        if (this.tcpListener == null) {
            throw new NoListenerException("No listener registered for message reception");
        }
        log.debug("BEFORE");
        boolean b = this.tcpListener.onMessage(message);
        try {
            log.debug("ENTER");
            this.theConnection.send(MessageBuilder.withPayload("YYYY").build());
        } catch (Exception e) {
            log.debug("ERROR");
            log.debug(e);
            // TODO Auto-generated catch block
            throw new NoListenerException("ERROR");
        }
        return b;
    }
The Linux service doesn't have any problems. In the bad case, the response is never returned.
Can you show your SI configuration? It looks like you have both an <ip:outbound-gateway/> and an <ip:outbound-channel-adapter/> subscribed to channel 'input'.
When more than one subscriber is subscribed to a direct channel, the framework round-robins the messages between the subscribers. Hence the first goes through the gateway, the second goes through the adapter. The gateway doesn't know anything about messages sent through the adapter (which is for one-way integration), so there is no way it can handle the reply...
Good...
Code:
16:29:49,375|DEBUG||org.springframework.integration.ip.tcp.TcpOutboundGateway|Added 192.168.1.2:1025:5578e9c1-adb1-4651-9542-c81342439656
Bad...
Code:
16:29:52,904|DEBUG||org.springframework.integration.channel.DirectChannel|preSend on channel 'input', message: [Payload=XXXXX][...
...
16:29:52,904|DEBUG||org.springframework.integration.ip.tcp.TcpSendingMessageHandler|org.springframework.integration.ip.tcp.TcpSendingMessageHandler#0 received message: [Payload=XXXXX][Headers=...
...
16:29:53,049|ERROR||org.springframework.integration.ip.tcp.TcpOutboundGateway|Cannot correlate response - no pending reply
The good/bad will alternate because the dispatcher alternates the requests between the gateway and the adapter.
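To make the alternation concrete, here is a minimal plain-Java sketch of the effect. It does not use Spring Integration at all: the class RoundRobinSketch, its methods, and the "conn-N" ids are hypothetical stand-ins, and the pending-reply set is only an assumption about the kind of bookkeeping behind the "Added ..." and "Cannot correlate response" log lines, not the framework's actual implementation.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RoundRobinSketch {

    static final String NO_PENDING = "Cannot correlate response - no pending reply";

    // Connections on which a reply is expected (hypothetical stand-in for the gateway's bookkeeping).
    private final Set<String> pendingReplies = new HashSet<>();

    // Request/reply path: remember the connection before sending.
    void sendViaGateway(String connectionId) {
        pendingReplies.add(connectionId);
    }

    // One-way path: the message is sent, but nothing is remembered.
    void sendViaAdapter(String connectionId) {
        // intentionally empty
    }

    // Called when the server's response arrives on the given connection.
    String onReply(String connectionId) {
        return pendingReplies.remove(connectionId) ? "reply delivered" : NO_PENDING;
    }

    // Alternates requests between the two subscribers, as a round-robin dispatcher would.
    static List<String> run(int requests) {
        RoundRobinSketch sketch = new RoundRobinSketch();
        List<String> outcomes = new ArrayList<>();
        for (int i = 0; i < requests; i++) {
            String connectionId = "conn-" + i; // single-use connection per request
            if (i % 2 == 0) {
                sketch.sendViaGateway(connectionId);
            } else {
                sketch.sendViaAdapter(connectionId);
            }
            outcomes.add(sketch.onReply(connectionId));
        }
        return outcomes;
    }

    public static void main(String[] args) {
        for (String outcome : run(4)) {
            System.out.println(outcome);
        }
    }
}
```

Every second request takes the one-way path, so its response has nothing to be matched against. With only the gateway subscribed to the channel, every request would register a pending reply first.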
Last edited by Gary Russell; Dec 18th, 2012 at 07:56 AM.
You're correct! Everything is OK now. Thank you very much, and congratulations on this excellent module.