Let me take one step back and state the problem -
We need to be able to process 50 TPS through the [gateway|channel adapter|whatever] over a persistent tcp/ip connection.
Now onto how we thought we'd do that...
I have single-use=false, so-keep-alive=true and so-timeout=0 for the connection factory.
Regarding thread sending to and receiving from, I think I see why it looks this way. The code is using a gateway to send the message, and the gateway by nature waits on the response on the same thread right? (Which now looking closer at this, it might be this wrong setup causing the issue. Here is the context file that has the adapters and gateway configured:
Code:
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans ...>
<poller id="defaultPoller" default="true" max-messages-per-poll="500" fixed-rate="40"/>
<!--
Program will first send the message through local gateway if it fails it will send the message to remote gateway
-->
<gateway id="localGateWay" service-interface="com.test.TestGateway"
default-request-channel="localRequestChannel" />
<gateway id="remoteGateWay" service-interface="com.test.TestGateway"
default-request-channel="remoteRequestChannel" />
<!-- channel declaration for sending message from gateways -->
<publish-subscribe-channel id="localRequestChannel"/> <!-- task-executor="myExecutor" -->
<publish-subscribe-channel id="remoteRequestChannel"/> <!-- task-executor="myExecutor" -->
<!--
channel declaration for sending messages to outbound adapters
these channels will take care of roundrobin and failover
-->
<channel id="localRequestChannelForOutbound">
<dispatcher failover="true" load-balancer="round-robin"/>
</channel>
<channel id="remoteRequestChannelForOutbound">
<dispatcher failover="true" load-balancer="round-robin"/>
</channel>
<!-- declaration of channels used for storing the message received from servers -->
<channel id="localReplyChannelFromServer">
<queue capacity="500"/>
</channel>
<channel id="remoteReplyChannelFromServer">
<queue capacity="500"/>
</channel>
<!--
when request message is kept in request channel from gateway
one copy of it will be sent replychannelfromserver which will be used in aggregator to match with the
reply message
one more copy will be send to requestChannelForOutbound which will be used by outbound adapter to process
-->
<bridge input-channel="localRequestChannel" output-channel="localReplyChannelFromServer" order="1"/>
<bridge input-channel="localRequestChannel" output-channel="localRequestChannelForOutbound" order="2"/>
<bridge input-channel="remoteRequestChannel" output-channel="remoteReplyChannelFromServer" order="1"/>
<bridge input-channel="remoteRequestChannel" output-channel="remoteRequestChannelForOutbound" order="2"/>
<!-- out bound /in bound adapter declaration for local instance -->
<ip:tcp-outbound-channel-adapter id="adapter_outbound_local_client_1"
channel="localRequestChannelForOutbound"
order="1"
connection-factory="local_client_1"/>
<ip:tcp-inbound-channel-adapter id="adapter_inbound_local_client_1"
channel="localReplyChannelFromServer"
connection-factory="local_client_1"/>
<ip:tcp-outbound-channel-adapter id="adapter_outbound_local_client_2"
channel="localRequestChannelForOutbound"
order="2"
connection-factory="local_client_2"/>
<ip:tcp-inbound-channel-adapter id="adapter_inbound_local_client_2"
channel="localReplyChannelFromServer"
connection-factory="local_client_2"/>
<!-- out bound /in bound adapter declaration for remote instance -->
<ip:tcp-outbound-channel-adapter id="adapter_outbound_remote_client_1"
channel="remoteRequestChannelForOutbound"
order="1"
connection-factory="remote_client_1"/>
<ip:tcp-inbound-channel-adapter id="adapter_inbound_remote_client_1"
channel="remoteReplyChannelFromServer"
connection-factory="remote_client_1"/>
<ip:tcp-outbound-channel-adapter id="adapter_outbound_remote_client_2"
channel="remoteRequestChannelForOutbound"
order="2"
connection-factory="remote_client_2"/>
<ip:tcp-inbound-channel-adapter id="adapter_inbound_remote_client_2"
channel="remoteReplyChannelFromServer"
connection-factory="remote_client_2"/>
<!-- following aggregator will aggregate message received from server with corresponding message from client
while request is send , the same is saved in ReplyChannelFromServer, and when response comes it will also come to
ReplyChannelFromServer then the correlation strategy will check whether the echo of both request and response are same
then it considers as match and there are 2 such messages (release strategy) it will send them to output channel
-->
<aggregator id="localAggregator" input-channel="localReplyChannelFromServer"
output-channel="clientBytes2StringForSecondPayLoadChn"
correlation-strategy="correlationStrategyBean"
send-timeout="4000"
release-strategy-expression="size() == 2" />
<aggregator id="remoteAggregator" input-channel="remoteReplyChannelFromServer"
output-channel="clientBytes2StringForSecondPayLoadChnRemote"
correlation-strategy="correlationStrategyBean"
send-timeout="4000"
release-strategy-expression="size() == 2" />
<!-- always second message will be the message received from the server -->
<transformer id="clientBytes2StringForSecondPayLoadLocal" input-channel="clientBytes2StringForSecondPayLoadChn" expression="new String(payload.get(1))" />
<transformer id="clientBytes2StringForSecondPayLoadRemote" input-channel="clientBytes2StringForSecondPayLoadChnRemote" expression="new String(payload.get(1))" />
<beans:bean id="correlationStrategyBean" class="com.test.CorrelationStrategy"/>
</beans:beans>
I guess what you're saying about using the work manager thread for all endpoints applies to the aggregator too, so I should probably add that to the config for those. I think the weird thing about the way we are using channel adapters in this case is that it is supposed to be 1-way communication, but we actually need the reply in the same thread that we are sending the request.
The code is roughly like so:
Code:
receive request from outside application
use request data to form different request to another system over tcp/ip
get response from other system
use response to create reply to original request from outside of application
send reply to original requestor
So we actually need the reply from that inbound channel before we can move on in the code logic. Originally, we were using a simple gateway because it naturally sends the request then waits for the response to move on. But because of speed requirements, (we need to process up to 50 transactions per second) we changed to using the channel adapters. The other reason we switched to adapters is because even when we had the single-use false set on the gateway, it would not keep a persistent connection and the connection would be closed after every request...sometimes even before the response returned. Being unable to get the gateway to keep a persistent connection, we moved over to the channel adapters.
But looking at our current configuration, it seems like we are still in effect using a gateway here. And whatever we are doing is causing the application to hang at this point when multiple threads arrive at the same time. If we send one or two single requests, no issues. But when we start the load... it chokes... and dies.