欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 教育 > 幼教 > 深入详解 C# Task.Run异步任务

深入详解 C# Task.Run异步任务

2025/1/19 13:09:59 来源:https://blog.csdn.net/martian665/article/details/141717326  浏览:    关键词:深入详解 C# Task.Run异步任务

目录

Task.Run

Task.Run 的底层原理

默认并发数量

控制并发

使用 SemaphoreSlim

代码解析

使用 Parallel.ForEach

代码解析

注意事项

自定义任务调度器

代码解析

使用自定义任务调度器:

总结


Task.Run

Task.Run 是 .NET 中创建和启动异步任务的一种便捷方法。它通过将一个委托排队到 .NET 线程池来创建并运行任务。理解 Task.Run 的底层原理、默认并发数量以及并发控制方法对于优化并发程序至关重要。

Task.Run 的底层原理

  1. 任务创建与启动

    • Task.Run 本质上是调用了 Task.Factory.StartNew 方法,并设置了默认的任务调度选项和任务创建选项。
    • 具体来说,Task.Run 等价于 Task.Factory.StartNew(action, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default)
  2. 线程池

    • Task.Run 使用 .NET 线程池 (ThreadPool) 来执行任务。线程池是一个全局的、共享的资源,管理和复用一组线程。
    • 线程池通过一种自适应算法动态调整线程的数量,以应对当前的工作负载。
  3. 任务调度器

    • Task.Run 使用默认的任务调度器 (TaskScheduler.Default),它依赖于线程池来调度任务。
    • 默认任务调度器会将任务排队到线程池,然后线程池中的线程会从队列中获取任务并执行。

默认并发数量

  1. 线程池的并发策略

    • 线程池的初始线程数取决于系统的处理器核心数(CPU 核数)。对于每个逻辑处理器,线程池会保留一个或多个线程以处理任务。
    • 线程池有一个最小线程数和一个最大线程数。最小线程数可以通过 ThreadPool.SetMinThreads 方法设置,默认情况下等于逻辑处理器的数量。
    • 当线程池中的线程处于忙碌状态时,线程池会判断是否需要创建新线程。创建新线程的决策基于一个自适应的算法,该算法考虑了任务队列的长度、任务的执行时间等因素。
  2. 最大并发数

    虽然线程池可以动态调整线程数量,但默认情况下,线程池的最大线程数是相对较高的。可以通过 ThreadPool.GetMaxThreads 获取这个值。
    int workerThreads, completionPortThreads;
    ThreadPool.GetMaxThreads(out workerThreads, out completionPortThreads);
    Console.WriteLine($"Max Worker Threads: {workerThreads}, Max Completion Port Threads: {completionPortThreads}");
    
     

控制并发

使用 SemaphoreSlim

SemaphoreSlim 是一种轻量级的同步原语,可以用来控制同时执行的任务数量。通过在任务开始时等待信号量,在任务结束时释放信号量来实现并发控制。

// 异步方法,用于处理文档列表,通过信号量限制并发任务的数量
private async Task ProcessDocumentListAsync(List<string> documents)
{// 存储任务的列表var tasks = new List<Task>();// 创建一个信号量,限制并发任务数量为5var semaphore = new SemaphoreSlim(5);// 遍历文档列表foreach (var doc in documents){// 等待信号量,确保并发任务不超过5个await semaphore.WaitAsync();// 将处理文档的任务添加到任务列表tasks.Add(Task.Run(async () =>{try{// 异步处理文档await ProcessDocumentAsync(doc);}finally{// 任务完成后释放信号量semaphore.Release();}}));}// 等待所有任务完成await Task.WhenAll(tasks);
}// 异步方法,用于处理单个文档
private async Task ProcessDocumentAsync(string document)
{// 模拟文档处理,通过延时模拟耗时操作await Task.Delay(1000);// 打印处理完成的信息Console.WriteLine($"Processed document: {document}");
}

 

代码解析
  1. ProcessDocumentListAsync 方法

    • 参数:接受一个文档列表 (List<string> documents)。
    • 任务列表:创建一个空的任务列表 tasks 用于存储所有的处理任务。
    • 信号量:创建一个信号量 semaphore,初始计数为5,这意味着最多允许5个并发任务。
    • 遍历文档列表:使用 foreach 循环遍历所有文档。
      • 等待信号量:调用 await semaphore.WaitAsync(),当信号量计数大于0时,允许任务继续;否则,等待。
      • 添加任务:使用 Task.Run 启动一个新任务来处理文档,并将其添加到任务列表 tasks 中。
      • 任务内容:在任务中调用 ProcessDocumentAsync 方法异步处理文档。
        • try-finally 块:确保任务执行完成后,无论是否发生异常,都会释放信号量。
    • 等待所有任务完成await Task.WhenAll(tasks) 确保所有添加到任务列表中的任务全部完成。
  2. ProcessDocumentAsync 方法

    • 参数:接受一个文档 (string document)。
    • 模拟处理:使用 await Task.Delay(1000) 模拟耗时的文档处理操作,延时1秒。
    • 打印信息:处理完成后,打印处理文档的消息。

        该代码通过使用信号量 (SemaphoreSlim) 控制并发任务的数量,确保同一时刻最多只有5个文档处理任务在运行。每个文档处理任务通过 Task.Run 启动,并调用 ProcessDocumentAsync 方法来模拟实际的文档处理。任务完成后,无论是否发生异常,都会释放信号量,从而允许新的任务启动。最后,ProcessDocumentListAsync 方法等待所有任务完成后才返回。

