Task是微软极力推荐的一种多线程处理方式,在线程的延续、取消,以及多个线程的等待处理等方面要优于Thread,ThreadPool,以下介绍的是Task的基本使用。
1. 环境准备
基于.NET6创建控制台项目进行测试,下面提前准备几个方法,防止后续举例时代码重复。
//封装一个打印调试方法,输出线程ID和时间戳
private static void LogDebug(string msg)
{Console.WriteLine($"{msg} 【ThreadId {Thread.CurrentThread.ManagedThreadId.ToString("00")}】 {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")}");
}//封装一个方法模拟一个长时间的计算
private static long DoSomethingLong()
{long lResult = 0;for (int i = 0; i < 500000000; i++){lResult += i;}return lResult;
}//封装一个方法标识某个任务正在进行工作
private static void DoSomeWork(object? name)
{LogDebug($"{name} start work");for (int i = 0; i < 1000000000; i++){}LogDebug($"{name} end work");
}
2. 线程启动
2.1 Task.Run启动
通过Task静态方法Run()
启动:
Task.Run(() =>
{LogDebug("启动子线程");Thread.Sleep(1000);
});
2.2 Start启动
构造一个Task对象,调用Start()
方法启动:
Task task = new Task(() =>
{LogDebug("启动子线程");Thread.Sleep(1000);
});
task.Start();
2.3 TaskFactory启动
以上两种方式通常用于简单启动,对于以下复杂的场景我更愿意使用TaskFactory启动:
-
启动无返回值任务:
public Task StartNew(Action action);
Task.Factory.StartNew(() => {LogDebug("启动子线程,无返回值");Thread.Sleep(1000); });
-
启动有返回值任务:
public Task<TResult> StartNew<TResult>(Func<TResult> function);
Task<string> task = Task.Factory.StartNew(() => {LogDebug("启动子线程,有返回值");return DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"); }); string time = task.Result; Console.WriteLine(time);
-
传入状态参数:
public Task StartNew(Action<object?> action, object? state);
state是object类型,用户可根据需要传入该参数,例如启动多个子线程时可传入状态参数来标识任务。Task的AsyncState保存该参数。Task task = Task.Factory.StartNew(state => {//子线程内部直接访问状态参数LogDebug($"启动子线程,传入状态标志: {state}"); }, "Hello"); //通过Task的AsyncState属性访问状态参数 LogDebug($"{task.AsyncState}");
-
传入启动选项参数:
TaskCreationOptions
LongRunning
:
Task是基于ThreadPool进行的封装,而线程池面向的是“小任务”,如果没有空闲任务,后续的Task会被阻塞,因此如果有一个耗时的任务可以标记为TaskCreationOptions.LongRunning
,则任务不会使用线程池,在一个单独的线程中运行。AttachedToParent
:
在父Task中创建子Task,父Task不会等待子Task结束;如果父Task需要等待子Task结束可以在启动子Task时设置TaskCreationOptions.AttachedToParent
。DenyChildAttach
:
如果一个父线程设置了TaskCreationOptions.DenyChildAttach
,该线程启动的子线程即使设置了TaskCreationOptions.AttachedToParent
也是失效的。
3. 线程等待
3.1 Wait
针对单个Task实例,可以进行等待,等待会阻塞当前线程。重载方法还可以传入超时时间。
static void Main(string[] args)
{LogDebug("执行主线程任务");Task task = Task.Factory.StartNew(() => {LogDebug("执行子线程任务");DoSomethingLong();LogDebug("子线程执行完毕");});//阻塞等待子线程结束task.Wait();LogDebug("主线程执行完毕");Console.Read();
}
执行结果:
3.2 WaitAny
阻塞等待,等到线程列表中任何一个线程执行完毕即可返回,继续向下执行。
public static int WaitAny(params Task[] tasks);
- 参数tasks:传入Task数组
- 返回值:返回完成Task对象在数组中的索引
- 重载方法可以传入等待超时时间
static void Main(string[] args)
{LogDebug("Main start");List<Task> tasks = new List<Task>();tasks.Add(Task.Factory.StartNew(state => DoSomeWork(state), "Task1"));tasks.Add(Task.Factory.StartNew(state => DoSomeWork(state), "Task2"));tasks.Add(Task.Factory.StartNew(state => DoSomeWork(state), "Task3"));int index = Task.WaitAny(tasks.ToArray());LogDebug($"{tasks.ToArray()[index].AsyncState} 务完成了");LogDebug("Main end");Console.Read();
}
执行结果:
3.3 WaitAll
阻塞等待,等到线程列表中所有线程全部完成,继续向下执行。
public static void WaitAll(params Task[] tasks);
- 参数tasks:传入Task数组
- 重载方法可以传入等待超时时间
static void Main(string[] args)
{LogDebug("Main start");List<Task> tasks = new List<Task>();tasks.Add(Task.Factory.StartNew(state => DoSomeWork(state), "Task1"));tasks.Add(Task.Factory.StartNew(state => DoSomeWork(state), "Task2"));tasks.Add(Task.Factory.StartNew(state => DoSomeWork(state), "Task3"));Task.WaitAll(tasks.ToArray());LogDebug($"所有任务都完成了");LogDebug("Main end");Console.Read();
}
执行结果:
3.4 WhenAny + ContinueWith
不会阻塞当前线程,通常与ContinueWith
配合使用,等到线程列表中任何一个线程执行完毕,继续执行ContinueWith中的任务。
public static Task<Task> WhenAny(params Task[] tasks);
- 参数:传入Task数组
- 返回值:返回泛型Task对象,该对象表示线程列表中执行完毕的任务;并且该返回值将作为ContinueWith中委托的参数进行传递
public Task ContinueWith(Action<Task<TResult>> continuationAction);
- 参数:传入一个Action委托,委托的参数
Task<TResult>
来自WhenAny的返回值Task<Task>
,通过Result
属性可以获得已完成任务的Task对象 - 返回值:返回一个Task对象
static void Main(string[] args)
{LogDebug("Main start");List<Task> tasks = new List<Task>();tasks.Add(Task.Factory.StartNew(state => DoSomeWork(state), "Task1"));tasks.Add(Task.Factory.StartNew(state => DoSomeWork(state), "Task2"));tasks.Add(Task.Factory.StartNew(state => DoSomeWork(state), "Task3"));//通过Result从泛型Task中获得Task对象Task.WhenAny(tasks.ToArray()).ContinueWith(task => LogDebug($"【{task.Result.AsyncState}】 任务完成了"));LogDebug("Main end");Console.Read();
}
执行结果:
3.5 WhenAll + ContinueWith
不会阻塞当前线程,通常与ContinueWith
配合使用,等到任务列表中全部任务执行完毕,继续执行ContinueWith中的任务。
public static Task WhenAll(params Task[] tasks);
- 参数tasks:传入Task数组
- 返回值:返回一个Task对象,用于表示任务列表中所有任务完成,那为什么不像WaitAll一样返回void,废话,返回void还怎么组合ContinueWith做任务延续
static void Main(string[] args)
{LogDebug("Main start");List<Task> tasks = new List<Task>();tasks.Add(Task.Factory.StartNew(state => DoSomeWork(state), "Task1"));tasks.Add(Task.Factory.StartNew(state => DoSomeWork(state), "Task2"));tasks.Add(Task.Factory.StartNew(state => DoSomeWork(state), "Task3"));Task.WhenAll(tasks.ToArray()).ContinueWith(task => LogDebug($"所有任务都完成了"));LogDebug("Main end");Console.Read();
}
执行结果:
3.6 ContinueWhenAny
ContinueWhenAny是TaskFactory的方法,不会阻塞当前线程,等价于Task的WhenAny + ContinueWith。
public Task ContinueWhenAny(Task[] tasks, Action<Task> continuationAction);
- 参数tasks:传入Task数组
- 参数continuationAction:某一个任务完成之后,把完成的Task作为参数执行一个委托
- 返回值:返回一个Task对象,可以继续使用Task提供的功能
static void Main(string[] args)
{LogDebug("Main start");List<Task> tasks = new List<Task>();tasks.Add(Task.Factory.StartNew(state => DoSomeWork(state), "Task1"));tasks.Add(Task.Factory.StartNew(state => DoSomeWork(state), "Task2"));tasks.Add(Task.Factory.StartNew(state => DoSomeWork(state), "Task3"));Task.Factory.ContinueWhenAny(tasks.ToArray(), task => LogDebug($"【{task.AsyncState}】 任务完成了"));LogDebug("Main end");Console.Read();
}
执行结果:
3.7 ContinueWhenAll
ContinueWhenAny是TaskFactory的方法,不会阻塞当前线程,等价于Task的WhenAll + ContinueWith。
public Task ContinueWhenAll(Task[] tasks, Action<Task[]> continuationAction);
- 参数tasks:传入Task数组
- 参数continuationAction:所有任务完成后,Task列表作为参数执行一个委托
- 返回值:返回一个Task对象,可以继续使用Task提供的功能
static void Main(string[] args)
{LogDebug("Main start");List<Task> tasks = new List<Task>();tasks.Add(Task.Factory.StartNew(state => DoSomeWork(state), "Task1"));tasks.Add(Task.Factory.StartNew(state => DoSomeWork(state), "Task2"));tasks.Add(Task.Factory.StartNew(state => DoSomeWork(state), "Task3"));Task.Factory.ContinueWhenAll(tasks.ToArray(), taskList => {LogDebug($"所有任务都完成了");//可以对完成的任务进行遍历foreach (Task t in taskList){LogDebug($"【{t.AsyncState}】 任务完成了");}});LogDebug("Main end");Console.Read();
}
执行结果:
4. 线程安全
对于多线程编程,当访问公共资源时一定要考虑冲突问题,解决的核心思想就是保证同一时刻只有一个线程访问资源。
4.1 lock
lock基本使用:
- 标准定义:
private static readonly object TaskSafe_Lock = new object();
private
:防止外面也能调用锁satatic
:全场唯一,避免不同实例锁会不同readonly
:防止修改object
:表示引用类型,锁只能是引用类型,可以是任何引用类型,但不要用string,因为享元。
internal class Program
{private static readonly object TaskSafe_Lock = new object();private static int TotalCount = 0;private static List<int> IntList = new List<int>();static void Main(string[] args){LogDebug("Main start");List<Task> tasks = new List<Task>();for (int i = 0; i < 10000; i++){int newI = i;tasks.Add(Task.Factory.StartNew(() =>{//lock 后的方法快,任意时刻只有一个线程可以进来lock (TaskSafe_Lock){TotalCount += 1;IntList.Add(newI);}}));}Task.WaitAll(tasks.ToArray());Console.WriteLine(TotalCount);Console.WriteLine(IntList.Count());LogDebug("Main end");Console.Read();}
}
执行结果:
如果上面的代码屏蔽lock (TaskSafe_Lock)
,执行结果:
我们小结一下:
- 第二次结果没有使用锁,在访问公共资源TotalCount时,可能会出现某一个时刻两个线程同时操作该变量,向该变量写入了相同的值。
- 使用多线程能提高并发,而使用lock时,lock语句块内同一时刻只有一个线程能进入,有可能发生等待,这又牺牲了部分性能,所以使用lock一定要注意保护的粒度,要尽可能缩小lock的范围。
5. 线程取消
说到线程取消先理解一个问题,线程不是C#的资源,而是操作系统OS的资源,我们是没办法真正掌控线程什么时候取消的,所以像Thread.Abort()这种用法在.NET5及更高版本中不再支持,Task也没有提供能够取消线程的API。那该如何取消呢?常用以下方式。
5.1 CancellationToken
使用CancellationToken做线程取消不是直接操作Task对象,而是在线程里面去检查某个状态标志,流程大致是:创建公共访问变量——修改它——线程不断监测它。我们先简单整理下CancellationToken的使用:
- CancellationToken结构体
- None:空
- IsCancellationRequested:是否取消
- ThrowIfCancellationRequested:如果任务被取消,执行到这句话就抛异常
- CancellationTokenSource
- CancelAfter:超时后发出取消信号
- Cancel:发出取消信号
- Token:CancellationToken对象通常不是直接new出来的,而是通过CancellationTokenSource的Token属性得到。
举个场景:启动多个线程,某个线程抛出异常后,希望通知别的线程都停下来:
static void Main(string[] args)
{CancellationTokenSource cts = new CancellationTokenSource();for (int i = 0; i < 20; i++){string name = string.Format($"Task_{i}");Action<object> act = t =>{try{Thread.Sleep(1000);if (t.ToString().Equals("Task_3")){throw new Exception(string.Format($"***{t} 执行失败"));}//检查信号量if (cts.IsCancellationRequested){Console.WriteLine("{0} 放弃执行", t);//return结束的是当前线程return;}Console.WriteLine("{0} 执行成功", t);}catch (Exception ex){cts.Cancel();Console.WriteLine(ex.Message);}};Task.Factory.StartNew(act, name, cts.Token);}Console.Read();
}
执行结果:
我们小结一下:
- StartNew时传入CancellationToken如果已经是Cancel的状态,线程会放弃启动。
- 已经启动的线程,需要通过判断IsCancellationRequested状态标志手动决定如何结束线程
6. 异常处理
6.1 线程内部异常
在同步调用中可以将方法包装到try catch中进行捕获,但是异步在调用时不可以,因为异步执行代码和try catch不在同一线程内。有解释说比如调用Start或Task.Run()方法时,方法是立即返回的,可能子线程还没抛出异常就跳出了try catch的范围,下面的例子在try中加入延时即使子线程执行完仍然捕捉不到异常。
static void Main(string[] args)
{LogDebug("Main start");try{Task.Run(() =>{Thread.Sleep(1000);throw new Exception(string.Format($"抛出异常"));});//给try catch留充足的时间Thread.Sleep(5000);}catch (Exception ex){Console.WriteLine($"************{ex.Message}");}LogDebug("Main end");Console.Read();
}
执行结果:
所以线程内部的异常应当在线程内部进行捕获处理:
static void Main(string[] args)
{LogDebug("Main start");try{Task.Run(() =>{try{Thread.Sleep(1000);throw new Exception(string.Format($"抛出异常"));}catch (Exception ex){Console.WriteLine($"子线程捕获异常:{ex.Message}");}});Thread.Sleep(5000);}catch (Exception ex){Console.WriteLine($"************{ex.Message}");}LogDebug("Main end");Console.Read();
}
执行结果:
6.2 多个任务(AggregateException)
AggregateException异常有几个关键点:
- 调用WaitAll等待期间,异常会被包裹到一个对象中
- 如果子线程中捕捉过异常,AggregateException中不会再次记录异常
- 等全部任务完成,WaitAll返回,如果有未处理的异常才会catch到AggregateException
- 遍历异常有两种方式:
- 对每个异常调用
Handle
- 遍历
InnerExceptions
- 对每个异常调用
具体用法如下:
static void Main(string[] args)
{try{List<Task> tasks = new List<Task>();for (int i = 0; i < 10; i++){string name = string.Format($"Task_{i}");Action<object> act = t =>{Thread.Sleep(1000);if (t.ToString().Equals("Task_3")){throw new Exception(string.Format($"***{t} 执行失败"));}if (t.ToString().Equals("Task_4")){throw new Exception(string.Format($"***{t} 执行失败"));}Console.WriteLine("{0} 执行成功", t);};tasks.Add(Task.Factory.StartNew(act, name));}Task.WaitAll(tasks.ToArray());}//Wait期间发生的异常都包裹到一个对象中去//子线程中处理过异常这里就不会再捕获了catch (AggregateException aex){//遍历异常方式一aex.Handle(ex =>{Console.WriteLine($"方式1遍历结果:{ex.Message}");return true;});//遍历异常方式二foreach (var item in aex.InnerExceptions){Console.WriteLine($"方式2遍历结果:{item.Message}");}}catch (Exception ex){Console.WriteLine($"************{ex.Message}");}Console.Read();
}
执行结果:
7 延时(Dealay)
提到延时大家肯定用过Thread.Sleep()
,在Task中也提供了一个Delay
方法,这两者有什么区别,我们通过一个例子来理解:
static void Main(string[] args)
{LogDebug("Main start");//使用DelayStopwatch swDelay = Stopwatch.StartNew();swDelay.Start();Task.Delay(2000);swDelay.Stop();Console.WriteLine($"Delay: {swDelay.ElapsedMilliseconds}");//使用SleepStopwatch swSleep = Stopwatch.StartNew();swSleep.Start();Thread.Sleep(2000);swSleep.Stop();Console.WriteLine($"Sleep: {swSleep.ElapsedMilliseconds}");LogDebug("Main end");Console.Read();
}
执行结果:
根据结果可以看到,调用Task.Delay()
好像根本没有延时,程序直接向下执行了,那这个延时有什么用?实际上这两者最大的区别是Sleep是同步等待,会阻塞调用者线程,而Delay是异步等待,不会阻塞调用者线程。
那如何使用Delay实现延时呢?先来看下定义:public static Task Delay(int millisecondsDelay);
- 参数:传入指定时间
- 返回值:返回一个Task对象,有了Task不就可以组合
ContinueWith
了
static void Main(string[] args)
{LogDebug("Main start");Stopwatch sw = Stopwatch.StartNew();sw.Start();Task.Delay(2000).ContinueWith(task =>{sw.Stop();Console.WriteLine($"延时:{sw.ElapsedMilliseconds}");});LogDebug("Main end");Console.Read();
}
执行结果:
我们小结以下:
- 调用Delay没有阻塞调用者线程
- 延时指定时间后执行延续任务
如果Delay不好理解,用Thread实现一个异步延时,对比一下:
static void Main(string[] args)
{LogDebug("Main start");Stopwatch sw = Stopwatch.StartNew();sw.Start();Task.Run(() =>{Thread.Sleep(2000);sw.Stop();Console.WriteLine($"延时:{sw.ElapsedMilliseconds}");});LogDebug("Main end");Console.Read();
}
执行结果: