Results 1 to 9 of 9

Thread: how to do to send other message before close connection in single use client ?

  1. #1
    Join Date
    Nov 2012
    Posts
    11

    Default how to do to send other message before close connection in single use client ?

    Hi, Is there any way to send other message ( different the original ) before tcpnetconnection run closeconnection?? I'm connecting to a linux service and I need a message after receiving response and befor close connection.

  2. #2
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,147

    Default

    See http://static.springsource.org/sprin...p-interceptors

    You can add an interceptor and intercept the onMessage() method and send the data either before or after calling super.onMessage() (which will send the reply to the sender).

    When the intercepted method returns, the connection will be closed.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  3. #3
    Join Date
    Nov 2012
    Posts
    11

    Default

    hi Gary, I implemented a interceptor factory class and also implemented a interceptor with onMessage method ( based on AbstractTcpConnectionInterceptor) :


    public boolean onMessage(Message<?> message) {
    if (this.tcpListener == null) {
    throw new NoListenerException("No listener registered for message reception");
    }
    boolean b = this.tcpListener.onMessage(message);
    try {
    this.theConnection.send( MessageBuilder.withPayload("XXXXXX").build());
    } catch (Exception e) {
    log.debug(e);
    // TODO Auto-generated catch block
    throw new NoListenerException("ERROR");
    }
    return b;
    }


    XXXX is the message. The problem is sometimes appear in the log:
    16:33:06,072|ERROR||org.springframework.integratio n.ip.tcp.TcpOutboundGateway|Cannot correlate response - no pending reply
    or
    16:33:06,072|DEBUG||com.booleancore.ws.ConexionInt erceptor|java.net.SocketException: Broken pipe

    I used a tcp-connection-factory type client and tcp-outbound-gateway,

    like I said , I need to send a message with the gateway ( because i need the response ) but I need to send a message ( without response ) before close connection.

    Please help me, thks!

  4. #4
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,147

    Default

    I am not sure what's going on for you, but I just wrote this test case and all works as expected for me...

    Code:
    	@Test
    	public void testInterceptSendAck() throws Exception {
    		final int port = SocketUtils.findAvailableServerSocket();
    		AbstractConnectionFactory ccf = new TcpNetClientConnectionFactory("localhost", port);
    		final CountDownLatch latch = new CountDownLatch(1);
    		final AtomicBoolean done = new AtomicBoolean();
    		final AtomicReference<Object> ack = new AtomicReference<Object>();
    		final CountDownLatch ackLatch = new CountDownLatch(1);
    		Executors.newSingleThreadExecutor().execute(new Runnable() {
    			public void run() {
    				try {
    					ServerSocket server = ServerSocketFactory.getDefault().createServerSocket(port, 100);
    					latch.countDown();
    					while (true) {
    						Socket socket = server.accept();
    						ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
    						ois.readObject();
    						ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
    						oos.writeObject("bar");
    						ois = new ObjectInputStream(socket.getInputStream());
    						ack.set(ois.readObject());
    						ackLatch.countDown();
    					}
    				} catch (Exception e) {
    					if (!done.get()) {
    						e.printStackTrace();
    					}
    				}
    			}
    		});
    		ccf.setSerializer(new DefaultSerializer());
    		ccf.setDeserializer(new DefaultDeserializer());
    		ccf.setSoTimeout(10000);
    		ccf.setSingleUse(true);
    		TcpConnectionInterceptorFactoryChain interceptorFactoryChain = new TcpConnectionInterceptorFactoryChain();
    		interceptorFactoryChain.setInterceptors(new TcpConnectionInterceptorFactory[] {new TcpConnectionInterceptorFactory() {
    			public TcpConnectionInterceptor getInterceptor() {
    				return new AbstractTcpConnectionInterceptor() {
    
    					@Override
    					public boolean onMessage(Message<?> message) {
    						boolean b = super.onMessage(message);
    						try {
    							this.send(MessageBuilder.withPayload("baz").build());
    						}
    						catch (Exception e) {
    							e.printStackTrace();
    						}
    						return b;
    					}
    				};
    			}
    		}});
    		ccf.setInterceptorFactoryChain(interceptorFactoryChain );
    		ccf.afterPropertiesSet();
    		ccf.start();
    		assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
    		TcpOutboundGateway gateway = new TcpOutboundGateway();
    		gateway.setConnectionFactory(ccf);
    		QueueChannel replyChannel = new QueueChannel();
    		gateway.setRequiresReply(true);
    		gateway.setOutputChannel(replyChannel);
    		gateway.afterPropertiesSet();
    		gateway.handleMessage(MessageBuilder.withPayload("foo").build());
    		assertTrue(ackLatch.await(10000, TimeUnit.MILLISECONDS));
    		Message<?> reply = replyChannel.receive(10000);
    		assertNotNull(reply);
    		assertEquals("bar", reply.getPayload());
    		assertEquals("baz", ack.get());
    	}
    Here's what I see in the log...

    Code:
    2012-12-17 07:37:30,542 [main] DEBUG: org.springframework.integration.ip.tcp.connection.TcpNetConnection - Message sent [Payload=foo][Headers={timestamp=1355747850530, id=cd45d182-0858-4901-ab40-2a3063adee04}]
    2012-12-17 07:37:30,546 [pool-2-thread-1] DEBUG: org.springframework.integration.ip.tcp.connection.TcpNetConnection - Message received [Payload=bar][Headers={timestamp=1355747850546, id=a08c45da-3817-4876-9975-6ffeb2f5cbf6, ip_tcp_remotePort=59336, ip_address=127.0.0.1, ip_hostname=localhost, ip_connectionId=localhost:59336:558c1d07-67f9-498c-9329-c3c1b3ec6135}]
    2012-12-17 07:37:30,547 [main] DEBUG: org.springframework.integration.ip.tcp.TcpOutboundGateway - Respose [Payload=bar][Headers={timestamp=1355747850546, id=a08c45da-3817-4876-9975-6ffeb2f5cbf6, ip_tcp_remotePort=59336, ip_address=127.0.0.1, ip_hostname=localhost, ip_connectionId=localhost:59336:558c1d07-67f9-498c-9329-c3c1b3ec6135}]
    2012-12-17 07:37:30,547 [pool-2-thread-1] DEBUG: org.springframework.integration.ip.tcp.connection.TcpNetConnection - Message sent [Payload=baz][Headers={timestamp=1355747850546, id=bb2047f3-13d3-414f-a2e8-25b974ee773c}]
    2012-12-17 07:37:30,547 [pool-2-thread-1] DEBUG: org.springframework.integration.ip.tcp.connection.TcpNetConnection - Closing single use socket after inbound message localhost:59336:558c1d07-67f9-498c-9329-c3c1b3ec6135
    I suggest you compare your log with this.

    I would also suggest you subclass AbstractTcpConnectionInterceptor rather than "basing" your interceptor on it.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  5. #5
    Join Date
    Nov 2012
    Posts
    11

    Default

    Thks Gary, your example is perfect! but the weird in my testing and production, this run good 1 time and the next time not, then run good again and then not, and go on... i dont know the reason. The time is bad in the log shows:

    |ERROR||org.springframework.integration.ip.tcp.Tcp OutboundGateway|Cannot correlate response - no pending reply

    when is good, dont show that. The linux service only return values under request, but the other message before close connection doesnt response ( and it's correct ) , but i dont understand why run ok one time and the next not.

  6. #6
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,147

    Default

    That's weird; it implies a reply is being received before the request was sent out.

    Can you post a DEBUG log showing a good and bad event?
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  7. #7
    Join Date
    Nov 2012
    Posts
    11

    Default

    the correct is:

    Code:
    16:29:49,365|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory|Opening new socket connection to 192.168.1.2:1025
    16:29:49,374|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetConnection|Reading...
    16:29:49,375|DEBUG||com.mylib.ws.ByteToStringSerializer|Bytes disponibles para lectura:0
    16:29:49,375|DEBUG||org.springframework.integration.ip.tcp.TcpOutboundGateway|Added 192.168.1.2:1025:5578e9c1-adb1-4651-9542-c81342439656
    16:29:49,376|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetConnection|Message sent [Payload=XXXXX][Headers={timestamp=1355779789363, id=2703f41e-8542-4ebf-87f3-15b00d079485, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@2d0d719, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@2d0d719}]
    16:29:49,919|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetConnection|Message received [Payload=[B@1bcbb330][Headers={timestamp=1355779789919, id=6622103b-53d4-4d73-9428-ebcf11ad4fb0, ip_tcp_remotePort=1025, ip_address=192.168.1.2, ip_hostname=192.168.1.2, ip_connectionId=192.168.1.2:1025:5578e9c1-adb1-4651-9542-c81342439656}]
    16:29:49,919|DEBUG||com.mylib.ConexionInterceptor|INIT
    16:29:49,919|DEBUG||com.mylib.ConexionInterceptor|BEFORE
    16:29:49,920|DEBUG||com.mylib.ConexionInterceptor|ENTER
    16:29:49,920|DEBUG||org.springframework.integration.ip.tcp.TcpOutboundGateway|Respose [Payload=[B@1bcbb330][Headers={timestamp=1355779789919, id=6622103b-53d4-4d73-9428-ebcf11ad4fb0, ip_tcp_remotePort=1025, ip_address=192.168.1.2, ip_hostname=192.168.1.2, ip_connectionId=192.168.1.2:1025:5578e9c1-adb1-4651-9542-c81342439656}]
    16:29:49,921|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetConnection|Message sent [Payload=YYYY][Headers={timestamp=1355779789920, id=24ac770b-c398-46ad-9280-e2cee805ad24}]
    16:29:49,921|DEBUG||org.springframework.integration.ip.tcp.TcpOutboundGateway|handler 'org.springframework.integration.ip.tcp.TcpOutboundGateway#0' sending reply Message: [Payload=[B@1bcbb330][Headers={timestamp=1355779789921, id=40fa7230-ce9d-4da2-9752-a8194f9c1ec6, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@2d0d719, ip_address=192.168.1.2, ip_tcp_remotePort=1025, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@2d0d719, ip_hostname=192.168.1.2, ip_connectionId=192.168.1.2:1025:5578e9c1-adb1-4651-9542-c81342439656}]
    16:29:49,921|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetConnection|Closing single use socket after inbound message 192.168.1.2:1025:5578e9c1-adb1-4651-9542-c81342439656
    16:29:49,921|DEBUG||org.springframework.beans.factory.support.DefaultListableBeanFactory|Returning cached instance of singleton bean 'integrationConversionService'
    16:29:49,922|DEBUG||org.springframework.integration.channel.QueueChannel|preSend on channel 'reply', message: [Payload=RXXXXXX][Headers={timestamp=1355779789922, id=eb6d2237-59b8-460b-9bca-d363dc5d8846, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@2d0d719, ip_tcp_remotePort=1025, ip_address=192.168.1.2, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@2d0d719, ip_hostname=192.168.1.2, ip_connectionId=10.10.183.114:1025:5578e9c1-adb1-4651-9542-c81342439656}]
    and the bad is:


    Code:
    16:29:52,904|DEBUG||org.springframework.integration.channel.DirectChannel|preSend on channel 'input', message: [Payload=XXXXX][Headers={timestamp=1355779792904, id=e90e1f6d-910b-462d-b566-e5f72b15bc7f, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@3f731338, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@3f731338}]
    16:29:52,904|DEBUG||org.springframework.integration.ip.tcp.TcpSendingMessageHandler|org.springframework.integration.ip.tcp.TcpSendingMessageHandler#0 received message: [Payload=XXXXX][Headers={timestamp=1355779792904, id=e90e1f6d-910b-462d-b566-e5f72b15bc7f, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@3f731338, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@3f731338}]
    16:29:52,904|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory|Opening new socket connection to 192.168.1.2:1025
    16:29:52,905|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetConnection|Reading...
    16:29:52,905|DEBUG||org.springframework.integration.ip.tcp.TcpSendingMessageHandler|Got Connection 192.168.1.2:1025:b2a886c1-568f-48dd-b70d-209191d34bff
    16:29:52,905|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetConnection|Message sent [Payload=XXXXX][Headers={timestamp=1355779792904, id=e90e1f6d-910b-462d-b566-e5f72b15bc7f, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@3f731338, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@3f731338}]
    16:29:52,906|DEBUG||org.springframework.integration.channel.DirectChannel|postSend (sent=true) on channel 'input', message: [Payload=XXXXX][Headers={timestamp=1355779792904, id=e90e1f6d-910b-462d-b566-e5f72b15bc7f, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@3f731338, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@3f731338}]
    16:29:52,952|TRACE||org.springframework.integration.channel.QueueChannel|postReceive on channel 'reply', message is null
    16:29:52,952|DEBUG||org.springframework.integration.endpoint.PollingConsumer|Received no Message during the poll, returning 'false'
    16:29:52,963|TRACE||org.springframework.integration.channel.QueueChannel|preReceive on channel 'reply'
    16:29:53,049|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetConnection|Message received [Payload=[B@3179aaa6][Headers={timestamp=1355779793049, id=ab69790f-5854-433b-9729-e762b97fa48c, ip_tcp_remotePort=1025, ip_address=192.168.1.2, ip_hostname=192.168.1.2, ip_connectionId=192.168.1.2:1025:b2a886c1-568f-48dd-b70d-209191d34bff}]
    16:29:53,049|DEBUG||com.mylib.ConexionInterceptor|INIT
    16:29:53,049|DEBUG||com.mylib.ConexionInterceptor|BEFORE
    16:29:53,049|ERROR||org.springframework.integration.ip.tcp.TcpOutboundGateway|Cannot correlate response - no pending reply
    16:29:53,049|DEBUG||com.mylib.ConexionInterceptor|ENTER
    16:29:53,050|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetConnection|Message sent [Payload=YYYY][Headers={timestamp=1355779793049, id=fe4e694c-06e1-44b8-aa22-718ffa249754}]
    16:29:53,050|DEBUG||org.springframework.integration.ip.tcp.connection.TcpNetConnection|Closing single use socket after inbound message 192.168.1.2:1025:b2a886c1-568f-48dd-b70d-209191d34bff
    16:29:53,908|TRACE||org.springframework.integration.core.MessagingTemplate|failed to receive message from channel 'org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@3f731338' within timeout: 1000
    the interceptor class is the copy from AbstractTcpConnectionInterceptor

    Code:
    public class ConexionInterceptor  implements TcpConnectionInterceptor {
    	 protected final Logger log = Logger.getLogger(getClass());
    ....
    
    	public boolean onMessage(Message<?> message) {
    		log.debug("INIT");
    		if (this.tcpListener == null) {
    			throw new NoListenerException("No listener registered for message reception");
    		}
    		log.debug("BEFORE");
    		boolean b = this.tcpListener.onMessage(message);
    		try {
    			log.debug("ENTER");
    			
    			this.theConnection.send( MessageBuilder.withPayload("YYYY").build());
    			
    		} catch (Exception e) {
    			log.debug("ERROR");
    			log.debug(e);
    			
    			
    			
    			// TODO Auto-generated catch block
    			throw new NoListenerException("ERROR");
    		}
    		return b;
    	}



    The linux service doesnt have problems. The bad code doesnt return the response.

  8. #8
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,147

    Default

    Can you show you SI configuration? It looks like you have both an <ip:outbound-gateway/> and an <ip:outbound-channel-adapter/> subscribed to channel 'input'.

    When more than one subscriber is subscribed to a direct channel, the framework round-robins the messages between the subscribers. Hence the first goes through the gateway, the second goes through the adapter. The gateway doesn't know anything about messages sent through the adapter (which is for one-way integration), there is no way it can handle the reply...

    Good...
    Code:
    16:29:49,375|DEBUG||org.springframework.integration.ip.tcp.TcpOutboundGateway|Added 192.168.1.2:1025:5578e9c1-adb1-4651-9542-c81342439656
    Bad...
    Code:
    16:29:52,904|DEBUG||org.springframework.integration.channel.DirectChannel|preSend on channel 'input', message: [Payload=XXXXX][...
    ...
    16:29:52,904|DEBUG||org.springframework.integration.ip.tcp.TcpSendingMessageHandler|org.springframework.integration.ip.tcp.TcpSendingMessageHandler#0 received message: [Payload=XXXXX][Headers=...
    ...
    16:29:53,049|ERROR||org.springframework.integration.ip.tcp.TcpOutboundGateway|Cannot correlate response - no pending reply
    The good/bad will alternate because the dispatcher alternates the requests between the gateway and the adapter.
    Last edited by Gary Russell; Dec 18th, 2012 at 07:56 AM.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  9. #9
    Join Date
    Nov 2012
    Posts
    11

    Default

    You're in the correct!. Everything OK right now, Thank you very much, and congratulations for this excellent module.

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •