Page 3 of 4 FirstFirst 1234 LastLast
Results 21 to 30 of 39

Thread: DelayQueue contribution

  1. #21

    Default

    I like the idea to make delayer an endpoint. It's very disturbing to cheat on channel and wrap messages with Delayed. Actually, it breaks the contract, as QueueChannel accepts in constructor BlockedQueue<Message> and not BlockedQueue<? extends Message> for a reason. I had to erase type in my super(new DelayQueue()) to get around this.

  2. #22

    Default

    As we did not decide on a better solution, I created a patch for the interceptor option. The patch includes simple delay queue with delay configured at channel level and the message header variant, where every message has a header with a time stamp. Please review: http://jira.springframework.org/secu...layQueue.patch

    P.S. The patch includes unit tests.

  3. #23
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,844

    Default

    Thanks for adding the attachment. For now I've assigned the issue to 2.0-M1 since a) it's essentially a new feature and b) we need to be sure about the channel vs. endpoint implementation. I'm still not sure which is better at the moment, but I think experimenting with the options a bit will clarify that. If all goes well, we may be able to bump this to 1.0.3.

  4. #24
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,844

    Default

    Andrew I have an alternate implementation that is now based on our new option for passing a TaskExecutor for a channel.

    Basically, I implemented a DelayedExecutor that implements Spring's TaskExecutor and accepts a delay value. Internally, it delegates to a ScheduledExecutorService in its execute(Runnable) method.

    Therefore, when you send(Message) it will delay based on the interval and then invoked the handler.

    So, I'm wondering... does this meet your needs?

    Let me know if you have any questions.

    -Mark

  5. #25

    Default

    Mark, I cannot find any file with "Delay" in name in trunk. Where can see your implementation?

  6. #26
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,844

    Default

    Andrew,

    Sorry for the confusion; I did not commit this code yet.

    It's basically like this:
    Code:
    public class DelayedExecutor implements TaskExecutor {
    
        private volatile long defaultDelay;
    
        private volatile String delayHeaderName;
    
        private final ScheduledExecutorService scheduler;
    
    
        public DelayedExecutor(long defaultDelay) {
            this(defaultDelay, null);
        }
    
        public DelayedExecutor(long defaultDelay, ScheduledExecutorService scheduler) {
            this.defaultDelay = defaultDelay;
            this.scheduler = (scheduler != null ? scheduler : Executors.newScheduledThreadPool(1));
        }
    
        public void setDefaultDelay(long defaultDelay) {
            this.defaultDelay = defaultDelay;
        }
    
        public void execute(Runnable task) {
            Long delay = null;
            if (task instanceof MessageAwareRunnable && this.delayHeaderName != null) {
                Message<?> message = ((MessageAwareRunnable) task).getMessage();
                delay = message.getHeaders().get(this.delayHeaderName, Long.class);
            }
            if (delay == null) {
                delay = this.defaultDelay;
            }
            this.scheduler.schedule(task, delay, TimeUnit.MILLISECONDS);  
        }
    
    }

  7. #27

    Default

    I like this solution, but have one minor note about delayHeader property. IMO it does not cover all cases. What do you think about a strategy accepting a Message and returning the delay?

  8. #28
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,844

    Default

    Andrew,

    I think the role of populating the header from Message content should be handled by a Transformer that would be invoked prior to sending the Message to the queue. That is consistent with several other components in the framework where the receiver only checks for a header value without any awareness of how it may have been populated upstream (e.g. the outbound file adapters check for a filename in a header).

    Does that make sense?
    -Mark

  9. #29

    Default

    Yes, transformer can do the job also. I thought about a strategy because I liked how the aggregator allows correlation strategy and completeness strategy to be specified on the endpoint itself without extra endpoints.

    Are you going to extend the configuration schema to support delay executor? I suppose the schema goes to be changed at least for executor per channel configuration.

  10. #30
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,844

    Default

    Andrew,

    In 1.0.3, we'll have a "task-executor" reference option for channels. We might not introduce the delayed executor until 2.0, but I could at least commit it in the sandbox.

Tags for this Thread

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •