目录
- _cancellation_token.py代码
- 代码解释
- 类的初始化
- 取消操作
- 检查取消状态
- 添加回调函数
- 关联异步`Future`对象
- 总结
- 代码示例
- 示例 1:基本的取消操作
- 示例 2:添加回调函数
- 示例 3:检查令牌是否已取消
_cancellation_token.py代码
import threading
from asyncio import Future
from typing import Any, Callable, Listclass CancellationToken:"""A token used to cancel pending async calls"""def __init__(self) -> None:self._cancelled: bool = Falseself._lock: threading.Lock = threading.Lock()self._callbacks: List[Callable[[], None]] = []def cancel(self) -> None:"""Cancel pending async calls linked to this cancellation token."""with self._lock:if not self._cancelled:self._cancelled = Truefor callback in self._callbacks:callback()def is_cancelled(self) -> bool:"""Check if the CancellationToken has been used"""with self._lock:return self._cancelleddef add_callback(self, callback: Callable[[], None]) -> None:"""Attach a callback that will be called when cancel is invoked"""with self._lock:if self._cancelled:callback()else:self._callbacks.append(callback)def link_future(self, future: Future[Any]) -> Future[Any]:"""Link a pending async call to a token to allow its cancellation"""with self._lock:if self._cancelled:future.cancel()else:def _cancel() -> None:future.cancel()self._callbacks.append(_cancel)return future
代码解释
这段Python代码定义了一个名为CancellationToken
的类,其主要功能是提供一种机制,用于取消挂起的异步调用。下面详细解释代码的逻辑和功能:
类的初始化
def __init__(self) -> None:self._cancelled: bool = Falseself._lock: threading.Lock = threading.Lock()self._callbacks: List[Callable[[], None]] = []
_cancelled
:一个布尔类型的私有变量,用于标记该取消令牌是否已经被使用(即是否已经调用了cancel
方法),初始值为False
。_lock
:一个线程锁对象,用于确保在多线程环境下对共享资源(如_cancelled
和_callbacks
)的访问是线程安全的。_callbacks
:一个列表,用于存储当调用cancel
方法时需要执行的回调函数。
取消操作
def cancel(self) -> None:"""Cancel pending async calls linked to this cancellation token."""with self._lock:if not self._cancelled:self._cancelled = Truefor callback in self._callbacks:callback()
cancel
方法用于取消与该取消令牌关联的所有挂起的异步调用。- 使用
with self._lock
语句确保在修改_cancelled
状态和执行回调函数时不会出现竞态条件。 - 只有当
_cancelled
为False
时,才会将其设置为True
,并依次执行_callbacks
列表中的所有回调函数。
检查取消状态
def is_cancelled(self) -> bool:"""Check if the CancellationToken has been used"""with self._lock:return self._cancelled
is_cancelled
方法用于检查该取消令牌是否已经被使用。- 使用
with self._lock
语句确保在读取_cancelled
状态时不会出现竞态条件。 - 返回
_cancelled
的值。
添加回调函数
def add_callback(self, callback: Callable[[], None]) -> None:"""Attach a callback that will be called when cancel is invoked"""with self._lock:if self._cancelled:callback()else:self._callbacks.append(callback)
add_callback
方法用于添加一个回调函数,当调用cancel
方法时,该回调函数将被执行。- 使用
with self._lock
语句确保在检查_cancelled
状态和修改_callbacks
列表时不会出现竞态条件。 - 如果
_cancelled
为True
,说明取消操作已经发生,直接执行回调函数;否则,将回调函数添加到_callbacks
列表中。
关联异步Future
对象
def link_future(self, future: Future[Any]) -> Future[Any]:"""Link a pending async call to a token to allow its cancellation"""with self._lock:if self._cancelled:future.cancel()else:def _cancel() -> None:future.cancel()self._callbacks.append(_cancel)return future
link_future
方法用于将一个异步Future
对象与该取消令牌关联起来,以便可以取消该异步调用。- 使用
with self._lock
语句确保在检查_cancelled
状态和修改_callbacks
列表时不会出现竞态条件。 - 如果
_cancelled
为True
,说明取消操作已经发生,直接取消Future
对象;否则,定义一个内部函数_cancel
,用于取消Future
对象,并将其添加到_callbacks
列表中。 - 最后返回
Future
对象。
总结
CancellationToken
类提供了一种机制,允许用户在需要时取消挂起的异步调用。通过添加回调函数和关联Future
对象,当调用cancel
方法时,所有与该取消令牌关联的操作都将被取消。同时,使用线程锁确保了在多线程环境下的线程安全。
代码示例
示例 1:基本的取消操作
import asyncio
from typing import Any, Callable, List
import threading
from asyncio import Futureclass CancellationToken:"""A token used to cancel pending async calls"""def __init__(self) -> None:self._cancelled: bool = Falseself._lock: threading.Lock = threading.Lock()self._callbacks: List[Callable[[], None]] = []def cancel(self) -> None:"""Cancel pending async calls linked to this cancellation token."""with self._lock:if not self._cancelled:self._cancelled = Truefor callback in self._callbacks:callback()def is_cancelled(self) -> bool:"""Check if the CancellationToken has been used"""with self._lock:return self._cancelleddef add_callback(self, callback: Callable[[], None]) -> None:"""Attach a callback that will be called when cancel is invoked"""with self._lock:if self._cancelled:callback()else:self._callbacks.append(callback)def link_future(self, future: Future[Any]) -> Future[Any]:"""Link a pending async call to a token to allow its cancellation"""with self._lock:if self._cancelled:future.cancel()else:def _cancel() -> None:future.cancel()self._callbacks.append(_cancel)return futureasync def long_running_task():print("Task started")await asyncio.sleep(5)print("Task completed")async def main():token = CancellationToken()task = asyncio.create_task(long_running_task())token.link_future(task)# 模拟一段时间后取消任务await asyncio.sleep(2)token.cancel()try:await taskexcept asyncio.CancelledError:print("Task was cancelled")await main()
Task started
Task was cancelled
示例 2:添加回调函数
def callback_function():print("Callback function was called")async def main():token = CancellationToken()token.add_callback(callback_function)# 取消令牌token.cancel()await main()
Callback function was called
示例 3:检查令牌是否已取消
async def main():token = CancellationToken()print(f"Is cancelled before cancel: {token.is_cancelled()}")token.cancel()print(f"Is cancelled after cancel: {token.is_cancelled()}")await main()
Is cancelled before cancel: False
Is cancelled after cancel: True