The Executor Framework – Part V

Written by Madhura Oak | Apr 2, 2013 4:22:10 PM

So far in the Executor Framework blog post series, I have covered the following topics:

  1. Creating a single worker thread for sequential execution of tasks
  2. Creating a thread pool with a fixed number of threads
  3. Creating an asynchronous task
  4. Creating a cancellable Runnable task
  5. Creating a Runnable task with a timeout
  6. The Callable interface
  7. Creating a Callable task
  8. Assigning a timeout to an ExecutorService after shutdown
  9. Using an ExecutorService to submit multiple Callable tasks for bulk execution
  10. Assigning a timeout to Callable tasks submitted for bulk execution

In this blog post, I’m writing about:

  1. Executing a single task from a collection of tasks
  2. Creating a Callable task from a Runnable task
  3. Cached Thread Pool
  4. ThreadPoolExecutor

Executing a single task from a collection of tasks

The invokeAny(Collection<? extends Callable<T>> tasks) method of ExecutorService submits a collection of Callable tasks for execution and returns the result of one task that has completed successfully, that is, without throwing an exception. A task that fails with an exception does not end the call as long as another task can still succeed. When the method returns, whether normally or with an exception, the tasks that have not completed are cancelled. Tasks that have not started never run, and the threads running the other tasks are interrupted. A task that is blocked in a call such as Thread.sleep() then receives InterruptedException, while a task that is busy computing only has its interrupt flag set. The task code should therefore handle InterruptedException or check Thread.currentThread().isInterrupted() in order to stop. A task that does not respond to interruption cannot be cancelled and runs to completion.

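The collection of tasks should not be modified while this method is executing, because the results of the method are undefined if it is. Collections.unmodifiableCollection(tasks) prevents modification through the returned wrapper, but the wrapper is only a view, so changes made through the original reference still show through. Passing a copy such as new ArrayList<>(tasks) avoids that.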

The calling thread can be interrupted while it waits for a result, in which case this method throws InterruptedException. If no task completes successfully, this method throws ExecutionException.

The invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) method of ExecutorService assigns a timeout period to the execution. If no task completes successfully before the timeout elapses, this method throws TimeoutException.
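
As a minimal sketch of the behaviour described above, the following example submits a slow and a fast task and lets invokeAny pick the winner. The class name InvokeAnyDemo, the method firstResult and the task bodies are illustrative assumptions, not part of the API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class InvokeAnyDemo {

    // Returns the result of whichever task completes successfully first.
    static String firstResult(long timeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Callable<String> slow = () -> {
                Thread.sleep(5_000); // interrupted once the other task has won
                return "slow";
            };
            Callable<String> fast = () -> "fast";
            List<Callable<String>> tasks = Arrays.asList(slow, fast);
            return executor.invokeAny(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(firstResult(2_000)); // fast
    }
}
```

The slow task is cancelled by interruption as soon as the fast one returns, so the call does not wait for the full five seconds.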

Creating a Callable task from a Runnable task

The callable(Runnable task) method of Executors creates a Callable task from a Runnable task. The resulting Callable runs the Runnable and returns null. To return a result other than null, call the callable(Runnable task, T result) method of Executors instead.
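
Both variants can be tried with a short sketch like the one below. The class CallableAdapterDemo and its helper methods are hypothetical names chosen for illustration; only Executors.callable comes from the JDK.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class CallableAdapterDemo {

    // Wraps the Runnable so that call() returns null.
    static Object runReturningNull(Runnable task) throws Exception {
        Callable<Object> callable = Executors.callable(task);
        return callable.call();
    }

    // Wraps the Runnable so that call() returns the supplied result.
    static <T> T runReturning(Runnable task, T result) throws Exception {
        Callable<T> callable = Executors.callable(task, result);
        return callable.call();
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger counter = new AtomicInteger();
        Runnable increment = counter::incrementAndGet;

        System.out.println(runReturningNull(increment));     // null
        System.out.println(runReturning(increment, "done")); // done
        System.out.println(counter.get());                   // 2
    }
}
```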

Cached Thread Pool

The fixed thread pool uses a fixed number of threads and keeps them alive until the ExecutorService is shut down. The cached thread pool instead creates new threads as they are needed and adds them to the pool. If previously created threads exist and are available, they are reused. The cached thread pool should be used when many short-lived asynchronous tasks have to be executed concurrently. If a thread remains unused for 60 seconds, it is terminated and removed from the pool, so a cached thread pool that stays idle for longer than that holds no threads. The Executors.newCachedThreadPool() method creates a cached thread pool and returns an ExecutorService.
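
The on-demand growth of a cached thread pool can be observed with a sketch along these lines. The class CachedPoolDemo and the method threadsUsedFor are illustrative assumptions. Every task waits until all tasks have started, so the pool has to create one thread per task.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class CachedPoolDemo {

    // Runs the given number of overlapping tasks and counts the distinct threads used.
    static int threadsUsedFor(int concurrentTasks) throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool();
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        CountDownLatch allStarted = new CountDownLatch(concurrentTasks);
        try {
            for (int i = 0; i < concurrentTasks; i++) {
                pool.execute(() -> {
                    threadNames.add(Thread.currentThread().getName());
                    allStarted.countDown();
                    try {
                        allStarted.await(); // hold this thread until every task is running
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            allStarted.await(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdown();
        }
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return threadNames.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(threadsUsedFor(3)); // 3
    }
}
```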

ThreadPoolExecutor

The ThreadPoolExecutor class is an implementation of ExecutorService. The Executors factory methods provide preconfigured settings and should be preferred over creating an instance of this class directly. Use this class directly only when you need fine tuning of the thread pool that the Executors factory methods do not provide.

ThreadPoolExecutor has four constructors, all of which allow fine tuning. The core and maximum pool size are the first and second arguments of each constructor. When a task is submitted to a ThreadPoolExecutor and the number of threads is less than the core pool size, a new thread is started and assigned the task even if an existing thread is idle and available.

By default, core threads are created and started only when a new task is submitted. However, a core thread can be created and started before any task is submitted by calling the prestartCoreThread() method of ThreadPoolExecutor. This method starts a core thread if the number of threads is less than the core pool size, and returns true if a thread was started. All core threads can be started in advance by calling the prestartAllCoreThreads() method of ThreadPoolExecutor, which returns the number of threads it started. These threads stay idle until a task is assigned to them.
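
A small sketch of prestarting, assuming a pool with a core size of 2. The class PrestartDemo and the method prestartTrace are hypothetical names. The trace records the pool size and the return values of prestartCoreThread() and prestartAllCoreThreads() in order.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PrestartDemo {

    // Records pool sizes and return values while the core threads are prestarted.
    static String prestartTrace() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        try {
            StringBuilder trace = new StringBuilder();
            trace.append(pool.getPoolSize()).append(' ');            // no threads yet
            trace.append(pool.prestartCoreThread()).append(' ');     // starts one core thread
            trace.append(pool.getPoolSize()).append(' ');
            trace.append(pool.prestartAllCoreThreads()).append(' '); // starts the remaining one
            trace.append(pool.getPoolSize()).append(' ');
            trace.append(pool.prestartCoreThread());                 // nothing left to start
            return trace.toString();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(prestartTrace()); // 0 true 1 1 2 false
    }
}
```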

The maximum pool size specifies the maximum number of threads that can exist in the pool at any time. If the number of threads in the pool is equal to or greater than the core pool size, the queue that holds the submitted tasks is full, and the number of threads has not reached the maximum pool size, then a new thread is created and added to the pool. The maximum pool size can be set as high as Integer.MAX_VALUE, which effectively lets the pool grow without limit.

A fixed-size thread pool is created by specifying the same value for the core and maximum pool size.

The setCorePoolSize(int size) and setMaximumPoolSize(int size) methods allow the core and maximum pool size of a ThreadPoolExecutor to be changed dynamically.
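
The following sketch creates a fixed-size pool and then enlarges it at run time. ResizeDemo and growPool are illustrative names, and the sketch assumes the new size is not smaller than the initial one. The maximum is raised before the core size so that the core size never exceeds the maximum, an ordering that recent Java versions enforce with IllegalArgumentException.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ResizeDemo {

    // Creates a fixed-size pool, grows it, and returns {corePoolSize, maximumPoolSize}.
    static int[] growPool(int initialSize, int newSize) {
        // Same core and maximum size: a fixed-size pool.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                initialSize, initialSize, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        try {
            pool.setMaximumPoolSize(newSize); // raise the ceiling first
            pool.setCorePoolSize(newSize);    // then raise the core size
            return new int[] {pool.getCorePoolSize(), pool.getMaximumPoolSize()};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] sizes = growPool(2, 4);
        System.out.println(sizes[0] + " " + sizes[1]); // 4 4
    }
}
```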

The third and fourth arguments of every ThreadPoolExecutor constructor are the keepAliveTime and its TimeUnit respectively. When the number of threads in the pool exceeds the core pool size, the excess threads that are idle are kept alive for the keepAliveTime. If they remain unused after the keepAliveTime has elapsed, they are terminated. The same timeout can be applied to the core threads by calling the allowCoreThreadTimeOut(true) method of ThreadPoolExecutor. The keepAliveTime must be greater than zero when this method is called with true, otherwise the method throws IllegalArgumentException. If false is passed, which is the default, idle core threads are not terminated until the ThreadPoolExecutor shuts down. The keepAliveTime can be changed later by calling the setKeepAliveTime(long time, TimeUnit unit) method of ThreadPoolExecutor.
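
A sketch of core thread timeout, assuming a keepAliveTime of 100 milliseconds. KeepAliveDemo and its two methods are hypothetical names. The first method polls for up to five seconds instead of relying on an exact delay, since thread termination timing is not precise.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {

    // Returns true if the single core thread is terminated after sitting idle.
    static boolean coreThreadTimesOut() throws Exception {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        try {
            pool.allowCoreThreadTimeOut(true);
            pool.submit(() -> { }).get(); // run one task so that the core thread exists

            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(20);
            }
            return pool.getPoolSize() == 0;
        } finally {
            pool.shutdown();
        }
    }

    // Returns true if a zero keepAliveTime is refused for core thread timeout.
    static boolean zeroKeepAliveIsRefused() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        try {
            pool.allowCoreThreadTimeOut(true);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(coreThreadTimesOut());
        System.out.println(zeroKeepAliveIsRefused()); // true
    }
}
```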

A BlockingQueue<Runnable> that holds the submitted tasks is the fifth argument of every ThreadPoolExecutor constructor. If fewer threads than the core pool size are running, a submitted task is not queued. Instead, a new thread is added to the pool and assigned the task. If the number of running threads is equal to or greater than the core pool size, submitted tasks are held in the queue until a thread becomes available to execute them. Three queuing strategies can be used: direct handoffs, unbounded queues and bounded queues.

SynchronousQueue is the queue used by Executors.newCachedThreadPool(), whereas Executors.newFixedThreadPool() and Executors.newSingleThreadExecutor() use an unbounded LinkedBlockingQueue. A SynchronousQueue does not hold tasks. It hands off a task directly to an available thread whenever a task is submitted for execution. If no thread is available, a new thread is created, provided the number of threads in the pool is less than the maximum pool size. If the number of threads in the pool has reached the maximum pool size, the submitted task is rejected according to the rejection policy.
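
Direct handoff can be sketched as follows, assuming a pool with a core size of 0 and a maximum size of 2. DirectHandoffDemo and handoffTrace are illustrative names. Each task blocks on a latch so that its thread stays busy while the next task is submitted.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DirectHandoffDemo {

    // Submits three blocking tasks to a pool that can hold at most two threads.
    static String handoffTrace() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 2, 60, TimeUnit.SECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocked); // no idle thread: a new thread is created
            pool.execute(blocked); // first thread is busy: a second thread is created
            String trace = "threads=" + pool.getPoolSize()
                    + " queued=" + pool.getQueue().size();
            try {
                pool.execute(blocked); // maximum reached and nothing can be queued
                return trace + " third=accepted";
            } catch (RejectedExecutionException e) {
                return trace + " third=rejected";
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(handoffTrace()); // threads=2 queued=0 third=rejected
    }
}
```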

When an unbounded queue such as LinkedBlockingQueue is used to hold the submitted tasks, the number of threads never grows beyond the core pool size, so the maximum pool size has no effect. New tasks wait in the queue until a core thread becomes available to execute them. When tasks are submitted faster than they are processed, the queue keeps growing.
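
A sketch of this behaviour, assuming a core size of 2 and a maximum size of 10. UnboundedQueueDemo and submitBlockedTasks are hypothetical names. The tasks block on a latch so that the state of the pool can be inspected while they are pending.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class UnboundedQueueDemo {

    // Submits blocking tasks and reports how many threads and queued tasks exist.
    static String submitBlockedTasks(int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the thread busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return "threads=" + pool.getPoolSize() + " queued=" + pool.getQueue().size();
        } finally {
            release.countDown(); // let the tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitBlockedTasks(6)); // threads=2 queued=4
    }
}
```

Although the maximum pool size is 10, only the two core threads are created and the other four tasks wait in the queue.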

When a bounded queue such as ArrayBlockingQueue is used to hold the submitted tasks, a new thread is created and added to the pool when all of the following conditions are true:

  1. the queue is full
  2. no core thread is available for execution
  3. the number of threads is equal to or greater than the core pool size
  4. the number of threads is less than the maximum pool size
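
The growth of a pool with a bounded queue can be traced with a sketch like this one, assuming a core size of 1, a maximum size of 3 and a queue capacity of 2. BoundedQueueDemo and poolSizesAfterEachSubmit are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedQueueDemo {

    // Submits five blocking tasks and records the pool size after each submission.
    static List<Integer> poolSizesAfterEachSubmit() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2));
        CountDownLatch release = new CountDownLatch(1);
        List<Integer> sizes = new ArrayList<>();
        try {
            for (int i = 0; i < 5; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the thread busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                sizes.add(pool.getPoolSize());
            }
            return sizes;
        } finally {
            release.countDown(); // let the tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(poolSizesAfterEachSubmit()); // [1, 1, 1, 2, 3]
    }
}
```

The first task starts the core thread, the next two fill the queue, and only the fourth and fifth tasks cause extra threads to be created.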

Tuning a ThreadPoolExecutor that uses a bounded queue is difficult, because the queue size and the maximum pool size trade off against each other. It should be done only by an experienced programmer.

If the bounded queue is full and the number of threads has reached the maximum pool size, a newly submitted task is rejected. A task submitted after the ThreadPoolExecutor has been shut down is rejected as well. ThreadPoolExecutor.AbortPolicy is the rejection policy used by default, and it throws RejectedExecutionException.

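Other policies for handling rejected tasks, such as ThreadPoolExecutor.CallerRunsPolicy, ThreadPoolExecutor.DiscardPolicy and ThreadPoolExecutor.DiscardOldestPolicy, can be specified by passing a RejectedExecutionHandler to the appropriate constructor of ThreadPoolExecutor.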
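
As a rough comparison of rejection policies, the sketch below saturates a pool of one thread and a queue of capacity 1, then submits one more task under a given handler. RejectionPolicyDemo and outcomeOfExtraTask are hypothetical names, and the returned strings are only labels chosen for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class RejectionPolicyDemo {

    // Saturates the pool, submits one extra task, and reports what happened to it.
    static String outcomeOfExtraTask(RejectedExecutionHandler handler) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        try {
            pool.execute(blocked); // occupies the only thread
            pool.execute(blocked); // fills the queue
            pool.execute(() -> ranOn.set(Thread.currentThread())); // pool is saturated
            return ranOn.get() == Thread.currentThread()
                    ? "run by the caller thread"
                    : "not run by the caller";
        } catch (RejectedExecutionException e) {
            return "rejected with exception";
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // rejected with exception
        System.out.println(outcomeOfExtraTask(new ThreadPoolExecutor.AbortPolicy()));
        // run by the caller thread
        System.out.println(outcomeOfExtraTask(new ThreadPoolExecutor.CallerRunsPolicy()));
        // not run by the caller
        System.out.println(outcomeOfExtraTask(new ThreadPoolExecutor.DiscardPolicy()));
    }
}
```

With DiscardPolicy the extra task is dropped without any notification, which is why the sketch can only report that the caller did not run it.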