目录
Task.Run
Task.Run 的底层原理
默认并发数量
控制并发
使用 SemaphoreSlim
代码解析
使用 Parallel.ForEach
代码解析
注意事项
自定义任务调度器
代码解析
使用自定义任务调度器:
总结
Task.Run
Task.Run
是 .NET 中创建和启动异步任务的一种便捷方法。它通过将一个委托排队到 .NET 线程池来创建并运行任务。理解 Task.Run
的底层原理、默认并发数量以及并发控制方法对于优化并发程序至关重要。
Task.Run
的底层原理
-
任务创建与启动:
Task.Run
本质上是调用了Task.Factory.StartNew
方法,并设置了默认的任务调度选项和任务创建选项。- 具体来说,
Task.Run
等价于Task.Factory.StartNew(action, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default)
。
-
线程池:
Task.Run
使用 .NET 线程池 (ThreadPool
) 来执行任务。线程池是一个全局的、共享的资源,管理和复用一组线程。- 线程池通过一种自适应算法动态调整线程的数量,以应对当前的工作负载。
-
任务调度器:
Task.Run
使用默认的任务调度器 (TaskScheduler.Default
),它依赖于线程池来调度任务。- 默认任务调度器会将任务排队到线程池,然后线程池中的线程会从队列中获取任务并执行。
默认并发数量
-
线程池的并发策略:
- 线程池的初始线程数取决于系统的处理器核心数(CPU 核数)。对于每个逻辑处理器,线程池会保留一个或多个线程以处理任务。
- 线程池有一个最小线程数和一个最大线程数。最小线程数可以通过
ThreadPool.SetMinThreads
方法设置,默认情况下等于逻辑处理器的数量。 - 当线程池中的线程处于忙碌状态时,线程池会判断是否需要创建新线程。创建新线程的决策基于一个自适应的算法,该算法考虑了任务队列的长度、任务的执行时间等因素。
-
最大并发数:
虽然线程池可以动态调整线程数量,但默认情况下,线程池的最大线程数是相对较高的。可以通过ThreadPool.GetMaxThreads
获取这个值。int workerThreads, completionPortThreads; ThreadPool.GetMaxThreads(out workerThreads, out completionPortThreads); Console.WriteLine($"Max Worker Threads: {workerThreads}, Max Completion Port Threads: {completionPortThreads}");
控制并发
使用 SemaphoreSlim
SemaphoreSlim
是一种轻量级的同步原语,可以用来控制同时执行的任务数量。通过在任务开始时等待信号量,在任务结束时释放信号量来实现并发控制。
// 异步方法,用于处理文档列表,通过信号量限制并发任务的数量
private async Task ProcessDocumentListAsync(List<string> documents)
{// 存储任务的列表var tasks = new List<Task>();// 创建一个信号量,限制并发任务数量为5var semaphore = new SemaphoreSlim(5);// 遍历文档列表foreach (var doc in documents){// 等待信号量,确保并发任务不超过5个await semaphore.WaitAsync();// 将处理文档的任务添加到任务列表tasks.Add(Task.Run(async () =>{try{// 异步处理文档await ProcessDocumentAsync(doc);}finally{// 任务完成后释放信号量semaphore.Release();}}));}// 等待所有任务完成await Task.WhenAll(tasks);
}// 异步方法,用于处理单个文档
private async Task ProcessDocumentAsync(string document)
{// 模拟文档处理,通过延时模拟耗时操作await Task.Delay(1000);// 打印处理完成的信息Console.WriteLine($"Processed document: {document}");
}
代码解析
ProcessDocumentListAsync
方法:
- 参数:接受一个文档列表 (
List<string> documents
)。- 任务列表:创建一个空的任务列表
tasks
用于存储所有的处理任务。- 信号量:创建一个信号量
semaphore
,初始计数为5,这意味着最多允许5个并发任务。- 遍历文档列表:使用
foreach
循环遍历所有文档。
- 等待信号量:调用
await semaphore.WaitAsync()
,当信号量计数大于0时,允许任务继续;否则,等待。- 添加任务:使用
Task.Run
启动一个新任务来处理文档,并将其添加到任务列表tasks
中。- 任务内容:在任务中调用
ProcessDocumentAsync
方法异步处理文档。
try-finally
块:确保任务执行完成后,无论是否发生异常,都会释放信号量。- 等待所有任务完成:
await Task.WhenAll(tasks)
确保所有添加到任务列表中的任务全部完成。
ProcessDocumentAsync
方法:
- 参数:接受一个文档 (
string document
)。- 模拟处理:使用
await Task.Delay(1000)
模拟耗时的文档处理操作,延时1秒。- 打印信息:处理完成后,打印处理文档的消息。
该代码通过使用信号量 (
SemaphoreSlim
) 控制并发任务的数量,确保同一时刻最多只有5个文档处理任务在运行。每个文档处理任务通过Task.Run
启动,并调用ProcessDocumentAsync
方法来模拟实际的文档处理。任务完成后,无论是否发生异常,都会释放信号量,从而允许新的任务启动。最后,ProcessDocumentListAsync
方法等待所有任务完成后才返回。通过这种方式,可以有效地控制并发任务数量,避免因过多并发任务导致的资源耗尽或性能下降。
使用 Parallel.ForEach
Parallel.ForEach
提供了并行化处理集合的方法,可以通过 ParallelOptions
来控制并行程度。
// 设置并行选项, 限制并行任务的最大数量为5个
var parallelOptions = new ParallelOptions
{MaxDegreeOfParallelism = 5
};// 使用 Parallel.ForEach 并行处理文档列表
Parallel.ForEach(documents, parallelOptions, (doc) =>
{// 调用异步方法处理文档,在此处使用 .Wait() 同步等待异步方法完成ProcessDocumentAsync(doc).Wait();
});
代码解析
设置并行选项:创建
ParallelOptions
对象,并设置MaxDegreeOfParallelism
属性为5
,从而限制并行任务的最大数量。并行处理文档:使用
Parallel.ForEach
方法遍历文档列表:
- 传入
documents
作为文档列表。- 传入
parallelOptions
以控制并行度。- 对每个文档执行 lambda 表达式中的操作,调用
ProcessDocumentAsync(doc).Wait()
同步等待异步文档处理完成。注意事项
同步等待异步方法:使用
.Wait()
方法将异步方法ProcessDocumentAsync
转为同步,这种方式可能会引发一些潜在的问题,如死锁。如果ProcessDocumentAsync
方法内部有await
,应当注意避免在上下文中使用.Wait()
或.Result
,因为它们会阻塞线程。上下文捕获:如果
ProcessDocumentAsync
方法有上下文捕获(如 UI 线程上下文),则使用.Wait()
可能导致死锁或性能问题。可以使用ConfigureAwait(false)
来避免这些问题。
自定义任务调度器
可以通过实现自定义的任务调度器 (TaskScheduler
) 来更精细地控制任务的调度和并发。
// 自定义任务调度器,用于限制并发任务的最大数量
public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{// 最大并发任务数量private readonly int maxDegreeOfParallelism;// 用于存储待执行任务的队列private readonly LinkedList<Task> tasks = new LinkedList<Task>();// 当前正在运行的任务数量private int runningTasks;// 构造函数,初始化最大并发任务数量public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism){this.maxDegreeOfParallelism = maxDegreeOfParallelism;}// 返回当前计划的任务。这对于调试和监控工具非常有用。protected override IEnumerable<Task> GetScheduledTasks(){lock (tasks){// 返回任务队列中的任务数组return tasks.ToArray();}}// 将任务排队到任务队列中。如果运行的任务小于最大并发任务数,则立即执行任务。protected override void QueueTask(Task task){lock (tasks){// 将任务添加到队列尾部tasks.AddLast(task);// 如果当前运行的任务数量小于最大并发任务数量,则启动一个新任务if (runningTasks < maxDegreeOfParallelism){runningTasks++;// 通知线程池有待处理工作NotifyThreadPoolOfPendingWork();}}}// 通知线程池有待处理的工作private void NotifyThreadPoolOfPendingWork(){// 使用线程池执行任务ThreadPool.UnsafeQueueUserWorkItem(_ =>{// 从任务队列中取出第一个任务Task item;lock (tasks){item = tasks.First.Value;tasks.RemoveFirst();}// 尝试执行任务base.TryExecuteTask(item);lock (tasks){// 任务执行完成,减少运行中的任务计数runningTasks--;// 如果还有待执行的任务,则递归通知线程池if (tasks.Count > 0){runningTasks++;NotifyThreadPoolOfPendingWork();}}}, null);}// 尝试在线程中的同步上下文内执行任务protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued){// 如果任务已排队,则不在线程内执行if (taskWasPreviouslyQueued)return false;// 直接执行任务return base.TryExecuteTask(task);}
}
代码解析
LimitedConcurrencyLevelTaskScheduler
类:这是一个继承自TaskScheduler
的自定义任务调度器,主要用于限制并发任务的最大数量。字段:
maxDegreeOfParallelism
:最大并发任务数量。tasks
:用于存储排队等待执行的任务队列。runningTasks
:当前正在运行的任务数量。构造函数:初始化最大并发任务数量。
GetScheduledTasks
方法:返回当前排队的任务数组,这对于调试和监控非常有用。
QueueTask
方法:将任务添加到任务队列中,并在当前运行的任务数量少于最大并发数量时启动新任务。
NotifyThreadPoolOfPendingWork
方法:通知线程池有待处理的任务,从任务队列中取出任务并执行,递归地处理后续任务。
TryExecuteTaskInline
方法:尝试在线程中的同步上下文内直接执行任务,如果任务已经排队则返回false
,否则直接执行任务。
使用自定义任务调度器:
var scheduler = new LimitedConcurrencyLevelTaskScheduler(5);
var tasks = new List<Task>();foreach (var doc in documents)
{tasks.Add(Task.Factory.StartNew(() =>ProcessDocumentAsync(doc).Wait(),CancellationToken.None,TaskCreationOptions.None,scheduler));
}Task.WaitAll(tasks.ToArray());
总结
Task.Run
将任务排队到线程池中执行,线程池会根据工作负载动态调整线程数量。- 默认并发数量 由线程池管理,系统会根据当前的负载自动调整。
- 控制并发 可以通过使用
SemaphoreSlim
、Parallel.ForEach
、自定义任务调度器等方式来实现。
通过理解和应用这些机制,可以更好地控制并发任务的执行,实现高效的并发编程。