I am not sure what's going on for you, but I just wrote this test case and all works as expected for me...
Code:
@Test
public void testInterceptSendAck() throws Exception {
final int port = SocketUtils.findAvailableServerSocket();
AbstractConnectionFactory ccf = new TcpNetClientConnectionFactory("localhost", port);
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean done = new AtomicBoolean();
final AtomicReference<Object> ack = new AtomicReference<Object>();
final CountDownLatch ackLatch = new CountDownLatch(1);
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
try {
ServerSocket server = ServerSocketFactory.getDefault().createServerSocket(port, 100);
latch.countDown();
while (true) {
Socket socket = server.accept();
ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
ois.readObject();
ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
oos.writeObject("bar");
ois = new ObjectInputStream(socket.getInputStream());
ack.set(ois.readObject());
ackLatch.countDown();
}
} catch (Exception e) {
if (!done.get()) {
e.printStackTrace();
}
}
}
});
ccf.setSerializer(new DefaultSerializer());
ccf.setDeserializer(new DefaultDeserializer());
ccf.setSoTimeout(10000);
ccf.setSingleUse(true);
TcpConnectionInterceptorFactoryChain interceptorFactoryChain = new TcpConnectionInterceptorFactoryChain();
interceptorFactoryChain.setInterceptors(new TcpConnectionInterceptorFactory[] {new TcpConnectionInterceptorFactory() {
public TcpConnectionInterceptor getInterceptor() {
return new AbstractTcpConnectionInterceptor() {
@Override
public boolean onMessage(Message<?> message) {
boolean b = super.onMessage(message);
try {
this.send(MessageBuilder.withPayload("baz").build());
}
catch (Exception e) {
e.printStackTrace();
}
return b;
}
};
}
}});
ccf.setInterceptorFactoryChain(interceptorFactoryChain );
ccf.afterPropertiesSet();
ccf.start();
assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
TcpOutboundGateway gateway = new TcpOutboundGateway();
gateway.setConnectionFactory(ccf);
QueueChannel replyChannel = new QueueChannel();
gateway.setRequiresReply(true);
gateway.setOutputChannel(replyChannel);
gateway.afterPropertiesSet();
gateway.handleMessage(MessageBuilder.withPayload("foo").build());
assertTrue(ackLatch.await(10000, TimeUnit.MILLISECONDS));
Message<?> reply = replyChannel.receive(10000);
assertNotNull(reply);
assertEquals("bar", reply.getPayload());
assertEquals("baz", ack.get());
}
Here's what I see in the log...
Code:
2012-12-17 07:37:30,542 [main] DEBUG: org.springframework.integration.ip.tcp.connection.TcpNetConnection - Message sent [Payload=foo][Headers={timestamp=1355747850530, id=cd45d182-0858-4901-ab40-2a3063adee04}]
2012-12-17 07:37:30,546 [pool-2-thread-1] DEBUG: org.springframework.integration.ip.tcp.connection.TcpNetConnection - Message received [Payload=bar][Headers={timestamp=1355747850546, id=a08c45da-3817-4876-9975-6ffeb2f5cbf6, ip_tcp_remotePort=59336, ip_address=127.0.0.1, ip_hostname=localhost, ip_connectionId=localhost:59336:558c1d07-67f9-498c-9329-c3c1b3ec6135}]
2012-12-17 07:37:30,547 [main] DEBUG: org.springframework.integration.ip.tcp.TcpOutboundGateway - Respose [Payload=bar][Headers={timestamp=1355747850546, id=a08c45da-3817-4876-9975-6ffeb2f5cbf6, ip_tcp_remotePort=59336, ip_address=127.0.0.1, ip_hostname=localhost, ip_connectionId=localhost:59336:558c1d07-67f9-498c-9329-c3c1b3ec6135}]
2012-12-17 07:37:30,547 [pool-2-thread-1] DEBUG: org.springframework.integration.ip.tcp.connection.TcpNetConnection - Message sent [Payload=baz][Headers={timestamp=1355747850546, id=bb2047f3-13d3-414f-a2e8-25b974ee773c}]
2012-12-17 07:37:30,547 [pool-2-thread-1] DEBUG: org.springframework.integration.ip.tcp.connection.TcpNetConnection - Closing single use socket after inbound message localhost:59336:558c1d07-67f9-498c-9329-c3c1b3ec6135
I suggest you compare your log with this.
I would also suggest you subclass AbstractTcpConnectionInterceptor rather than "basing" your interceptor on it.