总目录
前言
Barrier
是 C# 中用于多线程分阶段协同工作的同步工具,位于 System.Threading 命名空间下。它允许多个线程在指定阶段(Phase)的屏障点(Barrier Point)同步,所有线程到达屏障点后,才能一起进入下一阶段。适用于需要分步骤并行处理的任务(例如并行计算、流水线处理)。
一、核心概念
Barrier
是一种同步机制,用于协调多个线程在执行某个阶段工作时进行等待,直到所有参与的线程都达到某个同步点后再继续执行。这对于需要在多个线程之间进行阶段性同步的场景非常有用
- 分阶段协作:
- 线程在执行过程中按阶段同步,所有线程完成当前阶段/相位(phase)后,才能继续下一阶段/相位(phase)。
- Barrier 允许多个线程在多个阶段的工作中进行同步。每个线程在每个阶段完成工作后调用 SignalAndWait() 方法,通知 Barrier 自己已经到达同步点。
- 参与者(Participants):注册到
Barrier
的线程数量,初始化时指定,可以通过AddParticipant()
和RemoveParticipant()
动态调整参与者的数量。 - 阶段编号(Phase Number):从 0 开始递增,每次所有线程到达屏障点后,阶段编号自动加 1。
- 回调函数(Post-Phase Action):
- 可指定一个委托,在所有线程到达屏障点时触发(适合清理或日志操作)。
- 可以在每个阶段结束时执行一个回调函数,例如收集数据、更新进度等。
- 适用场景:适用于需要多阶段并行处理的场景,如并行计算、多阶段数据处理等。
- 多线程计算:每个线程负责计算一部分数据,所有线程在每个计算阶段结束后需要同步。
- 多步流水线处理:每个线程负责流水线中的一个步骤,所有线程在每一步结束后需要同步。
二、基本用法
1. 构造函数
public Barrier(int participantCount);
public Barrier(int participantCount, Action<Barrier>? postPhaseAction);
// 初始化时指定参与者数量(线程数)和可选的阶段完成回调
var barrier = new Barrier(participantCount: 3, postPhaseAction: phase =>
{Console.WriteLine($"阶段 {phase} 完成");
});
participantCount
:参与同步的线程数量。postPhaseAction
:每个阶段结束时执行的回调函数,参数为当前 Barrier 对象。
2. 主要方法和属性
方法 | 作用 |
---|---|
SignalAndWait() | 通知屏障当前线程已到达屏障点,并阻塞直到所有参与者到达。 表示当前线程已完成当前阶段的工作,并等待所有其他参与者也完成该阶段的工作。 |
AddParticipant() | 增加一个参与者。 |
RemoveParticipant() | 减少一个参与者。 |
ParticipantsRemaining | 获取当前相位中还未到达屏障点的参与者数量。 |
CurrentPhaseNumber | 获取当前屏障的相位编号。 |
Dispose() | 释放资源。 |
三、示例
示例 1:多线程分阶段处理
using System.Threading;class Program
{static Barrier barrier = new Barrier(3, phase => {Console.WriteLine($"\n所有线程完成阶段 {phase.CurrentPhaseNumber}\n");});static void Main(){new Thread(DoWork).Start("A");new Thread(DoWork).Start("B");new Thread(DoWork).Start("C");}static void DoWork(object name){for (int phase = 0; phase < 2; phase++){Console.WriteLine($"{name} 正在执行阶段 {phase}");Thread.Sleep(100);barrier.SignalAndWait(); // 等待其他线程}}
}
输出:
A 正在执行阶段 0
B 正在执行阶段 0
C 正在执行阶段 0
所有线程完成阶段 0A 正在执行阶段 1
B 正在执行阶段 1
C 正在执行阶段 1
所有线程完成阶段 1
using System;
using System.Threading;
using System.Threading.Tasks;class Program
{static Barrier barrier = new Barrier(3, b =>{Console.WriteLine($"所有参与者完成了第 {b.CurrentPhaseNumber} 阶段");});static void Main(string[] args){Console.WriteLine("启动三个任务...");for (int i = 1; i <= 3; i++){int taskId = i;Task.Run(() =>{for (int phase = 1; phase <= 2; phase++) // 模拟两个阶段{Console.WriteLine($"任务 {taskId} 开始执行阶段 {phase}");Thread.Sleep(1000); // 模拟任务执行时间Console.WriteLine($"任务 {taskId} 完成阶段 {phase}");barrier.SignalAndWait(); // 等待所有任务完成当前阶段}});}// 等待所有任务完成Console.ReadKey();}
}
输出
启动三个任务...
任务 2 开始执行阶段 1
任务 3 开始执行阶段 1
任务 1 开始执行阶段 1
任务 3 完成阶段 1
任务 1 完成阶段 1
任务 2 完成阶段 1
所有参与者完成了第 0 阶段
任务 2 开始执行阶段 2
任务 3 开始执行阶段 2
任务 1 开始执行阶段 2
任务 1 完成阶段 2
任务 2 完成阶段 2
任务 3 完成阶段 2
所有参与者完成了第 1 阶段
示例 2:动态调整参与者
using System;
using System.Threading;class Program
{static Barrier barrier = new Barrier(2, phase =>{Console.ForegroundColor = ConsoleColor.Red;Console.WriteLine($"\n[阶段 {phase.CurrentPhaseNumber} 完成] 当前参与者数量: {barrier.ParticipantCount}\n");Console.ResetColor();});static void Main(){// 初始启动2个线程StartThread("A");StartThread("B");Console.ReadKey();}static void StartThread(string name){new Thread(() =>{for (int phase = 0; phase < 4; phase++){// 模拟工作Thread.Sleep(100);Console.WriteLine($"{name} 完成阶段 {phase}");// 等待其他线程到达屏障点barrier.SignalAndWait();// 动态调整参与者(仅在特定阶段触发)if (phase == 1 && name == "A"){// 阶段1完成后新增1个参与者barrier.AddParticipant();Console.WriteLine("新增参与者:线程C");StartThread("C");}else if (phase == 2 && name == "A"){// 阶段2完成后移除1个参与者barrier.RemoveParticipant();Console.WriteLine("移除参与者:线程A");break; // 确保线程A能够退出}}}){ Name = name }.Start();}
}
输出:
A 完成阶段 0
B 完成阶段 0[阶段 0 完成] 当前参与者数量: 2B 完成阶段 1
A 完成阶段 1[阶段 1 完成] 当前参与者数量: 2新增参与者:线程C
A 完成阶段 2
C 完成阶段 0
B 完成阶段 2[阶段 2 完成] 当前参与者数量: 3移除参与者:线程A
C 完成阶段 1
B 完成阶段 3[阶段 3 完成] 当前参与者数量: 2C 完成阶段 2
示例3:多任务下载
using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;class Program
{static void Main(){// 假设我们有三个文件需要下载string[] urls = new string[]{"https://example.com/file1","https://example.com/file2","https://example.com/file3"};// 设置参与的线程数为文件数,并在每个阶段结束时输出状态Barrier barrier = new Barrier(urls.Length, (b) =>{Console.WriteLine($"所有任务在阶段 {b.CurrentPhaseNumber} 完成。");});// 创建并启动下载任务for (int i = 0; i < urls.Length; i++){int localI = i;Task.Run(async () =>{using (HttpClient client = new HttpClient()){for (int phase = 0; phase < 3; phase++) // 设定3个阶段{// 假设每个阶段下载一部分Console.WriteLine($"任务 {localI} 在阶段 {phase} 开始下载。");await DownloadPartialFile(client, urls[localI], phase);Console.WriteLine($"任务 {localI} 在阶段 {phase} 完成下载。");// 等待其他任务barrier.SignalAndWait();}}});}// 等待所有任务完成Console.ReadLine();}// 模拟分段下载static async Task DownloadPartialFile(HttpClient client, string url, int phase){// 这里我们只是模拟下载,实际应用中可以根据 URL 和 phase 来下载文件的不同部分await Task.Delay(new Random().Next(1000, 2000)); // 模拟下载时间Console.WriteLine($"下载 {url} 的阶段 {phase} 部分完成。");}
}
四、高级用法
1. 自定义阶段回调
var barrier = new Barrier(3, phase =>
{if (phase.CurrentPhaseNumber == 0) {Console.WriteLine("第一阶段数据已就绪");}else if (phase.CurrentPhaseNumber == 1){Console.WriteLine("第二阶段计算完成");}
});
2. 超时设置
- 若某个线程未调用
SignalAndWait()
,其他线程将永久阻塞。 - 解决方法:结合超时机制。
bool success = barrier.SignalAndWait(TimeSpan.FromSeconds(5)); if (!success) Console.WriteLine("等待超时");
五、替代方案
CountdownEvent
:适合一次性等待多个任务完成,而非分阶段。ManualResetEvent
:适合简单的事件通知,无阶段概念。Task
和async/await
:更适合基于任务的异步编程模型。
结语
回到目录页:C#/.NET 知识汇总
希望以上内容可以帮助到大家,如文中有不对之处,还请批评指正。