Thread: TCP Inbound Gateway with Splitter/Aggregator multi-threaded.

  1. #1

    TCP Inbound Gateway with Splitter/Aggregator multi-threaded.

    Here is my problem:

    I've implemented a socket server like this (using Spring Integration):

    Code:
    <task:executor id="socketExecutor" pool-size="${integration.socket.pool.max.size}"/>
    	
    <ip:tcp-connection-factory id="server" type="server" port="${integration.server.port}" using-nio="${integration.server.using-nio}"
    		single-use="${integration.server.single-use}" so-timeout="${integration.server.so-timeout}"
    		deserializer="deserializer" task-executor="socketExecutor" so-linger="${integration.server.so-linger}" 
    		so-send-buffer-size="${integration.server.so-send-buffer}" so-receive-buffer-size="${integration.server.so-receive-buffer}" />
    
    <ip:tcp-inbound-channel-adapter channel="tcpInputChannel" connection-factory="server" error-channel="errorChannel"/>
    <ip:tcp-outbound-channel-adapter channel="tcpOutpuChannel" connection-factory="server" />
    So each incoming TCP call is handled by a different thread from the 'socketExecutor' thread pool.

    It reads the socket and sends the input to the splitter/aggregator for further processing:

    Code:
    <task:executor id="asmTxExecutor" pool-size= "${integration.asmv.pool.max.size}" />
    
    <channel id="parallelTxChannel">
    	<dispatcher task-executor="asmTxExecutor" />
    </channel>
    
    <chain input-channel="tcpInputChannel" output-channel="parallelTxChannel">
    	<splitter ref="splitter" />
    </chain>
    
    <chain input-channel="parallelTxChannel" output-channel= "tcpOutpuChannel">
    	<service-activator ref="transactionExecutorService" />
    	<aggregator ref="aggregator" />
    </chain>
    Here is the intended flow (it's all direct channels, so it's like one big transaction):
    the input gets split (multi-threaded processing), then aggregated, and then the reply is sent.
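
    A minimal plain-Java sketch of the intended split, parallel-process, aggregate flow, independent of Spring Integration. The class name SplitAggregateSketch and the process helper are hypothetical stand-ins for the splitter, the transactionExecutorService, and the aggregator; the pool plays the role of asmTxExecutor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class SplitAggregateSketch {

    // Hypothetical stand-in for the per-item work done by the service activator.
    static String process(String request) {
        return "done:" + request;
    }

    // Processes every item of the already-split request on the pool and
    // aggregates the results in the original order.
    static List<String> handle(List<String> requests, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (String request : requests) {
                tasks.add(() -> process(request));
            }
            List<String> responses = new ArrayList<>();
            // invokeAll blocks until all tasks are done and keeps task order.
            for (Future<String> future : pool.invokeAll(tasks)) {
                responses.add(future.get());
            }
            return responses;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("worker failed", e);
        } finally {
            pool.shutdown();
        }
    }
}
```

    The calling thread waits for all workers here, which is the simplest way to keep the request and its reply tied together; the messaging flow in this thread achieves the same thing through the aggregator.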

    The problem is that once the input message reaches the splitter and the multi-threading starts, my TCP server loses track of the connection, thinks there is a new one, and tries to read (actually re-read) from the socket. That read blocks until the parallel processing has completed and the reply has been sent on the output channel. Sending the reply closes the socket, and the extra blocked read then throws a SocketException.

    Is it clear?
    Is there a way to prevent this extra read from happening?
    It seems that using two different thread pools is causing this problem.
    The extra read disappears when no second thread pool is used, but that also eliminates the multi-threaded processing.

  2. #2
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,037

    Are you setting using-nio to true? Collaborating channel adapters (that use the same connection factory) are really intended for use with NIO. I would expect the behavior you are seeing with using-nio="false". When using NIO, threads from the pool are only used when data is available to be read from the socket; otherwise, they are returned to the pool. When the result is sent asynchronously, the channel (socket) will be closed (assuming single-use=true), and no error will appear on the read side (because there is no blocked thread).
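
    The blocked-read effect described above can be reproduced with plain java.net sockets. This is only a sketch of the general mechanism, not of Spring Integration's internals: one thread sits in a blocking read while another thread closes the socket, and the reader then sees a SocketException. The class name BlockedReadSketch is hypothetical.

```java
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class BlockedReadSketch {

    // Returns a short description of what the blocked reader observed.
    static String run() {
        ExecutorService reader = Executors.newSingleThreadExecutor();
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket accepted = server.accept()) {
            CountDownLatch aboutToRead = new CountDownLatch(1);
            Future<String> outcome = reader.submit(() -> {
                try {
                    aboutToRead.countDown();
                    // The client never sends anything, so this read blocks.
                    int b = accepted.getInputStream().read();
                    return "read returned " + b;
                } catch (SocketException e) {
                    return "SocketException";
                }
            });
            aboutToRead.await();
            Thread.sleep(200);   // give the reader time to block in read()
            accepted.close();    // the "reply sent, single-use socket closed" step
            return outcome.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            reader.shutdownNow();
        }
    }
}
```

    With NIO there is no thread parked in read() while the reply is being prepared, which is why the close is silent in that mode.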

    If I have misunderstood your situation, please attach a zip with a debug log illustrating the problem.

    Thanks
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  3. #3

    I thought I had solved my problem by using the scope attribute on the channel that uses the 'second' thread pool. Here is how it looks:

    Code:
    <channel id="scopedTarget.parallelTxChannel" scope="thread">
       <dispatcher task-executor="asmTxExecutor" />
    </channel>
    
    <beans:bean class= "org.springframework.beans.factory.config.CustomScopeConfigurer">
       <beans:property name="scopes">
          <beans:map>
    	  <beans:entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
          </beans:map>
       </beans:property>
    </beans:bean>
    But this setting actually only eliminated the multi-threading.

    Here are my settings for the server:

    Code:
    integration.asmv.pool.max.size=10
    integration.socket.pool.max.size=10
    
    integration.server.port=14550
    integration.server.using-nio=true
    integration.server.single-use=true
    integration.server.so-timeout=60000
    integration.server.so-linger=60
    integration.server.so-send-buffer=25350
    integration.server.so-receive-buffer=43720
    Also, in debug mode, it doesn't seem to use the TcpNioServerConnectionFactory but the TcpNetServerConnectionFactory, even though the 'using-nio' flag is on. Is there a reason for this?

    Thanks, as always, for the quick response.
    Last edited by Olivier Quirion; May 5th, 2011 at 09:46 AM.

  4. #4

    Hi Gary,
    As you can read in my previous post, I was misled and going in the wrong direction. I re-read the reference on collaborating channel adapters and your post and, by luck, tried this:

    Code:
    <ip:tcp-connection-factory id="server" type="server" port="${integration.server.port}" using-nio="true" single-use="true" 
    		so-timeout="${integration.server.so-timeout}" deserializer="deserializer" task-executor="socketExecutor" 
    		so-linger="${integration.server.so-linger}" so-send-buffer-size="${integration.server.so-send-buffer}" 
    		so-receive-buffer-size="${integration.server.so-receive-buffer}" />
    And it all started to work as expected. The problem was:

    Code:
    integration.server.using-nio=true
    integration.server.single-use=true
    ...
    
    using-nio="${integration.server.using-nio}"
    single-use="${integration.server.single-use}"
    It seems the property placeholder didn't work on the boolean attributes. After hard-coding the values in the context, it started to use the right TcpNioServerConnectionFactory.

    Can you corroborate this behavior?
    Thanks

  5. #5
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,037

    Thanks, I can confirm that using a placeholder (or SpEL expression) for the 'using-nio' attribute does not work; it causes a TcpNet* connection factory to be defined unconditionally.
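
    One plausible mechanism, stated here as an assumption and not confirmed anywhere in this thread: if the factory class is chosen while the XML is being parsed, the parser sees the raw attribute text, and placeholders are only substituted afterwards. The sketch below uses a hypothetical chooseFactory helper (it is not Spring Integration code) to show that an unresolved placeholder string never evaluates to true.

```java
final class PlaceholderParseSketch {

    // Hypothetical parse-time decision based on the raw attribute text.
    static String chooseFactory(String rawUsingNio) {
        // Boolean.parseBoolean is true only for the text "true" (ignoring case),
        // so "${...}" falls through to the non-NIO branch.
        return Boolean.parseBoolean(rawUsingNio)
                ? "TcpNioServerConnectionFactory"
                : "TcpNetServerConnectionFactory";
    }
}
```

    Under that assumption, chooseFactory("${integration.server.using-nio}") selects the TcpNet* variant regardless of the property value, which matches the observed behavior; hard-coding "true" selects the NIO variant.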

    Would you mind entering a JIRA issue?

    I may not be able to get the fix done in time for the upcoming 2.0.4 release, however.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  6. #6

    Gary, my Java application will run in production on z/OS, but I've run into problems using TcpNioServerConnectionFactory there (I hit a sort of deadlock after a few calls to the server). Therefore, I've returned to the old TcpNet* factory, and the extra read on the socket has reappeared. From reading your post, this seems to be the normal behavior. Is that true, and why? Should I return to an inbound gateway instead of using collaborating channel adapters? Would that solve the problem of the extra read? Are collaborating channel adapters purely for NIO? I've read the documentation a few times and that didn't stand out to me.

    Regards,

  7. #7
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,037

    @Olivier,

    Collaborating channel adapters were introduced to alleviate a performance issue with gateways, particularly when one shared connection is being used for all messages.

    The reference guide says collaborating adapters are an alternative to using single-use connections with gateways, when the overhead of opening/closing sockets is too much.

    Another use case for collaborating adapters is where you want entirely asynchronous messaging (not request/reply) between two systems (again, usually using a single persistent connection).

    Given that you are using single-use connections, and request/response, there is no benefit to using collaborating adapters.

    I am not saying we couldn't eliminate the read-error log message you are seeing; it is just that I did not anticipate that collaborating adapters would be used for single-use connections.

    For your use case, using an inbound gateway on the server would be the best practice.

    Further, unless you anticipate a very large number of (concurrent) connections, or your server process (time to handle a request) is very long, I would not expect you to see much, if any, difference between using NIO or not.

    Hope that helps...
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  8. #8

    Thanks, Gary, for this clear and concise answer.
    I've tried the inbound gateway, but I get odd behavior. The different messages seem to lose their way back to the gateway (they create their own TemporaryReturnChannel). I've pinpointed the problem to my use of a task executor to perform parallel processing.

    Here is the flow:

    message --> gateway (in) --> splitter --> service activator --> aggregator --> gateway (out)

    I've used collaborating adapters because the inbound gateway just does not work when I use the task executor.

    Code:
    <task:executor id="socketExecutor" pool-size="${integration.socket.pool.max.size}"/>
    
    <ip:tcp-connection-factory id="server" type="server" port="${integration.server.port}" using-nio="false" single-use="true"  deserializer="deserializer" task-executor="socketExecutor" />
    
    <ip:tcp-inbound-gateway request-channel="tcpInputChannel" reply-channel="tcpOutputChannel" connection-factory="server" error-channel="errorChannel"/>
    
    <chain input-channel="tcpInputChannel" output-channel="parallelTxChannel">
    	<transformer ref="unmarshallingTransformer" />
    	<splitter ref="splitter" />
    </chain>
    
    <chain input-channel="parallelTxChannel" output-channel="tcpOutputChannel" >
    	<service-activator ref="transactionExecutorService">
    		<task:executor id="asmTxExecutor"/>
    	</service-activator>
    	<aggregator ref="aggregator" />
    	<transformer ref="marshallingTransformer" />
    </chain>
    	
    <chain input-channel="errorChannel" output-channel="tcpOutputChannel">
    	<service-activator ref="exceptionHandlingService" method="handleException"  />
    	<transformer ref="marshallingTransformer" />
    </chain>
    
    <channel id="parallelTxChannel">
    	<dispatcher task-executor="asmTxExecutor" />
    </channel>
    
    <task:executor id="asmTxExecutor" pool-size="${integration.asmv.pool.max.size}" />
    I've tried without the executor and everything works fine (but single-threaded). When I add the task executor, the transaction flow (direct channel flow) seems to be broken by the use of different threads. Is there a way to set up this flow so that all direct channels stay connected and the final message gets routed back through the original gateway?

    Should I use a scoped channel, and how? Should I give up and accept that the collaborating adapters are the way to go (they work, but with a mild problem: the extra read)? How can I link the different threads used by the splitter/service/aggregator to the original thread used by the gateway?

  9. #9
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,037

    Going async shouldn't make a difference, especially because you are putting an explicit reply channel on the second chain (this can really be omitted, because SI will route to the temporary reply channel in the header if there is no output channel). You only really need to explicitly route to the reply channel if you want to do something else with the reply (such as a wire tap, pub-sub, etc.).
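
    The idea that a reply finds its way back through a channel carried in the message header, regardless of which thread does the work, can be sketched in plain Java. Everything below is hypothetical illustration (the Message class, the replyChannel field, and sendAndReceive are not Spring Integration types): the "gateway" creates a temporary reply queue, attaches it to the message, and waits on it while a worker thread produces the reply.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ReplyHeaderSketch {

    // Hypothetical message: a payload plus the reply queue carried like a header.
    static final class Message {
        final String payload;
        final BlockingQueue<String> replyChannel;

        Message(String payload, BlockingQueue<String> replyChannel) {
            this.payload = payload;
            this.replyChannel = replyChannel;
        }
    }

    // Plays the gateway: creates a temporary reply queue, hands the message
    // to another thread, and waits for the reply on that queue.
    static String sendAndReceive(String payload) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Message request = new Message(payload, new ArrayBlockingQueue<>(1));
            // The worker only needs the message itself to know where to reply.
            worker.execute(() -> request.replyChannel.offer(request.payload.toUpperCase()));
            String reply = request.replyChannel.poll(5, TimeUnit.SECONDS);
            if (reply == null) {
                throw new IllegalStateException("no reply within 5 seconds");
            }
            return reply;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }
}
```

    The reply is lost only if the header is dropped or replaced somewhere along the flow, which is why the aggregator question that follows matters.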

    What does your aggregator look like?

    When an aggregator just returns a payload, SI will take care of making sure the headers are correct; merging the headers from inbound messages (however, it will remove the reply channel if two inbound messages have different reply channels).
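
    A rough sketch of that merge rule in plain Java, assuming headers are simple key/value maps: a header survives only if every inbound message that carries it agrees on its value. The class and method names are hypothetical, and this is not the framework's actual implementation.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

final class HeaderMergeSketch {

    // Keeps a header only when all messages that carry it agree on its value.
    static Map<String, Object> merge(List<Map<String, Object>> headerSets) {
        Map<String, Object> merged = new LinkedHashMap<>();
        Set<String> conflicts = new HashSet<>();
        for (Map<String, Object> headers : headerSets) {
            for (Map.Entry<String, Object> entry : headers.entrySet()) {
                String key = entry.getKey();
                if (conflicts.contains(key)) {
                    continue; // already dropped because of an earlier mismatch
                }
                if (!merged.containsKey(key)) {
                    merged.put(key, entry.getValue());
                } else if (!Objects.equals(merged.get(key), entry.getValue())) {
                    merged.remove(key);
                    conflicts.add(key);
                }
            }
        }
        return merged;
    }

    // Usage example: does the reply channel survive merging two messages?
    static boolean keepsReplyChannel(Object first, Object second) {
        Map<String, Object> a = new HashMap<>();
        a.put("replyChannel", first);
        Map<String, Object> b = new HashMap<>();
        b.put("replyChannel", second);
        return merge(Arrays.asList(a, b)).containsKey("replyChannel");
    }
}
```

    If each split message ended up with its own temporary reply channel, a merge like this would discard the header and the aggregated reply would have nowhere to go.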

    Can you attach a zip with a DEBUG level log (for org.springframework.integration classes) with the task executor in place?
    Last edited by Gary Russell; May 10th, 2011 at 08:33 AM.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  10. #10

    Here are my splitter and aggregator. I've removed all the MessageBuilder-type code so that I don't mess up the headers, so it's all POJOs.

    Code:
    public class SplitterImpl {

        public Object splitMessage(AtsRequest atsRequest) {
            return atsRequest.getRequests();
        }
    }

    public class AggregatorImpl {

        public AtsResponse aggregate(List<Response> responses) {
            AtsResponse atsResponse = new AtsResponse();

            for (Response response : responses) {
                atsResponse.getResponses().add(response);
            }
            return atsResponse;
        }
    }
    ats.log.zip
