文章目录
- 简述
- 内部机制
- 构造函数
- 使用案例
- 异常处理
简述
CyclicBarrier
是另一个用于协调多个线程之间操作的同步辅助类,它允许一组线程互相等待彼此到达一个共同的屏障点(barrier)。与 CountDownLatch
不同的是,CyclicBarrier
可以被重置并且可以重复使用13。这意味着一旦所有参与的线程都到达了屏障点并继续执行后,CyclicBarrier
可以再次用于下一轮的同步操作。这使得 CyclicBarrier
在循环任务或需要多次同步的情况下非常有用。
CyclicBarrier
提供了一个构造函数来指定参与的线程数量,并且可以选择性地提供一个当最后一个线程到达屏障时要执行的动作。这个动作将在所有线程到达之前但在任何线程继续前进之后被执行。如果任意一个线程在等待过程中被打断或失败,那么所有的线程都会收到一个 BrokenBarrierException
异常。
内部机制
CyclicBarrier 的内部实现依赖于计数器和条件变量(通常是锁和相关的等待/通知机制)。每当一个线程调用了 await() 方法时,它会首先检查计数器是否达到了创建 CyclicBarrier 时指定的“阈值”(即需要等待的线程数)。如果计数器尚未达到阈值,线程会被阻塞并等待其他线程的到来。当另一个线程也调用了 await() 方法并且计数器达到了阈值时,所有等待在屏障点的线程都会被唤醒,并继续执行。此时,计数器会被重置为0,屏障进入下一轮使用。
此外,CyclicBarrier 还提供了一个可选的 Runnable 参数。当所有线程都到达屏障点时,这个 Runnable 任务会在最后一个到达屏障点的线程中执行。这通常用于进行一些额外的初始化、汇总或清理工作。
构造函数
CyclicBarrier 提供了两个构造函数:
- CyclicBarrier(int parties):创建一个新的 CyclicBarrier 实例,该实例将在给定数量的参与者(线程)到达后触发。
- CyclicBarrier(int parties, Runnable barrierAction):除了指定参与者数量外,还指定了一个当所有参与者都到达屏障点时要执行的动作。
使用案例
一个典型的 CyclicBarrier 使用场景是多线程计算数据,当所有线程完成各自的计算后,在 CyclicBarrier 回调线程中合并计算结果。例如,统计用户每个季度的银行流水情况,可以先使用多线程分别统计每个月的数据,之后再将这些数据汇总起来得到整个季度的结果。
public class CyclicBarrierTest implements Runnable {private CyclicBarrier cb = new CyclicBarrier(3, this); // 创建CyclicBarrier实例,屏障数据设为3,处理完之后执行当前类的run方法private Executor executor = Executors.newFixedThreadPool(3); // 创建线程池,只有三个月的数据,所以只需三个线程private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<>(); // 保存每个sheet计算出的结果public void count() {for (int i = 0; i < 3; i++) {executor.execute(() -> {// 模拟复杂逻辑处理代码sheetBankWaterCount.put(Thread.currentThread().getName(), 1);try {cb.await(); // 线程完成工作后调用await设置屏障} catch (BrokenBarrierException | InterruptedException e) {e.printStackTrace();}});}}@Overridepublic void run() {int res = 0;for (Map.Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) {res += sheet.getValue();}sheetBankWaterCount.put("result", res);System.out.println(res);}public static void main(String[] args) {CyclicBarrierTest test = new CyclicBarrierTest();test.count(); // 注意,此时不需要调用test.run,最后一个await方法会调用run方法}
}
在这个例子中,我们创建了一个 CyclicBarrier 实例,并设置了屏障点为,意味着我们需要等待三个线程都到达屏障点。同时,我们也定义了一个 Runnable 作为回调函数,用于在线程们全部到达屏障点后执行汇总操作。
异常处理
需要注意的是,如果某个线程在等待过程中因为中断或异常而退出,那么所有等待在屏障点的线程都将收到一个 BrokenBarrierException 异常。这是因为屏障已经被“破坏”,无法再保证所有线程都能正常通过。因此,在实际应用中应该妥善处理这类异常情况,确保系统的健壮性1。
总之,CyclicBarrier 是一种非常有用的工具,特别适合那些需要多次同步多个线程的应用场景。通过合理地配置参数和编写回调函数,可以有效地协调多个线程之间的协作行为。