e-Zest members share technology ideas to foster digital transformation.

The Executor Framework – Part VII

Written by Madhura Oak | Oct 7, 2013 1:14:01 PM

In the last blog post on Executor Framework, I had written about ThreadPoolExecutor. In this blog post, I’m writing about ScheduledThreadPoolExecutor and ForkJoinPool.

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor like ThreadPoolExecutor is a class of Concurrency API. It is an implementation of ScheduledExecutorService interface and is a subclass of ThreadPoolExecutor.

The ScheduledExecutorService provides methods to execute Callable or Runnable tasks periodically or after a certain delay. An initial delay can also be specified while executing these tasks. The schedule(Runnable command, long delay, TimeUnit unit) and schedule(Callable<V> callable, long delay, TimeUnit unit) methods can be called for giving a minimum time delay before the execution of task starts. The task is executed only once. The former one allows defining a Runnable task and latter one allows defining a Callable task. Both the methods return ScheduledFuture<V>.

The scheduleAtFixedRate method of ScheduledExecutorService allows execution of a Runnable task after a fixed time period periodically. An initial delay can also be specified with the help of this method. The task starts running after the initial delay is elapsed. In subsequent execution, it runs periodically after the specified period. Thus it runs at regular intervals. If the task throws exception then the subsequent executions do not occur but the task is not terminated. The task will terminate only after the executor is shutdown. If an execution of a task takes longer time then the subsequent execution will start late. They will never execute concurrently.

The scheduleWithFixedDelay method of ScheduledExecutorService allows execution of a Runnable task after the given delay periodically. The task starts running after the specified initial delay. In subsequent execution, it runs periodically once the specified time delay after the previous execution is elapsed. Thus it runs after periodic delay. If a task execution throws exception then the subsequent executions do not occur.

The newScheduledThreadPool(int corePoolSize) method of Executors factory class can be called to create an instance of ScheduledExecutorService.

The shutdown() method can be called on the ScheduledExecutorService instance to stop the executor. After this method is called, the executor will not schedule any new tasks for execution. The setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) if set to true will allow the tasks to execute even after shutdown() is called on the executor. The only way to shutdown the executor then is to set this policy to false or call shutdownNow() method. Similarly, the setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) is used to define whether the existing delayed tasks should run after shutdown() is called. The default values of both these properties is false.

The getQueue() method of ScheduledThreadPoolExecutor returns the task queue used by this executor. The return type of this method is BlockingQueue<Runnable>. The order in which the tasks are saved in this queue does not determine their execution order.

Java 7 has added a getter-setter method on this class – setRemoveOnCancelPolicy(boolean value) and its getter. If this property is set to true then the cancelled tasks are immediately removed from the work queue at the time of cancellation. The default value of this property is false. The cancel() method of Future is called to cancel a task.

ForkJoinPool

Java 7 also introduced classes such as ForkJoinPool, RecursiveTask and RecursiveAction in Concurrency API for implementing fork-join.

ForkJoinPool class uses work stealing algorithm to run worker threads which can steal a task from other threads when the latter are busy. This class allows submission of ForkJoinTask tasks for execution and runs with a parallelism which defaults to the number of available processors – Runtime.availableProcessors(). The desired parallelism level can also be specified by using its constructor.

A ForkJoinTask instance is like a thread but light-weight than a normal thread. It is also a lightweight form of Future. Both RecursiveTask and RecursiveAction are abstract subclasses of abstract class ForkJoinTask. The RecursiveTask returns a result whereas RecursiveAction is resultless. Both RecursiveTask and RecursiveAction implementations should override compute() method which defines the computation performed by the task.

The fork() method of ForkJoinTask asynchronously executes the task. A task should not be forked more than once unless it has been completed or reinitialized. For the tasks to run asychronously, this method should not use synchronized blocks or call synchronized methods.

The join() method of ForkJoinTask returns the result of computation. Abnormal execution returns RuntimeException or Error unlike the ExecutionException thrown by get().

The submit(ForkJoinTask<T> task) method of ForkJoinPool can be called for submitting the task for execution or its invoke(ForkJoinTask<T> task) method can be called to execute the task. The invoke method returns the result whereas the submit method returns ForkJoinTask<T> instance. The join() method can be called on this instance to get the return value.

A simple example which demonstrates the use of RecursiveTask is given in Listing 1. This example computes the sum of the numbers in the array using fork-join.

public class CalculateSum extends RecursiveTask<Integer> {
	private int[] numbers;
	private int startIndex, endIndex;
	public CalculateSum(int[] numbers, int startIndex,
                           int endIndex) {
		this.numbers = numbers;
		this.endIndex = endIndex;
		this.startIndex = startIndex;
	}

	/**
	 * Compute the sum of numbers in array from startIndex to
         * endIndex
	 */
	protected Integer compute() {
		if((endIndex - startIndex) < 3) {
			//perform sequential sum if the count of numbers
                       //is less than 3
			return sequentialSum();
		}
		else {
			int split = (endIndex - startIndex)/2;
			CalculateSum leftHandSide = new
                                 CalculateSum(numbers, startIndex,
                                 startIndex+split);
			leftHandSide.fork();
			CalculateSum rightHandSide = new
                                CalculateSum(numbers,
                                startIndex+split+1, endIndex);
			rightHandSide.fork();
			int lhsSum = leftHandSide.join();
			int rhsSum = rightHandSide.join();
			return lhsSum + rhsSum;
		}
	}	

	/**
	 * Calculate sum of numbers in array sequentially
	 */
	private int sequentialSum() {
		int sum = 0;
		for(int i=startIndex; i <= endIndex; i++) {
			sum+= numbers[i];
		}
		return sum;
	}
}

public class RecursiveTaskDemo {
	public static void main(String[] args) {
		CalculateSum finder = new CalculateSum(
                new int[] {29,46,89,48,36,22,21,78,67,23,99,10,100,102,56,28,37,35,98,54,83,72,51},0,22);
		ForkJoinPool pool = new ForkJoinPool();
		System.out.println(pool.invoke(finder)); //prints 1284
	}
}

Listing 1. Fork/Join example

Java 8 uses fork/join framework in Streams and in java.util.Arrays parallelSort() methods.

I would be writing more about the Fork/Join Framework in my next blog posts.