<img alt="" src="https://secure.leadforensics.com/150446.png " style="display:none;">
Go to top icon

Synchronizers In Java - CyclicBarrier

Madhura Oak Dec 03, 2012

CyclicBarrier Java Multithreading Concurrency API Technology synchronizers

Along with CountDownLatch, FutureTask and Semaphore, the Concurrency API in Java 5 also introduced a barrier – CyclicBarrier. This class is used as a barrier for a set of threads, to keep them waiting until all the threads have reached it. Once all the threads reach the barrier, it is tripped, allowing them to perform further execution. The number of threads in WAITING state, which when reached should trip the barrier, is passed as an argument to the constructor of CyclicBarrier. A Runnable action to be performed when the barrier is tripped can also be defined optionally while instantiating CyclicBarrier. The last thread to enter the barrier executes this action before the other waiting threads are released. If the action throws exception, the barrier is broken and all the other threads receive a checked exception BrokenBarrierException.

Unlike CountDownLatch which is a one-time latch, the CyclicBarrier can be tripped multiple times. A CyclicBarrier is usually used when you want to execute a task more than once by dividing it into multiple subtasks. A separate thread executes each subtask and thus multiple threads execute the subtasks concurrently. When a thread completes the execution of its subtask, it waits until the barrier is tripped. When all the threads executing subtasks have completed their execution, the barrier is tripped to carry on the execution of next task. Some action also can be performed optionally, when all the threads have completed executing subtasks.

Like CountDownLatch, this class also provides await() method which when called by a thread makes it to go in WAITING state. The thread will keep waiting until the number of waiting threads is equal to the number passed to the CyclicBarrier constructor or the waiting thread is interrupted or the barrier is broken or reset. If the waiting thread is interrupted by any other thread, then the InterruptedException is thrown. Like any blocking method, InterruptedException needs to be handled when await() is called. If any other thread waiting at the barrier is interrupted, the barrier is broken and other waiting threads receive a BrokenBarrierException. If a thread is still running when the barrier is broken, it will get this exception when it calls the barrier’s await() method. The Runnable action is not executed for a broken barrier.

A timed await(long timeout, TimeUnit unit) method also exists in CyclicBarrier. This is used to specify the maximum time period for which a thread can wait for other threads to reach the barrier. If the timeout is elapsed before the barrier trips, the barrier is broken. When the await with timeout is called by a thread, it waits until the barrier is tripped or the timeout is elapsed or the thread is interrupted or the barrier is broken or reset. A checked exception TimeoutException also needs to be handled for the timed await method. This exception is thrown when the time period is elapsed before the barrier trips. When a thread times out, other threads receive a BrokenBarrierException.

The number of threads waiting at the barrier can be obtained by calling method getNumberWaiting(). The number of threads required to trip the barrier can be obtained by calling method getParties(). The isBroken() method returns true if the barrier is broken.

A barrier can be reset to its initial state by calling the reset() method.

Example

An example to demonstrate the use of CyclicBarrier is giving in Listing 1. The RNG class is a random number generator which generates the number of random numbers equal to the count passed in its constructor. The generated random numbers can be obtained by calling the getResult() method. The task is to generate 20 random numbers twice. This task is divided within 4 threads where each thread generates 5 random numbers. After all the threads have completed execution, the RandomNumberAssembler gets the random numbers generated by all threads and prints them. The Executor framework is used to generate the child threads which will generate random numbers. A common CyclicBarrier instance is shared between all the threads. Once a thread completes its random number generation it waits for the other threads to complete their execution. The last thread to finish the random number generation will cause the barrier to trip when it completes execution.

public class RNG implements Runnable {
    private Random random = new Random();
    private int count;
    private CclicBarrier barrier;
    private List<Integer> randomNumbers =
        new ArrayList<Integer>();

    public List<Integer> getResult() {
        return randomNumbers;
    }

    public RNG(int count,CyclicBarrier barrier) {
        this.count = count;
	this.barrier = barrier;
    }

    public void run() {
        randomNumbers.clear();
	for(int i=1; i <= count; i++) {
	    randomNumbers.add(Math.abs(random.nextInt(100)));
	}
	try {
	    barrier.await();
	} catch (InterruptedException e) {
	    randomNumbers.clear();
	} catch (BrokenBarrierException e) {
	    randomNumbers.clear();
        }
    }
}

public class RandomNumberAssembler implements Runnable {
    private List<RNG> rngs;

    public RandomNumberAssembler(List<RNG> rngs) {
        this.rngs = rngs;
    }

    public void run() {
        System.out.println("Random numbers.............");
	for(RNG rng : rngs) {
	    for(Integer randomNumber : rng.getResult()) {
	        System.out.println(randomNumber);
	    }
	}
    }
}

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        //generate 20 random numbers using 4 threads twice
	final int THREAD_COUNT = 4;
	final int RANDOM_NUMBER_PER_THREAD_COUNT = 5;
	final int TASK_COUNT = 2;

	List<RNG> rngs = new ArrayList<RNG>();
	RandomNumberAssembler assembler =
            new RandomNumberAssembler(rngs);
	CyclicBarrier barrier = new
            CyclicBarrier(THREAD_COUNT,assembler);
	for(int i=1; i <= THREAD_COUNT; i++) {
            rngs.add(new
                RNG(RANDOM_NUMBER_PER_THREAD_COUNT,
                barrier));
	}
	ExecutorService service =
            Executors.newFixedThreadPool(THREAD_COUNT);

	for (int k=1; k <= TASK_COUNT; k++) {
	    for(int i=1; i <= THREAD_COUNT; i++) {
	        service.submit(rngs.get(i-1));
	    }
	}
	service.shutdown();
    }
}

Listing 1. Generate 20 random numbers twice using 4 threads

Difference between FutureTask and CyclicBarrier

The above example can also be implemented by implementing Callable on RNG and using the submit() method of ExecutorService which returns the Future object.

If the timed await method is used in the original example to define the maximum time period for which a thread can wait for other threads to complete execution, similar behavior can be obtained by calling the get() method of Future with timeout. However, there is no implicit way to let the other threads know about this timeout. Also, there is no implicit way to avoid the action from being called when a thread times out. Such a behavior can be best implemented using CyclicBarrier.

Similar Blog

e-Zest is a leading digital innovation partner for enterprises and technology companies that utilizes emerging technologies for creating engaging customers experiences. Being a customer-focused and technology-driven company, it always helps clients in crafting holistic business value for their software development efforts. It offers software development and consulting services for cloud computing, enterprise mobility, big data and analytics, user experience and digital commerce.