PDA

View Full Version : ThreadPoolTaskExecutor queue problem


kovy
Mar 8th, 2007, 11:00 AM
hey,

I have create a threadpool task executor. Now each time i execute this task i get a rejectedExecutionException. The last part (bottom of message) i execute it multiple times. Did i configure the treadpooltask executor wrong?


Here is my configiration:

<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPo olTaskExecutor">
<property name="corePoolSize" value="20" />
<property name="maxPoolSize" value="30" />
<property name="queueCapacity" value="50" />
<property name="keepAliveSeconds" value="600" />
</bean>

<bean id="sendMailTask" class="eu.trade.mmf.mail.thread.SendMailTask">
<constructor-arg ref="taskExecutor" />
</bean>

Here is my class:

public class SendMailTask {

private class MMFMailThread implements Runnable {


public MMFMailThread() {
}

/**
* {@inheritDoc}
*
* @see java.lang.Thread#run()
*/
public void run() {
sendMails();
}


public void sendMails() {
// process code
}

}

private ThreadPoolTaskExecutor taskExecutor;

public SendMailTask(ThreadPoolTaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
this.taskExecutor.setQueueCapacity(300);
this.taskExecutor.setKeepAliveSeconds(600);
}

public void sendMail(SendServer server, List<Mail> list) {
taskExecutor.execute(new MMFMailThread());
}


}


Here is my call :
task = (SendMailTask) MMFIBatis.getContext().getBean("sendMailTask");
task.sendMail();

Alarmnummer
Mar 8th, 2007, 11:07 AM
Could it be that the queue is full?

The default policy with the java.util.concurrent.ThreadPoolExecutor when the queue is full, is to reject tasks.

[edit]
There is room for improvement in your design.

Check out the following:


public interface SendMailService{
void sendAll(List<Mail> mailList);
}

class SendMailServiceImpl implements SendmailService{
private final SendServer;
//constructor with the given sendServer
public void sendAll(List<Mail> mailList){
....
}
}

//could be an internal class of Foo (see below)
class SendMailTask implements Runnable{
private List<Mail> mailList;
//constructor with the given mailList
void run(){
}
}

And now the calling part:
class Foo{
private final SendMailService sendMailService;
private final Executor executor;
//constructor with sendmailService and the executor

void foo(){
List<Mail> mailList = ...;
SendMailTask task = new SendMailTask(sendMailService);
executor.execute(task);
}
}

kovy
Mar 8th, 2007, 11:24 AM
hey, this seems most likely the reason, but is something wrong with the configuration?

Alarmnummer
Mar 8th, 2007, 11:31 AM
hey, this seems most likely the reason, but is something wrong with the configuration?

If the configuration wasn't correct, Spring would complain when the application started up.

But try to figure out if the queue is full. If you are adding tasks faster than they are processed, the queue could get full (there is a maximum capacity).

ballsuen
Jul 7th, 2007, 10:07 AM
I need to process a list of work items concurrently. But I could have over a million of work items in a batch so setting a queue capacity won't work.

Is there a bette approach for this problem?

Alarmnummer
Jul 7th, 2007, 10:58 AM
Yes, use an executor that blocks. I really find it a shame that this functionality is missing in the standard ThreadPoolExecutor.

