Thread: Poller in a loop has no delay, while fixed-rate is set

  1. #1
    Join Date
    Apr 2009
    Location
    Warsaw, Poland
    Posts
    5

    I have the following splitter and aggregator config:

    HTML Code:
    <int:poller id="fixedThreadPoolPoller" default="true" fixed-rate="3000"
        task-executor="fixedThreadPoolTaskExecutor" max-messages-per-poll="-1" />
    
    <int:channel id="tasksIncoming"><int:queue /></int:channel>
    
    <int:splitter id="taskToTaskAssetsSplitter" expression="payload.assets" 
        input-channel="tasksIncoming" output-channel="tasksAssetsWaitingForContentFiles" />	
    
    <int:channel id="tasksAssetsWaitingForContentFiles">
        <int:queue message-store="tasksWaitingForAssetsMessageStore" capacity="100" />
    </int:channel>
    
    <int:chain id="moveContentFilesChain" input-channel="tasksAssetsWaitingForContentFiles">
        <int:service-activator ref="ingestTaskHandler" method="moveContentFileIfExists" />
        <int:router expression="payload.gathered" >
            <int:mapping value="false" channel="tasksAssetsWaitingForContentFiles" />
            <int:mapping value="true" channel="tasksAssetsWithContentFilesFound" />
        </int:router>
    </int:chain>
    
    <int:channel id="tasksAssetsWithContentFilesFound">
        <int:queue message-store="tasksAssetsWithContentFilesFoundMessageStore" capacity="100" />
    </int:channel>
    	
    <int:aggregator id="taskAssetsToTaskAggregator" ref="ingestTaskHandler"
        method="aggregateTaskAssets" release-strategy="aggregateTaskAssetsReleaseStrategy"
        input-channel="tasksAssetsWithContentFilesFound" output-channel="tasksReadyToProcess" />
    
    <int:channel id="tasksReadyToProcess"><int:queue /></int:channel>
    The workflow is:
    1) tasks arrive on 'tasksIncoming'
    2) tasks are split into task assets
    3) the split assets are queued in 'tasksAssetsWaitingForContentFiles'
    4) the service activator in 'moveContentFilesChain' checks whether the asset's content file exists in some location; if so, it copies the file to another location and sets 'payload.gathered' to true; if the content file doesn't exist, the message is passed on unchanged;
    5) the router in 'moveContentFilesChain' checks whether the file was found and routes the message either back to 'tasksAssetsWaitingForContentFiles' or on to 'tasksAssetsWithContentFilesFound'
    6) if the files for all assets are found, the task is aggregated by 'taskAssetsToTaskAggregator' and passed to 'tasksReadyToProcess' for further processing

    The problem is that I want 'moveContentFilesChain' to be polled at a fixed rate of 3000 ms, as set in the default poller, but after the first poll it falls into a continuous loop and there is no delay between successive runs of 'moveContentFilesChain'.

    Please help me correct my config so that 'moveContentFilesChain' is polled every 3000 ms and not continuously.

    Thanks in advance,
    Darek

  2. #2
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,840

    The continuous polling is due to the -1 value for max-messages-per-poll (that means "no limit" to the number of times it will actually call receive() for a given poll interval). If you change that to 1, it will only receive at most 1 message per-poll.

    If you are using that same poller elsewhere, and do want to keep that setting, then simply provide an explicit (non-default) poller sub-element for the chain where you want the "at most one" behavior.
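    A minimal sketch of that second option, assuming the chain from the first post. The 3000 ms rate on the inner poller simply mirrors the default poller and is an illustrative choice, not something stated in the thread. The default poller keeps max-messages-per-poll="-1" for every other polled endpoint, while the chain gets its own poller sub-element:

    ```xml
    <!-- Default poller: unchanged, still used by all other polled endpoints -->
    <int:poller id="fixedThreadPoolPoller" default="true" fixed-rate="3000"
        task-executor="fixedThreadPoolTaskExecutor" max-messages-per-poll="-1" />

    <int:chain id="moveContentFilesChain" input-channel="tasksAssetsWaitingForContentFiles">
        <!-- Explicit poller for this chain only: at most one receive() per poll.
             The fixed-rate value here is an assumption copied from the default poller. -->
        <int:poller fixed-rate="3000" max-messages-per-poll="1" />
        <int:service-activator ref="ingestTaskHandler" method="moveContentFileIfExists" />
        <int:router expression="payload.gathered">
            <int:mapping value="false" channel="tasksAssetsWaitingForContentFiles" />
            <int:mapping value="true" channel="tasksAssetsWithContentFilesFound" />
        </int:router>
    </int:chain>
    ```

    Note that the inner poller in this sketch declares no task-executor, so the chain would run on the scheduler's thread unless one is added.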

    Hope that helps.
    -Mark

  3. #3
    Join Date
    Jan 2008
    Location
    Mohnton, PA USA (that's near Philadelphia)
    Posts
    2,148

    Well, your 'max-messages-per-poll' is set to -1, which means it will attempt to receive as many messages per single poll as it can (until receive() returns null). Set max-messages-per-poll to the number of messages you want to process per single poll. For example, if you set it to 1, then at most one message will be processed every 3 seconds.

  4. #4
    Join Date
    Apr 2009
    Location
    Warsaw, Poland
    Posts
    5

    Thanks a lot for your answers. When I set 'max-messages-per-poll' to 1 I do indeed get one poll every 3 seconds, but this means I check only one file every 3 seconds.

    I'd really like to check all files available in the 'tasksAssetsWaitingForContentFiles' queue, then sleep for 3 seconds, then check all available files again, and so on. Is something like this possible?

    Basically, the use case is that some other process provides a package description from a remote resource. I pick it up and read its contents, which is a list of accompanying files that will also soon arrive from that remote location, so I need to wait for all these files before I proceed. The loop is the most problematic part for me here.

    Thanks a lot,
    Darek

  5. #5
    Join Date
    Jun 2011
    Posts
    25

    Have you tried 'fixed-delay' instead of 'fixed-rate'? I have a hazy memory of someone describing the difference, and it was to do with this; but I can find no proper description (so I might be wrong).

    Could we get a line added to the manual guys, expanding on the elusive "As an alternative to 'fixed-rate' you cna also use 'fixed-delay' attribute." [and correct the typo ]

  6. #6
    Join Date
    Apr 2009
    Location
    Warsaw, Poland
    Posts
    5

    Originally posted by dnlegge:
    Have you tried 'fixed-delay' instead of 'fixed-rate'?
    Tried as you suggested, but it didn't help - the only difference with 'fixed-delay' is that the timer starts counting from task completion rather than from task start.
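    For reference, a sketch of the 'fixed-delay' variant described above, assuming nothing else in the original config changes. It only moves the point from which the 3000 ms is measured. With max-messages-per-poll="-1" the poller still keeps calling receive() until it returns null, and because 'false' messages are routed straight back onto the polled queue, that presumably never happens within a single poll:

    ```xml
    <!-- Same default poller as in the first post, with fixed-rate swapped for fixed-delay.
         The interval now starts when the previous poll completes, but the
         "no limit" setting still lets one poll receive requeued messages repeatedly. -->
    <int:poller id="fixedThreadPoolPoller" default="true" fixed-delay="3000"
        task-executor="fixedThreadPoolTaskExecutor" max-messages-per-poll="-1" />
    ```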

    I added an expiration check and ended up using a delayer in 'expirationCheckChain':

    HTML Code:
    <task:scheduler id="tasksScheduler" pool-size="10" />
    	
    <task:scheduled-tasks scheduler="tasksScheduler">
        <task:scheduled ref="taskAggregatorMessageStoreReaper" method="run" fixed-rate="1000" />
    </task:scheduled-tasks>
    
    <int:poller id="fixedThreadPoolPoller" default="true" fixed-rate="3000"
            task-executor="fixedThreadPoolTaskExecutor" max-messages-per-poll="-1" />
    
    <int:channel id="tasksIncoming"><int:queue /></int:channel>
    <int:channel id="tasksAssetsWaitingForContentFiles"><int:queue /></int:channel>
    <int:channel id="tasksForExpirationCheck"><int:queue /></int:channel>
    <int:channel id="tasksExpired"><int:queue /></int:channel>
    <int:channel id="tasksAssetsWithContentFilesFound"><int:queue /></int:channel>
    <int:channel id="tasksReadyToProcess"><int:queue /></int:channel>
    
    <int:splitter id="taskToTaskAssetsSplitter" expression="payload.assets" 
            input-channel="tasksIncoming" output-channel="tasksAssetsWaitingForContentFiles" />    
    
    <int:chain id="moveContentFilesChain" input-channel="tasksAssetsWaitingForContentFiles">
        <int:service-activator ref="filesHandler" method="moveContentFileIfExists" />
        <int:router expression="payload.gathered" >
            <int:mapping value="false" channel="tasksForExpirationCheck" />
            <int:mapping value="true" channel="tasksAssetsWithContentFilesFound" />
        </int:router>
    </int:chain>
    
    <int:chain id="expirationCheckChain" input-channel="tasksForExpirationCheck">
        <int:delayer default-delay="1000" scheduler="tasksScheduler" />
        <int:router expression="payload.expired">
            <int:mapping value="false" channel="tasksAssetsWaitingForContentFiles" />
            <int:mapping value="true" channel="tasksExpired" />
        </int:router>
    </int:chain>
    
    <int:service-activator id="taskExpirationHandler" ref="ingestTaskHandler" method="handleTaskAssetExpiration"
            input-channel="tasksExpired" />
    
    <int:aggregator id="taskAssetsToTaskAggregator" expression="#this[0].payload.task" 
            message-store="taskAggregatorMessageStore" discard-channel="nullChannel"    
            input-channel="tasksAssetsWithContentFilesFound" output-channel="tasksReadyToProcess" />
    Last edited by roadrunn; Aug 26th, 2011 at 06:41 AM.