通过这种方式,可以有效地控制并发任务数量,避免因过多并发任务导致的资源耗尽或性能下降。

使用 Parallel.ForEach

Parallel.ForEach 提供了并行化处理集合的方法,可以通过 ParallelOptions 来控制并行程度。

// 设置并行选项, 限制并行任务的最大数量为5个
var parallelOptions = new ParallelOptions
{MaxDegreeOfParallelism = 5
};// 使用 Parallel.ForEach 并行处理文档列表
Parallel.ForEach(documents, parallelOptions, (doc) =>
{// 调用异步方法处理文档,在此处使用 .Wait() 同步等待异步方法完成ProcessDocumentAsync(doc).Wait();
});
代码解析
  1. 设置并行选项:创建 ParallelOptions 对象,并设置 MaxDegreeOfParallelism 属性为 5,从而限制并行任务的最大数量。

  2. 并行处理文档:使用 Parallel.ForEach 方法遍历文档列表:

    • 传入 documents 作为文档列表。
    • 传入 parallelOptions 以控制并行度。
    • 对每个文档执行 lambda 表达式中的操作,调用 ProcessDocumentAsync(doc).Wait() 同步等待异步文档处理完成。
注意事项
  • 同步等待异步方法:使用 .Wait() 方法将异步方法 ProcessDocumentAsync 转为同步,这种方式可能会引发一些潜在的问题,如死锁。如果 ProcessDocumentAsync 方法内部有 await,应当注意避免在上下文中使用 .Wait().Result,因为它们会阻塞线程。

  • 上下文捕获:如果 ProcessDocumentAsync 方法有上下文捕获(如 UI 线程上下文),则使用 .Wait() 可能导致死锁或性能问题。可以使用 ConfigureAwait(false) 来避免这些问题。

 

自定义任务调度器

可以通过实现自定义的任务调度器 (TaskScheduler) 来更精细地控制任务的调度和并发。

// 自定义任务调度器,用于限制并发任务的最大数量
public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{// 最大并发任务数量private readonly int maxDegreeOfParallelism;// 用于存储待执行任务的队列private readonly LinkedList<Task> tasks = new LinkedList<Task>();// 当前正在运行的任务数量private int runningTasks;// 构造函数,初始化最大并发任务数量public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism){this.maxDegreeOfParallelism = maxDegreeOfParallelism;}// 返回当前计划的任务。这对于调试和监控工具非常有用。protected override IEnumerable<Task> GetScheduledTasks(){lock (tasks){// 返回任务队列中的任务数组return tasks.ToArray();}}// 将任务排队到任务队列中。如果运行的任务小于最大并发任务数,则立即执行任务。protected override void QueueTask(Task task){lock (tasks){// 将任务添加到队列尾部tasks.AddLast(task);// 如果当前运行的任务数量小于最大并发任务数量,则启动一个新任务if (runningTasks < maxDegreeOfParallelism){runningTasks++;// 通知线程池有待处理工作NotifyThreadPoolOfPendingWork();}}}// 通知线程池有待处理的工作private void NotifyThreadPoolOfPendingWork(){// 使用线程池执行任务ThreadPool.UnsafeQueueUserWorkItem(_ =>{// 从任务队列中取出第一个任务Task item;lock (tasks){item = tasks.First.Value;tasks.RemoveFirst();}// 尝试执行任务base.TryExecuteTask(item);lock (tasks){// 任务执行完成,减少运行中的任务计数runningTasks--;// 如果还有待执行的任务,则递归通知线程池if (tasks.Count > 0){runningTasks++;NotifyThreadPoolOfPendingWork();}}}, null);}// 尝试在线程中的同步上下文内执行任务protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued){// 如果任务已排队,则不在线程内执行if (taskWasPreviouslyQueued)return false;// 直接执行任务return base.TryExecuteTask(task);}
}
代码解析
  1. LimitedConcurrencyLevelTaskScheduler:这是一个继承自 TaskScheduler 的自定义任务调度器,主要用于限制并发任务的最大数量。

  2. 字段

    • maxDegreeOfParallelism:最大并发任务数量。
    • tasks:用于存储排队等待执行的任务队列。
    • runningTasks:当前正在运行的任务数量。
  3. 构造函数:初始化最大并发任务数量。

  4. GetScheduledTasks 方法:返回当前排队的任务数组,这对于调试和监控非常有用。

  5. QueueTask 方法:将任务添加到任务队列中,并在当前运行的任务数量少于最大并发数量时启动新任务。

  6. NotifyThreadPoolOfPendingWork 方法:通知线程池有待处理的任务,从任务队列中取出任务并执行,递归地处理后续任务。

  7. TryExecuteTaskInline 方法:尝试在线程中的同步上下文内直接执行任务,如果任务已经排队则返回 false,否则直接执行任务。

使用自定义任务调度器:

var scheduler = new LimitedConcurrencyLevelTaskScheduler(5);
var tasks = new List<Task>();foreach (var doc in documents)
{tasks.Add(Task.Factory.StartNew(() =>ProcessDocumentAsync(doc).Wait(),CancellationToken.None,TaskCreationOptions.None,scheduler));
}Task.WaitAll(tasks.ToArray());

总结

  • Task.Run 将任务排队到线程池中执行,线程池会根据工作负载动态调整线程数量。
  • 默认并发数量 由线程池管理,系统会根据当前的负载自动调整。
  • 控制并发 可以通过使用 SemaphoreSlimParallel.ForEach、自定义任务调度器等方式来实现。

通过理解和应用这些机制,可以更好地控制并发任务的执行,实现高效的并发编程。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com