e-Zest members share technology ideas to foster digital transformation.

The Executor Framework – Part IV

Written by Madhura Oak | Feb 25, 2013 12:49:32 PM

In the Part III of Executor Framework blog post series, I had written about Callable interface, creating Callable task and assigning a timeout to ExecutorService after shutdown.

In this blog post, I am writing about use of ExecutorService to submit multiple Callable tasks for bulk execution and assigning a timeout to Callable tasks submitted for bulk execution.

Submitting Callable tasks for bulk execution

The invokeAll(Collection<? extends Callable<T>> tasks) method of ExecutorService enables submitting a collection of Callable<T> tasks for execution. This method returns List<Future<T>> which contains Future<T> objects to get the result of all Callable<T> tasks after all the tasks in the collection are executed. The Callable tasks submitted to this method may either be executed normally or may have thrown an exception.

The order of Future<T> objects in the returned list is same as the Callable<T> tasks in the collection. The get() method of Future<T> can be called to get its return value. If the task threw an exception during execution, the get() method throws ExecutionException.

The collection submitted to this method should not be modified after it is called. To ensure this, the Collections.unmodifiableCollection(Collection<? extends T> c) method can be called to get an unmodifiable collection which can be passed to the invokeAll method.

The thread which calls the invokeAll method can be interrupted while waiting. If it is interrupted, it throws InterruptedException and cancels all unfinished tasks.

A Callable task which computes the sum of numbers is given in Listing 1.

public class ComputeSum implements Callable<Integer> {
    private List<Integer> numbers;

    public void setNumbers(List<Integer> numbers) {
        this.numbers = numbers;
    }

    public Integer call() throws Exception {
        int sum = 0;
	for(Integer number : numbers) {
	    System.out.println("Adding number : " + number);
	    sum += number;
	}
	return sum;
    }
}

Listing 1. A Callable task which computes sum of numbers

Listing 2 shows a code which submits a collection of Callable tasks to ExecutorService for bulk execution.

//Generate 30 random numbers in the range of 0 to 100
Random randomGenerator = new Random();
List<Integer> numbers = new ArrayList<Integer>();
for(int i=1; i <= 30; i++) {
    numbers.add(randomGenerator.nextInt(100));
}

ExecutorService service = Executors.newSingleThreadExecutor();

//create three tasks and assign the numbers to them
ComputeSum task1 = new ComputeSum();
ComputeSum task2 = new ComputeSum();
ComputeSum task3 = new ComputeSum();

//first task computes sum of 1 through 10 numbers in the list
task1.setNumbers(numbers.subList(0, 10));

//second task computes sum of 11 through 20 numbers in the list
task2.setNumbers(numbers.subList(10, 20));

//third task computes sum of remaining numbers
task3.setNumbers(numbers.subList(20, 30));

boolean allTasksCompletedNormally = true;

//create a collection of tasks
Collection<ComputeSum> collection = new ArrayList<ComputeSum>();
collection.add(task1);
collection.add(task2);
collection.add(task3);

//submit the tasks for execution using unmodifiable collection
try {
    List<Future<Integer>> futures =
        service.invokeAll(
        Collections.unmodifiableCollection(collection));

    int sum = 0;

    //Get the result of all tasks
    for(Future<Integer> future : futures) {
        try {
	    sum += future.get();
	}
	catch (ExecutionException e) {
	    allTasksCompletedNormally = false;
	}
    }

    //print sum only if all tasks are executed normally
    if(allTasksCompletedNormally) {
        System.out.println("Sum = " + sum);
    }
    else {
        System.out.println("Some task threw an exception. ");
    }
}
catch (InterruptedException e) {
    e.printStackTrace();
}

Listing 2. Code which submits multiple Callable tasks for bulk execution

Assigning a timeout for all Callable tasks

The invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) method of ExecutorService can be called to assign a maximum time period to wait for all the tasks to complete execution. This method stops execution and returns a List when all Callable tasks in the collection are executed normally or with exception. It throws CancellationException when the timeout period elapses before completion of all tasks. The thread calling this method can be interrupted to stop execution of tasks. If the thread is interrupted, this method throws InterruptedException. The execution of tasks is stopped when CancellationException or InterruptedException is thrown by this method.