I have added a version of the blocking Executor to Prometheus (http://prometheus.codehaus.org), the library I'm working on:
http://prometheus.codehaus.org/javadoc/main/org/codehaus/prometheus/blockingexecutor/BlockingExecutor.html

But you could also create a blocking executor by overriding the execute method and place the task on the BlockingQueue with a put (instead of an offer with 0 timeout)

Something to watch out for:
if you are using a single transaction to read everything from the database, this could lead to problems (like running out of undo space, or a database with terrible performance because the locks have escalated).

ballsuen
Jul 7th, 2007, 01:33 PM
Thanks ! but which type of executor would block?

Alarmnummer
Jul 7th, 2007, 02:29 PM
You have to modify the java.util.concurrent.ThreadPoolExecutor by overriding the execute method and place a task with a put instead of an offer with a zero timeout on the workqueue.

Or you could use the BlockingExecutor (and the ThreadPoolBlockingExecutor as standard implementation) from Prometheus. The primary reason of existence of these classes was to create an Executor that is able to block.

ballsuen
Jul 9th, 2007, 02:16 PM
Since I want my queue to be unbounded, can using newFixedThreadPool solve my problem?

Alarmnummer
Jul 9th, 2007, 02:26 PM
Since I want my queue to be unbounded
I don't know in what kind of environment your application is running, but if it is an enterprise environment, you don't want to leave your queues unbounded. If the system is under load, it could lead to a crash (out of memory error for example).

So unbounded queues are a no no :)


can using newFixedThreadPool solve my problem?
Nope. You want a ThreadPoolExecutor that blocks and it doesn't do that out of the box. You can modify (subclass) the ThreadPoolExecutor and override the execute method and place the task on the workqueue (I have done so in the past). See the ScheduledThreadPoolExecutor for more detailed information (they are also playing with the workqueue).

ballsuen
Jul 9th, 2007, 02:39 PM
Yes, it is an enterprise environment. Actually, the number of entries is a finite set (~ 1 million for batch execution) and will not grow endlessly. But I don't want to put a max to the queue because the number of entries can grow as the business grows.

I thought that the max. resources consumed in the system should be limited by the max. number of threads in the thread pool. (provided that the entries in the queue won't take much memory). Is it not true?

Why can't newFixedThreadPool help my case?

Sorry that my questions might sound naive as I'm now to the Concurrent library. And thanks for your answer!

Alarmnummer
Jul 9th, 2007, 02:50 PM
Yes, it is an enterprise environment. Actually, the number of entries is a finite set (~ 1 million for batch execution) and will not grow endlessly. But I don't want to put a max to the queue because the number of entries can grow as the business grows.

What you normally do is make the queue bounded. So when an item is placed on that queue, and the queue if full, the call blocks until space is available in that queue.

So you would expect that the executor (that uses blockingqueue as workqueue) also would block when task is submitted for execution (by the execute method) .

But the problem is that the execute doesn't do a BlockingQueue.put but a BlockingQueue.offer with a zero timeout (see the documentation of the BlockingQueue for the differences between these methods). This means that when the queue is full, the call doesn't block (blocking is the behavior you want) but directly calls the rejectedexecutionhandler! And standard a handler is used the throws the RejectedExecutionException.

So current situation:
when queue full -> rejectedexecutionexception

Desired situation:
when queue full -> block

We only can get to the second situation if you modify the ThreadPoolExecutor so that it doesn't do an blockingqueue.offer, but a blockingqueue.put. You could check the sourcecode of the ThreadPoolExecutor for more information.


I thought that the max. resources consumed in the system should be limited by the max. number of threads in the thread pool. (provided that the entries in the queue won't take much memory).

Is it not true?

It depends. What happens when a new batch starts, and the previous batch hasn't completed? Are you 100% sure you can always control the maximum size of the batch and the number of overlapping batches?


Why can't newFixedThreadPool help my case?

Because there is no functionality that changes the blockingqueue.offer to blockingqueue.put.


Sorry that my questions might sound naive as I'm now to the Concurrent library. And thanks for your answer!
No problem.. Concurrency control is my favorite subject :)

ballsuen
Jul 10th, 2007, 12:05 AM
JMS is providing similar producer and consumer pattern as the ThreadPoolTaskExecutor. Based on what condition shall I choose either one of these solution?

I have additional requirement in which some entries on my queue have higher priority than others. (They must be executed ASAP once they are on the queue.) I also require transaction management when executing my tasks.

Can either JMS/TaskExecutor solve my problem? Or I need to create a separate pool/executor for handling high prirotiy task?

Alarmnummer
Jul 10th, 2007, 01:18 AM
JMS is providing similar producer and consumer pattern as the ThreadPoolTaskExecutor. Based on what condition shall I choose either one of these solution?

I would not pick JMS just for blocking behavior. I'll see if I can find a piece of code where I enhanced the ThreadPoolExecutor so it blocks.


I have additional requirement in which some entries on my queue have higher priority than others. (They must be executed ASAP once they are on the queue.)

If you use a ThreadPoolExecutor, you can use a PriorityBlockingQueue. The big problem with this implementation is that is unbounded. So it is quite useless in an enterprise environment. I'm thinking about creating a 'AsGoodAsItGetsPriorityBlockingQueue'. It will order the items in the BlockingQueue, but items that are pending to be placed, that have a even higher priority, are ignored until space gets available.


I also require transaction management when executing my tasks.

That should not be any problem for the normal Executor. I usually create some kind of service that is called from the task, that is transactional.


class FooService{
@transactional
void foo(long fooId){}
}

class FooServiceRunnable implements Runnable{
FooService fooService;
long fooId;

void run(){
fooService.foo(fooId);
}
}



Can either JMS/TaskExecutor solve my problem?

I guess it can be build in JMS.


Or I need to create a separate pool/executor for handling high prirotiy task?
That would be a solution for bypassing the limitations of the PriorityBlockingQueue.

ballsuen
Jul 10th, 2007, 01:56 AM
So under what situation shall I use MDP?

Alarmnummer
Jul 10th, 2007, 02:19 AM
I don't know.

I'm currently working on Prometheus: a concurrency library with functionality I found missing in the java.util.concurrent library (BlockingExecutors for example). So as soon as I need something I can't find in the java.util.concurrent library, I'll add it. I just created a JIRA issue for a bounded blockingqueue.

Prometheus also provides functionality for setting up high volume data processing applications. It extracts most plumbing like threading, blocking, exceptionhandling, monitoring etc and lets you focus on the core business problems. But most of that functionality is still in the sandbox and won't be released before 0.2 (I'm currently at 0.1rc1).

