So far in the Executor Framework blog post series, I have covered the following topics:
Runnable
taskRunnable
task with a timeoutCallable
interfaceCallable
taskExecutorService
after shutdownExecutorService
to submit multiple Callable tasks for bulk executionCallable
tasks submitted for bulk executionIn this blog post, I’m writing about:
Callable
task from Runnable taskThreadPoolExecutor
Executing a single task from a collection of tasks
The invokeAny(Collection<? extends Callable<T>> tasks)
method of ExecutorService
submits a collection of Callable
tasks for execution and stops after any one task from the collection has executed normally or with exception. The result of the completed task is returned by this method. The remaining tasks which not started are cancelled. If any other task is in execution then the thread running it will throw InterruptedException
. Appropriate logic should be implemented in the code to stop the task if InterruptedException
is thrown. If the thread running other task cannot be interrupted then the task cannot be cancelled.
The collection of tasks should not be modified while this method is under execution. To ensure this Collections.unmodifiableCollection(tasks)
can be called to get the unmodifiable collection.
The tasks can be interrupted while waiting. If any of the task is interrupted this method throws InterruptedException
. If no task is completed successfully this method throws ExecutionException
.
The invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
method of ExecutorService
is used to assign a timeout period for execution. If any of the task is not executed before the timeout period is elapsed then this method throws TimeoutException
.
Creating a Callable task from Runnable task
The callable(Runnable task)
method of Executors
allows a Callable
task to be created from a Runnable
task. The Callable
task returns null
after execution. A return result instead of null can be specified by calling the callable(Runnable task, T result)
method of Executors
.
Cached Thread Pool
The fixed thread pool creates the fixed number of threads and keeps them alive until the ExecutorService
is shutdown. The cached thread pool creates new threads and adds them to the pool as they are needed instead of a fixed number of threads. If the previously created threads exist and are available then they are reused. The cached thread pool should be used when multiple concurrent short-lived asynchronous tasks have to be executed. If a thread remains unused for a minute then it is terminated and removed from the pool. Thus no resources are consumed if the cached thread pool is idle for more than a minute. The Executors.newCachedThreadPool()
method creates a cached thread pool and returns a ExecutorService
.
ThreadPoolExecutor
The ThreadPoolExecutor
class is an implementation of ExecutorService
. The Executors
factory methods provide preconfigured settings and should be preferred instead of creating this class instance directly. If you want fine tuning of thread pools which is not provided by the Executors
factory methods then only you should use this class.
There are many constructors of ThreadPoolExecutor
class which allow fine tuning. The core and maximum pool size are specified as first and second argument respectively in these constructors. When a task is submitted for execution to ThreadPoolExecutor
and if the number of threads is less than the core pool size then a new thread is started and assigned the task even if an existing thread is idle and available.
By default, the core threads are created and started only when a new task is submitted. However, a core thread can be created and started before the task is submitted to the ThreadPoolExecutor
by calling its preStartCoreThread()
method. This method creates a core thread if the number of core threads is less than the core pool size. If a core thread is started, this method returns true. All core threads can be pre-created and pre-started by calling the preStartAllCoreThreads()
method of ThreadPoolExecutor
. These threads will be idle till a task is assigned to them.
The maximum pool size specifies the maximum number of threads that can exist in a pool at any time. If the threads running in the pool are equal or greater than the core pool size and the queue which holds the submitted tasks is bounded and full and the number of threads in the pool has not reached the maximum pool size then a new thread is created and added in the pool. The maximum value that can be assigned to the maximum pool size is Integer.MAX_VALUE
.
A fixed sized thread pool is created by specifying same number to the core and maximum pool size.
The setCorePoolSize(int size)
and setMaximumPoolSize(int size)
allow the core and maximum pool size of ThreadPoolExecutor
to be changed dynamically.
The third and fourth argument passed in all ThreadPoolExecutor
constructors is the keepAliveTime
and its TimeUnit
respectively. When the number of threads in the pool exceeds the core pool size, the excess threads which are idle are kept alive for the keepAliveTime
. If they remain unutilized after their keepAliveTime
has elapsed then they are terminated. This keepAliveTime
timeout can also be assigned to the core threads by calling the allowCoreThreadTimeOut(true)
method of ThreadPoolExecutor
. The keepAliveTime
should be a non-zero value if this method is called. If false is passed as argument to this method, then the core threads are never terminated until the ThreadPoolExecutor
shuts down. The keepAliveTime
can be changed later by calling the setKeepAliveTime(long time, TimeUnit unit)
method of ExecutorService
.
A BlockingQueue<Runnable>
which holds the submitted tasks is the fifth argument passed to all ThreadPoolExecutor
constructors. If this queue size is less than the core pool size then a new thread is assigned the task and added to the pool. If the number of threads running is more than core pool size, then the submitted tasks are held by the queue until they are assigned to any available thread for execution. Three types of queues that can be used – direct handoffs, unbounded and bounded queues.
SynchronousQueue
is the default queue used when the factory methods of Executors
are called to create an ExecutorService
. This queue directly hands off a task to an available thread whenever a task is submitted for execution. If no thread is available then a new thread is constructed provided the number of threads in the pool is less than maximum pool size. If the number of threads in the pool exceeds maximum pool size then the submitted task is rejected according to the rejection policy.
When unbounded queue such as LinkedBlockingQueue
is used to hold the submitted tasks, the number of threads will never grow beyond the core pool size. The new tasks will wait in the queue as long as a core thread becomes available for executing it. When the tasks are submitted faster than they are processed, the size of the queue will keep growing.
When a bounded queue such as ArrayBlockingQueue
is used to hold the submitted tasks a new thread is created and added to the pool when the following conditions are true:
Tuning of ThreadPoolExecutor
, which uses bounded queue, is difficult and should be done only by an expert programmer.
If the number of threads reach the maximum pool size while using bounded queue then the submitted task is rejected. The ThreadPoolExecutor.AbortPolicy
is the rejection policy used by default and it throws RejectedExecutionException
.
Other policies for handling rejected tasks can be specified by using the appropriate constructor of ThreadPoolExecutor
.