1. 异步任务运行过程中,如何取消一个任务的运行?
use tokio::sync::broadcast;
use tokio::task;
use tokio::time::{sleep, Duration};#[tokio::main]
async fn main() {// 创建一个广播频道,用于通知任务取消let (tx, _) = broadcast::channel(16);// 启动多个异步任务let mut handles = Vec::new();for i in 0..3 {let mut rx = tx.subscribe();let handle = task::spawn(async move {tokio::select! {_ = async {loop {sleep(Duration::from_secs(1)).await;println!("Task {} running", i);}} => {println!("Task {} completed", i);}_ = rx.recv() => {println!("Task {} aborted", i);}}});handles.push(handle);}// 等待一段时间,然后发送取消信号sleep(Duration::from_secs(3)).await;let _ = tx.send(());// 等待所有任务完成for handle in handles {let _ = handle.await;}println!("All tasks finished.");
}
如上代码所示,我们创建了一个广播频道broadcast::channel(16)
,用于通知任务取消。在每个任务中,我们使用tokio::select!
宏来监听两个异步任务:一个是任务的主体逻辑,另一个是接收取消信号。当接收到取消信号时,任务会打印Task {} aborted
并退出。在代码片段中的sleep(Duration::from_secs(3)).await;
这一块逻辑,完全可以替换为其他逻辑,比如用户输入、网络请求等,使得任务取消更加灵活。
2. 更近一步,多个异步任务执行的时候,其中一个任务失败了,如何取消其他任务的执行?
第一个想法是使用tokio::try_join!
+ tokio::spawn
。
use std::time::Duration;async fn do_stuff_async() -> Result<(), &'static str> {println!("do some async work");Err("do_stuff_async failed")
}async fn more_async_work() -> Result<(), &'static str> {tokio::time::sleep(Duration::from_secs(3)).await;for i in 0..50000 {println!("more here {}", i);}Ok(())
}#[tokio::main]
async fn main() {let f1 = tokio::spawn(do_stuff_async());let f2 = tokio::spawn(more_async_work());let res = tokio::try_join!(f1, f2);match res {Ok((first, second)) => {println!("first = {:?}, second = {:?}", first, second);}Err(err) => {println!("processing failed; error = {}", err);}}
}
这段代码中,我们创建了两个异步任务f1
和f2
,分别执行do_stuff_async
和more_async_work
函数。在f1
任务中,我们在异步函数do_stuff_async
中模拟了一个失败的情况,当异步函数返回Err
时,我们直接返回这个错误,但悲剧的是这个任务的失败并不会影响其他异步任务的执行。想实现预期的效果,貌似只能通过在异步函数中使用tokio::select!
宏来监听取消信号,然后在任务失败时发送取消信号。如:try_join! on parallel tasks讨论的那样。
3. tokio::select!
宏
tokio::select!
如果一个分支如果正在执行且未执行结束的时候,另一个分支执行结束了,如_ = rx.recv() => {}
,那正在执行的其他分支的代码逻辑是不是依旧会继续执行?
在使用tokio::select!
宏时,它允许同时等待多个操作,比如异步操作的完成,并且当其中一个操作完成时,select!
宏会立即处理完成的操作。关于当一个分支(比如一个异步接收操作_ = rx.recv()
)完成时,其他正在执行但尚未完成的分支会发生什么,这里有几个关键点需要理解:
-
立即响应:当使用
tokio::select!
时,一旦其中一个分支完成,select!
宏会立即处理这个分支。这意味着,如果_ = rx.recv()
这个接收操作完成了,它会立即被处理。 -
取消其他分支:对于其他还在执行但未完成的分支,
tokio::select!
会尝试取消它们。这是通过Future
的Drop
特性实现的,当一个Future
被丢弃时,它有机会进行清理工作,这可能包括取消正在进行的操作。不过,这取决于具体的Future
实现是否支持取消。 -
执行逻辑:如果一个分支正在执行且未执行结束的时候,另一个分支执行结束了,那么正在执行的分支会被
select!
尝试取消。但是,如果这个分支的逻辑包含了无法取消的操作或者执行到了一个不可中断的点,那么这部分代码的逻辑可能会继续执行到下一个可以取消的点或执行完毕。 -
编写可取消的代码:在使用
tokio::select!
时,重要的是要意识到,你的异步代码可能会被取消,因此编写可取消的异步代码是一个好习惯。这意味着应该避免长时间运行的同步代码块,并且定期检查操作是否应该被取消。
总结来说,当使用tokio::select!
时,如果一个分支完成了,其他正在执行的分支会被尝试取消。但是,这取决于这些分支的具体实现,以及它们是否支持取消和如何响应取消请求。