高级异步编程
在 Rust 精通篇中,我们将深入探索 Rust 的高级异步编程技术。Rust 的异步编程模型基于 Future 特征和异步运行时,提供了高效的非阻塞 I/O 和并发处理能力。在本章中,我们将超越基础知识,探索如何构建高性能异步系统和自定义执行器。
异步编程回顾
在深入高级主题之前,让我们简要回顾 Rust 的异步编程模型:
use std::time::Duration;
use tokio::time;#[tokio::main]
async fn main() {// 创建两个异步任务let task1 = async {time::sleep(Duration::from_millis(100)).await;println!("任务 1 完成");1};let task2 = async {time::sleep(Duration::from_millis(50)).await;println!("任务 2 完成");2};// 并发执行两个任务let (result1, result2) = tokio::join!(task1, task2);println!("结果: {} + {} = {}", result1, result2, result1 + result2);
}
Rust 的异步编程基于以下核心概念:
- Future 特征:表示可能尚未完成的计算
- async/await 语法:简化异步代码的编写
- 异步运行时:如 Tokio、async-std 等,负责执行和调度异步任务
- 任务(Task):可独立调度的异步执行单元
Future 深入理解
Future 特征的内部机制
`Future` 特征是 Rust 异步编程的核心:
/// Reference copy of the core async abstraction (the real definition is
/// `std::future::Future`): a computation that may not have finished yet.
pub trait Future {
    /// The value produced once the computation completes.
    type Output;
    /// Attempts to drive the computation towards completion.
    ///
    /// Returns `Poll::Ready(value)` when finished. Otherwise returns `Poll::Pending`,
    /// after arranging for the waker obtained from `cx` to be woken once progress
    /// is possible, so that `poll` gets called again.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
`poll` 方法是 Future 的核心,它尝试将异步计算推进到完成状态:
- 如果 Future 已完成,返回 `Poll::Ready(result)`
- 如果 Future 尚未完成,返回 `Poll::Pending`,并通过 `cx` 中的 `Waker` 安排在相关事件就绪时重新调用 `poll`
手动实现 Future
下面是一个简单的 Future 实现示例:
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};struct Delay {when: Instant,
}impl Future for Delay {type Output = ();fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {if Instant::now() >= self.when {println!("Future 已完成");Poll::Ready(())} else {// 安排在未来某个时刻重新调用 polllet waker = cx.waker().clone();let when = self.when;std::thread::spawn(move || {let now = Instant::now();if now < when {std::thread::sleep(when - now);}waker.wake();});println!("Future 尚未完成");Poll::Pending}}
}#[tokio::main]
async fn main() {let future = Delay {when: Instant::now() + Duration::from_secs(1),};println!("等待 Future 完成...");future.await;println!("主函数结束");
}
Pin 和 Unpin
`Pin` 类型在异步 Rust 中至关重要:被固定(pin)的值保证不会再在内存中移动,因此自引用结构(例如编译器为 async 块生成的状态机)可以安全地保存指向自身字段的指针:
use std::marker::PhantomPinned;
use std::pin::Pin;

/// A struct holding a raw pointer to one of its own fields.
///
/// Because `ptr_to_data` points into the value itself, the value must not move once
/// the pointer is set. `PhantomPinned` makes the type `!Unpin`, so after it is pinned
/// safe code can no longer move it.
struct SelfReferential {
    data: String,
    /// Points at `self.data`; only meaningful while the value stays pinned.
    ptr_to_data: *const String,
    /// Opts the type out of `Unpin`.
    _marker: PhantomPinned,
}

impl SelfReferential {
    /// Builds a heap-allocated, pinned value whose `ptr_to_data` points at its own `data`.
    fn new(data: String) -> Pin<Box<Self>> {
        let mut boxed = Box::new(SelfReferential {
            data,
            ptr_to_data: std::ptr::null(),
            _marker: PhantomPinned,
        });
        // The contents of a `Box` live at a fixed heap address; moving the `Box` handle
        // itself does not move them, so this address remains valid after pinning.
        let ptr = &boxed.data as *const String;
        boxed.ptr_to_data = ptr;
        // `Pin::new` requires `Target: Unpin`, which this type deliberately is not.
        // `Box::into_pin` is the safe way to pin an existing box.
        Box::into_pin(boxed)
    }

    /// Returns the stored string.
    fn get_data(self: Pin<&Self>) -> &str {
        // `Pin<&T>::get_ref` is a safe method: a shared reference cannot be used to move
        // the value, so no `unsafe` is needed here.
        &self.get_ref().data
    }

    /// Returns the self-pointer recorded at construction time.
    fn get_ptr(self: Pin<&Self>) -> *const String {
        self.ptr_to_data
    }
}

fn main() {
    let pinned = SelfReferential::new("hello".to_string());
    // Verify that the stored pointer really points at the field.
    let data_ptr = &pinned.data as *const String;
    let ptr = pinned.as_ref().get_ptr();
    println!("数据指针: {:?}", data_ptr);
    println!("存储的指针: {:?}", ptr);
    println!("数据: {}", pinned.as_ref().get_data());
    assert_eq!(data_ptr, ptr);
}
异步运行时深入剖析
执行器(Executor)工作原理
异步执行器负责调度和运行 Future,下面是一个简单执行器的实现:
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// A heap-allocated, type-erased future that may be sent to another thread.
type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

/// A unit of work: the future still to be completed, plus a handle that lets the
/// task put itself back on the executor's run queue when it is woken.
struct Task {
    /// `None` once the future has completed (it is dropped at that point).
    future: Mutex<Option<BoxFuture>>,
    /// Run-queue sender used by the waker to reschedule this task.
    sender: Sender<Arc<Task>>,
}

impl Wake for Task {
    fn wake(self: Arc<Self>) {
        Wake::wake_by_ref(&self);
    }

    fn wake_by_ref(self: &Arc<Self>) {
        // Re-enqueue the task so the executor polls it again. If the executor has
        // already shut down nobody can run the task any more, so the wake-up is simply
        // dropped instead of panicking inside whatever code called `wake`.
        let _ = self.sender.send(Arc::clone(self));
    }
}

/// A minimal single-threaded executor whose run queue is an mpsc channel.
struct Executor {
    sender: Sender<Arc<Task>>,
    receiver: Receiver<Arc<Task>>,
}

impl Executor {
    fn new() -> Self {
        let (sender, receiver) = channel();
        Executor { sender, receiver }
    }

    /// Wraps `future` in a task and queues it for its first poll.
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let task = Arc::new(Task {
            future: Mutex::new(Some(Box::pin(future))),
            sender: self.sender.clone(),
        });
        // Cannot fail: the executor itself owns the receiving end.
        self.sender.send(task).expect("任务队列已满");
    }

    /// Polls queued tasks until every spawned task has completed or been dropped.
    ///
    /// Consumes the executor and drops its own sender first. The queue therefore closes
    /// once no task or outstanding waker holds a sender, and `recv` returns `Err`. If the
    /// executor kept its sender alive, `recv` would block forever after the last task.
    fn run(self) {
        let Executor { sender, receiver } = self;
        drop(sender);

        while let Ok(task) = receiver.recv() {
            let waker = Waker::from(Arc::clone(&task));
            let mut context = Context::from_waker(&waker);

            let mut slot = task.future.lock().unwrap();
            if let Some(mut future) = slot.take() {
                match future.as_mut().poll(&mut context) {
                    // Not finished yet: keep it around for the next wake-up.
                    Poll::Pending => *slot = Some(future),
                    // Finished: the future is dropped here and never polled again.
                    Poll::Ready(()) => {}
                }
            }
        }
    }
}

fn main() {
    let executor = Executor::new();

    // Spawn a couple of tasks.
    executor.spawn(async {
        println!("任务 1 开始");
        // Simulated async operation.
        std::future::ready(()).await;
        println!("任务 1 完成");
    });
    executor.spawn(async {
        println!("任务 2 开始");
        // Simulated async operation.
        std::future::ready(()).await;
        println!("任务 2 完成");
    });

    // Drive all tasks to completion; returns once the queue is drained and closed.
    executor.run();
}
事件循环与反应器(Reactor)
完整的异步运行时通常包含执行器和反应器两部分:
- 执行器:负责调度和运行 Future
- 反应器:负责监听 I/O 事件并唤醒相关任务
下面是一个简化的反应器示例:
use mio::{Events, Interest, Poll, Token};
use mio::net::TcpListener;
use std::collections::HashMap;
use std::io;
use std::sync::{Arc, Mutex};
use std::task::Waker;// 简化的反应器
struct Reactor {poll: Poll,wakers: Mutex<HashMap<Token, Waker>>,
}impl Reactor {fn new() -> io::Result<Self> {Ok(Reactor {poll: Poll::new()?,wakers: Mutex::new(HashMap::new()),})}// 注册 I/O 资源和唤醒器fn register(&self, source: &mut TcpListener, token: Token, waker: Waker) -> io::Result<()> {self.poll.registry().register(source, token, Interest::READABLE)?;self.wakers.lock().unwrap().insert(token, waker);Ok(())}// 运行一次事件循环fn run_once(&self) -> io::Result<()> {let mut events = Events::with_capacity(1024);self.poll.poll(&mut events, None)?;for event in events.iter() {if let Some(waker) = self.wakers.lock().unwrap().get(&event.token()) {waker.wake_by_ref();}}Ok(())}
}
高级异步模式
流(Stream)处理
`Stream` 特征类似于 `Future`,但可以依次产生多个值:
use futures::stream::{self, StreamExt};

/// Demonstrates consuming a stream item by item and through combinators.
#[tokio::main]
async fn main() {
    // Pull elements one at a time with `next()`.
    let mut numbers = stream::iter(1..=5);
    while let Some(n) = numbers.next().await {
        println!("值: {}", n);
    }

    // Same idea with combinators: double each value, keep multiples of 3, then sum.
    let doubled = stream::iter(1..=10).map(|x| x * 2);
    let multiples_of_three = doubled.filter(|x| futures::future::ready(x % 3 == 0));
    let sum = multiples_of_three
        .fold(0, |acc, x| async move { acc + x })
        .await;
    println!("总和: {}", sum);
}
并发控制模式
信号量(Semaphore)
信号量用于限制并发任务数量:
use std::time::Duration;
use tokio::sync::Semaphore;
use tokio::time;#[tokio::main]
async fn main() {// 创建一个容量为 3 的信号量let semaphore = Arc::new(Semaphore::new(3));let mut handles = vec![];for id in 0..8 {let semaphore = Arc::clone(&semaphore);let handle = tokio::spawn(async move {// 获取许可let permit = semaphore.acquire().await.unwrap();println!("任务 {} 获取许可,开始执行", id);// 模拟工作time::sleep(Duration::from_secs(2)).await;println!("任务 {} 完成", id);// 许可在 permit 被丢弃时自动释放drop(permit);});handles.push(handle);}// 等待所有任务完成for handle in handles {handle.await.unwrap();}
}
超时处理
为异步操作设置超时:
use std::time::Duration;
use tokio::time;

/// Simulated slow job: finishes after five seconds.
async fn long_running_task() -> String {
    time::sleep(Duration::from_secs(5)).await;
    String::from("任务完成")
}

/// Shows two ways of bounding an async operation to two seconds.
#[tokio::main]
async fn main() {
    let limit = Duration::from_secs(2);

    // Approach 1: `timeout` wraps the future and yields `Err(Elapsed)` on expiry.
    let outcome = time::timeout(limit, long_running_task()).await;
    if let Ok(result) = outcome {
        println!("任务结果: {}", result);
    } else {
        println!("任务超时");
    }

    // Approach 2: race the job against a sleep; the losing branch is dropped (cancelled).
    tokio::select! {
        result = long_running_task() => {
            println!("任务结果: {}", result);
        }
        _ = time::sleep(limit) => {
            println!("任务超时");
        }
    }
}
取消和超时
在 Tokio 中实现任务取消:
use tokio::sync::oneshot;
use tokio::time;
use std::time::Duration;async fn cancelable_task(mut cancel_rx: oneshot::Receiver<()>) -> Option<String> {tokio::select! {_ = &mut cancel_rx => {println!("任务被取消");None}_ = async {// 模拟长时间运行的任务time::sleep(Duration::from_secs(3)).await;"任务完成".to_string()} => Some("任务完成".to_string()),}
}#[tokio::main]
async fn main() {// 创建取消通道let (cancel_tx, cancel_rx) = oneshot::channel();// 生成可取消的任务let handle = tokio::spawn(cancelable_task(cancel_rx));// 等待一段时间后取消任务time::sleep(Duration::from_secs(1)).await;cancel_tx.send(()).unwrap();// 等待任务完成let result = handle.await.unwrap();println!("结果: {:?}", result);
}
自定义异步 I/O
实现异步读取器(AsyncRead)
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncRead;

/// An in-memory `AsyncRead` that hands out the bytes of `buffer` in order.
struct MyAsyncReader {
    buffer: Vec<u8>,
    /// Index of the next unread byte.
    position: usize,
}

impl MyAsyncReader {
    fn new(data: Vec<u8>) -> Self {
        Self {
            buffer: data,
            position: 0,
        }
    }
}

impl AsyncRead for MyAsyncReader {
    fn poll_read(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        // The type is `Unpin`, so the pin can be unwrapped to a plain `&mut`.
        let this = self.get_mut();
        let start = this.position;
        // At end of data nothing is written, which signals EOF to the caller.
        if start < this.buffer.len() {
            let unread = &this.buffer[start..];
            let count = unread.len().min(buf.remaining());
            buf.put_slice(&unread[..count]);
            this.position = start + count;
        }
        // The data is already in memory, so a read never has to wait.
        Poll::Ready(Ok(()))
    }
}

#[tokio::main]
async fn main() -> io::Result<()> {
    use tokio::io::AsyncReadExt;

    let mut reader = MyAsyncReader::new(b"Hello, async world!".to_vec());
    let mut collected = Vec::new();
    reader.read_to_end(&mut collected).await?;
    println!("读取的数据: {}", String::from_utf8_lossy(&collected));
    Ok(())
}
实现异步写入器(AsyncWrite)
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncWrite;

/// An in-memory `AsyncWrite` sink that appends everything written to a `Vec<u8>`.
struct MyAsyncWriter {
    buffer: Vec<u8>,
}

impl MyAsyncWriter {
    fn new() -> Self {
        Self { buffer: Vec::new() }
    }

    /// Everything written so far.
    fn get_written_data(&self) -> &[u8] {
        self.buffer.as_slice()
    }
}

impl AsyncWrite for MyAsyncWriter {
    fn poll_write(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        // The whole slice is accepted at once; an in-memory sink never blocks.
        self.get_mut().buffer.extend_from_slice(buf);
        Poll::Ready(Ok(buf.len()))
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        // Nothing is held anywhere else, so there is nothing to flush.
        Poll::Ready(Ok(()))
    }

    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        // No underlying resource to close.
        Poll::Ready(Ok(()))
    }
}

#[tokio::main]
async fn main() -> io::Result<()> {
    use tokio::io::AsyncWriteExt;

    let mut sink = MyAsyncWriter::new();
    sink.write_all(b"Hello, async world!").await?;
    let written = String::from_utf8_lossy(sink.get_written_data());
    println!("写入的数据: {}", written);
    Ok(())
}
异步设计模式
背压(Backpressure)处理
背压是指当生产者产生数据的速度超过消费者处理的速度时,系统需要限制生产者的速度或缓冲数据:
use tokio::sync::mpsc;
use tokio::time;
use std::time::Duration;#[tokio::main]
async fn main() {// 创建有界通道,提供背压机制let (tx, mut rx) = mpsc::channel(5);// 生产者任务let producer = tokio::spawn(async move {for i in 1..=20 {// 当通道已满时,send 会等待,实现背压if let Err(e) = tx.send(i).await {println!("发送错误: {}", e);return;}println!("发送: {}", i);time::sleep(Duration::from_millis(100)).await;}});// 消费者任务let consumer = tokio::spawn(async move {while let Some(value) = rx.recv().await {println!("接收: {}", value);// 消费者处理较慢time::sleep(Duration::from_millis(300)).await;}});// 等待任务完成let _ = tokio::join!(producer, consumer);
}
优雅关闭(Graceful Shutdown)
实现异步系统的优雅关闭:
use tokio::signal;
use tokio::sync::{mpsc, oneshot};
use tokio::time;
use std::time::Duration;// 工作任务结构
struct Worker {id: usize,shutdown_rx: oneshot::Receiver<()>,
}impl Worker {async fn run(&mut self) {loop {tokio::select! {_ = &mut self.shutdown_rx => {println!("工作者 {} 收到关闭信号", self.id);// 执行清理操作time::sleep(Duration::from_millis(500)).await;println!("工作者 {} 已清理资源并关闭", self.id);return;}_ = time::sleep(Duration::from_secs(1)) => {println!("工作者 {} 正在处理...", self.id);}}}}
}#[tokio::main]
async fn main() {// 创建关闭通知通道let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);// 启动工作者let mut worker_handles = vec![];let mut shutdown_senders = vec![];for i in 0..3 {let (tx, rx) = oneshot::channel();shutdown_senders.push(tx);let mut worker = Worker {id: i,shutdown_rx: rx,};worker_handles.push(tokio::spawn(async move {worker.run().await;}));}// 等待关闭信号tokio::select! {_ = signal::ctrl_c() => {println!("收到 Ctrl+C,开始优雅关闭");}_ = time::sleep(Duration::from_secs(5)) => {println!("模拟关闭信号,开始优雅关闭");}}// 发送关闭信号给所有工作者for tx in shutdown_senders {let _ = tx.send(());}// 通知关闭监听器let _ = shutdown_tx.send(()).await;// 等待所有工作者完成for handle in worker_handles {let _ = handle.await;}println!("所有工作者已关闭,程序退出");
}
性能优化技术
任务本地存储(Task-Local Storage)
use tokio::task::LocalKey;
use std::cell::RefCell;thread_local! {static COUNTER: RefCell<u32> = RefCell::new(0);
}#[tokio::main]
async fn main() {// 在主任务中访问COUNTER.with(|counter| {*counter.borrow_mut() += 1;println!("主任务计数: {}", *counter.borrow());});// 在子任务中访问let task1 = tokio::spawn(async {COUNTER.with(|counter| {*counter.borrow_mut() += 1;println!("任务 1 计数: {}", *counter.borrow());});});let task2 = tokio::spawn(async {COUNTER.with(|counter| {*counter.borrow_mut() += 1;println!("任务 2 计数: {}", *counter.borrow());});});let _ = tokio::join!(task1, task2);// 再次在主任务中访问COUNTER.with(|counter| {println!("最终主任务计数: {}", *counter.borrow());});
}
异步栈展开(Async Stack Unwinding)
use std::panic;
use tokio::task;async fn might_panic(should_panic: bool) -> Result<(), String> {if should_panic {panic!("任务发生恐慌");}Ok(())
}#[tokio::main]
async fn main() {// 设置恐慌钩子panic::set_hook(Box::new(|info| {println!("捕获到恐慌: {}", info);}));// 使用 catch_unwind 捕获异步任务中的恐慌let result = task::spawn(async {let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {might_panic(true)}));match result {Ok(future) => {// 继续处理 futureprintln!("Future 创建成功");future}Err(e) => {println!("捕获到恐慌: {:?}", e);async { Ok(()) }}}}).await;println!("任务结果: {:?}", result);// 另一种方法:使用 tokio::spawn 的错误处理let handle = task::spawn(async {might_panic(true).await});match handle.await {Ok(result) => println!("任务成功: {:?}", result),Err(e) => println!("任务失败: {}", e),}
}
实际应用案例
构建高性能 HTTP 服务器
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use std::error::Error;async fn handle_connection(mut stream: TcpStream) -> Result<(), Box<dyn Error>> {let mut buffer = [0; 1024];// 读取请求let n = stream.read(&mut buffer).await?;let request = String::from_utf8_lossy(&buffer[..n]);println!("收到请求:\n{}", request);// 构造响应let response = "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 12\r\n\r\nHello, Rust!";// 发送响应stream.write_all(response.as_bytes()).await?;stream.flush().await?;