ballsuen
Jul 19th, 2007, 01:17 PM
Do I require J2SE 5 to run ThreadPoolTaskExecutor in Spring?

Alarmnummer
Jul 19th, 2007, 01:31 PM
Yes:

see
http://www.springframework.org/docs/api/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.html

But there are other implementations that work under java 1.4, for example:
http://www.springframework.org/docs/api/org/springframework/scheduling/quartz/SimpleThreadPoolTaskExecutor.html

ballsuen
Jul 19th, 2007, 01:44 PM
What kind of queue does ThreadPoolTaskExecutor implement? Bound or unbound? Block or not?


Is there any way to pause the threadpool and the threads submitted by it?

Alarmnummer
Jul 19th, 2007, 02:15 PM
What kind of queue does ThreadPoolTaskExecutor implement? Bound or unbound? Block or not?

As far as I know none of them are bound.


Is there any way to pause the threadpool and the threads submitted by it?
which side do you want to pause? Placement of tasks (so the outstanding tasks are processed and the workers eventually pause) or taking of tasks (let every workers stop as soon as it has returned from his current job assignment).

Pausing would not be that difficult (you could use some structure that can be opened or closed) and make sure that the taking/placing of items always calls this structure before they do anything. You could create a blocking queue with pausing behavior, but it would only work for the taking of items, and not for the placement of items unless you break the contract of the offer method.

Why do you want to pause btw?

madhav288
Nov 27th, 2007, 07:21 PM
It is strange that ThreadPoolExecutor class does not block in execute method. Did you report this in Java bugzilla? I would love to report it in there, if its not already reported.

Thanks.

ballsuen
Nov 29th, 2007, 06:30 AM
No, I didn't realize the Java bugzilla. It would be great if you can report that.