Along with CountDownLatch
, FutureTask
and Semaphore
, the Concurrency API in Java 5 also introduced a barrier – CyclicBarrier
. This class is used as a barrier for a set of threads, to keep them waiting until all the threads have reached it. Once all the threads reach the barrier, it is tripped, allowing them to perform further execution. The number of threads in WAITING
state, which when reached should trip the barrier, is passed as an argument to the constructor of CyclicBarrier
. A Runnable
action to be performed when the barrier is tripped can also be defined optionally while instantiating CyclicBarrier
. The last thread to enter the barrier executes this action before the other waiting threads are released. If the action throws exception, the barrier is broken and all the other threads receive a checked exception BrokenBarrierException
.
Unlike CountDownLatch
which is a one-time latch, the CyclicBarrier
can be tripped multiple times. A CyclicBarrier
is usually used when you want to execute a task more than once by dividing it into multiple subtasks. A separate thread executes each subtask and thus multiple threads execute the subtasks concurrently. When a thread completes the execution of its subtask, it waits until the barrier is tripped. When all the threads executing subtasks have completed their execution, the barrier is tripped to carry on the execution of next task. Some action also can be performed optionally, when all the threads have completed executing subtasks.
Like CountDownLatch
, this class also provides await()
method which when called by a thread makes it to go in WAITING
state. The thread will keep waiting until the number of waiting threads is equal to the number passed to the CyclicBarrier
constructor or the waiting thread is interrupted or the barrier is broken or reset. If the waiting thread is interrupted by any other thread, then the InterruptedException
is thrown. Like any blocking method, InterruptedException
needs to be handled when await()
is called. If any other thread waiting at the barrier is interrupted, the barrier is broken and other waiting threads receive a BrokenBarrierException
. If a thread is still running when the barrier is broken, it will get this exception when it calls the barrier’s await()
method. The Runnable
action is not executed for a broken barrier.
A timed await(long timeout, TimeUnit unit)
method also exists in CyclicBarrier
. This is used to specify the maximum time period for which a thread can wait for other threads to reach the barrier. If the timeout is elapsed before the barrier trips, the barrier is broken. When the await with timeout is called by a thread, it waits until the barrier is tripped or the timeout is elapsed or the thread is interrupted or the barrier is broken or reset. A checked exception TimeoutException
also needs to be handled for the timed await method. This exception is thrown when the time period is elapsed before the barrier trips. When a thread times out, other threads receive a BrokenBarrierException
.
The number of threads waiting at the barrier can be obtained by calling method getNumberWaiting()
. The number of threads required to trip the barrier can be obtained by calling method getParties()
. The isBroken()
method returns true if the barrier is broken.
A barrier can be reset to its initial state by calling the reset()
method.
Example
An example to demonstrate the use of CyclicBarrier
is giving in Listing 1. The RNG class is a random number generator which generates the number of random numbers equal to the count passed in its constructor. The generated random numbers can be obtained by calling the getResult()
method. The task is to generate 20 random numbers twice. This task is divided within 4 threads where each thread generates 5 random numbers. After all the threads have completed execution, the RandomNumberAssembler
gets the random numbers generated by all threads and prints them. The Executor framework is used to generate the child threads which will generate random numbers. A common CyclicBarrier
instance is shared between all the threads. Once a thread completes its random number generation it waits for the other threads to complete their execution. The last thread to finish the random number generation will cause the barrier to trip when it completes execution.
public class RNG implements Runnable { private Random random = new Random(); private int count; private CclicBarrier barrier; private List<Integer> randomNumbers = new ArrayList<Integer>(); public List<Integer> getResult() { return randomNumbers; } public RNG(int count,CyclicBarrier barrier) { this.count = count; this.barrier = barrier; } public void run() { randomNumbers.clear(); for(int i=1; i <= count; i++) { randomNumbers.add(Math.abs(random.nextInt(100))); } try { barrier.await(); } catch (InterruptedException e) { randomNumbers.clear(); } catch (BrokenBarrierException e) { randomNumbers.clear(); } } } public class RandomNumberAssembler implements Runnable { private List<RNG> rngs; public RandomNumberAssembler(List<RNG> rngs) { this.rngs = rngs; } public void run() { System.out.println("Random numbers............."); for(RNG rng : rngs) { for(Integer randomNumber : rng.getResult()) { System.out.println(randomNumber); } } } } public class CyclicBarrierDemo { public static void main(String[] args) { //generate 20 random numbers using 4 threads twice final int THREAD_COUNT = 4; final int RANDOM_NUMBER_PER_THREAD_COUNT = 5; final int TASK_COUNT = 2; List<RNG> rngs = new ArrayList<RNG>(); RandomNumberAssembler assembler = new RandomNumberAssembler(rngs); CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT,assembler); for(int i=1; i <= THREAD_COUNT; i++) { rngs.add(new RNG(RANDOM_NUMBER_PER_THREAD_COUNT, barrier)); } ExecutorService service = Executors.newFixedThreadPool(THREAD_COUNT); for (int k=1; k <= TASK_COUNT; k++) { for(int i=1; i <= THREAD_COUNT; i++) { service.submit(rngs.get(i-1)); } } service.shutdown(); } }
Listing 1. Generate 20 random numbers twice using 4 threads
Difference between FutureTask and CyclicBarrier
The above example can also be implemented by implementing Callable
on RNG
and using the submit()
method of ExecutorService
which returns the Future
object.
If the timed await method is used in the original example to define the maximum time period for which a thread can wait for other threads to complete execution, similar behavior can be obtained by calling the get()
method of Future
with timeout. However, there is no implicit way to let the other threads know about this timeout. Also, there is no implicit way to avoid the action from being called when a thread times out. Such a behavior can be best implemented using CyclicBarrier
.