I have been trying to work out how to correctly configure a poller in Spring Integration but I have run into some problems (I raised a similar post a few weeks ago, but I didn't get any reply; see http://forum.springsource.org/showthread.php?t=93555).
Basically, I would like to emulate event-driven behaviour using a poller. This is described in the Spring Integration Reference Manual as follows:
The 'receiveTimeout' property specifies the amount of time the poller should wait if no messages are available when it invokes the receive operation. For example, consider two options that seem similar on the surface but are actually quite different: the first has an interval trigger of 5 seconds and a receive timeout of 50 milliseconds while the second has an interval trigger of 50 milliseconds and a receive timeout of 5 seconds. The first one may receive a message up to 4950 milliseconds later than it arrived on the channel (if that message arrived immediately after one of its poll calls returned). On the other hand, the second configuration will never miss a message by more than 50 milliseconds. The difference is that the second option requires a thread to wait, but as a result it is able to respond much more quickly to arriving messages. This technique, known as "long polling", can be used to emulate event-driven behavior on a polled source.

In an attempt to use the "long polling" technique described, I have created a poller and task executor as shown below. The poller has a timeout of 5 seconds and a trigger interval of 50 ms, and I have 20 threads in my pool:

Code:
<si:service-activator input-channel="publishChannel" ref="publishErrorHandler" method="passThrough">
    <si:poller receive-timeout="5000" task-executor="publicationTaskExecutor">
        <si:interval-trigger interval="50"/>
        <si:transactional transaction-manager="databaseTransactionManager"/>
    </si:poller>
</si:service-activator>
...
<bean id="publicationTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="20"/>
    <property name="maxPoolSize" value="20"/>
    <property name="waitForTasksToCompleteOnShutdown" value="true"/>
    <property name="daemon" value="false"/>
    <property name="threadNamePrefix" value="publicationService-"/>
</bean>

Now this initially appeared to be working fine, but during testing we started to see problems with memory depletion, and this was traced to the above configuration.
The problem is that the poller keeps scheduling new tasks even though all the threads are blocked waiting for either (i) a new message to arrive, or (ii) the timeout to expire. With 20 threads each blocking for up to 5 seconds, an idle system completes tasks at a rate of only 4 per second (one every 5000/20 = 250 ms). New tasks, however, are scheduled at a rate of 20 per second (one every 50 ms), so the task executor's internal queue grows by 16 tasks per second while the process is idle. In effect, we have a memory leak.
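To make the arithmetic above explicit, here is a minimal sketch in plain Java. It only models the idle case I described (every task blocks for the full receive timeout); the class and method names are made up for illustration and are not part of Spring Integration.

```java
public class PollerBacklog {

    // Tasks submitted per second by a trigger that fires every intervalMs.
    static double submitRate(long intervalMs) {
        return 1000.0 / intervalMs;
    }

    // Tasks completed per second when each of the threads blocks
    // for the full receive timeout (i.e. no messages arrive).
    static double drainRate(int threads, long timeoutMs) {
        return threads * 1000.0 / timeoutMs;
    }

    // Net growth of the executor's queue per second while idle.
    static double queueGrowthPerSecond(int threads, long timeoutMs, long intervalMs) {
        return Math.max(0.0, submitRate(intervalMs) - drainRate(threads, timeoutMs));
    }

    public static void main(String[] args) {
        // Values from my configuration: 20 threads, 5000 ms timeout, 50 ms interval.
        System.out.println("submitted/s: " + submitRate(50));
        System.out.println("completed/s: " + drainRate(20, 5000));
        System.out.println("queue growth/s: " + queueGrowthPerSecond(20, 5000, 50));
    }
}
```

With my values this gives 20 submitted, 4 completed and a growth of 16 queued tasks per second, which is 57,600 tasks after an idle hour.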
Obviously, this is not right, but how do I go about correctly configuring a poller for "long polling"? Should I be using a different type of task executor? Should I be specifying a queue capacity of zero on the task executor, and then setting a 'Discard' policy for the rejection policy?
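To show what I mean by the last option, here is a rough sketch using only java.util.concurrent rather than the Spring ThreadPoolTaskExecutor. I am assuming that a zero-capacity queue behaves like a SynchronousQueue (no buffering at all); the class name and the small numbers are just for illustration, and I do not know whether this is the recommended approach.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DiscardingPoolSketch {

    // Submits 'submissions' tasks that all block (like idle pollers) to a pool
    // with no queue and a discard policy; returns how many were actually run.
    static int runBlockedTasks(int threads, int submissions) {
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger executed = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(),        // hands off directly, never buffers
                new ThreadPoolExecutor.DiscardPolicy()); // silently drops rejected tasks
        for (int i = 0; i < submissions; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // stand-in for a blocking receive
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                executed.incrementAndGet();
            });
        }
        release.countDown(); // let the blocked tasks finish
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executed.get();
    }

    public static void main(String[] args) {
        System.out.println("executed: " + runBlockedTasks(20, 100));
    }
}
```

In this sketch, submissions made while every thread is busy are dropped instead of being queued, so nothing accumulates in memory. What I cannot tell is whether dropping poll attempts like this is safe with the poller.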
I can tweak the interval and timeout values until I get something that's workable, but I'd like to understand how this is supposed to work.