欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 房产 > 建筑 > Autogen_core源码:_cancellation_token.py

Autogen_core源码:_cancellation_token.py

2025/2/2 9:46:06 来源:https://blog.csdn.net/qq_41472205/article/details/145413567  浏览:    关键词:Autogen_core源码:_cancellation_token.py

目录

    • _cancellation_token.py代码
    • 代码解释
      • 类的初始化
      • 取消操作
      • 检查取消状态
      • 添加回调函数
      • 关联异步`Future`对象
      • 总结
    • 代码示例
      • 示例 1:基本的取消操作
      • 示例 2:添加回调函数
      • 示例 3:检查令牌是否已取消

_cancellation_token.py代码

import threading
from asyncio import Future
from typing import Any, Callable, Listclass CancellationToken:"""A token used to cancel pending async calls"""def __init__(self) -> None:self._cancelled: bool = Falseself._lock: threading.Lock = threading.Lock()self._callbacks: List[Callable[[], None]] = []def cancel(self) -> None:"""Cancel pending async calls linked to this cancellation token."""with self._lock:if not self._cancelled:self._cancelled = Truefor callback in self._callbacks:callback()def is_cancelled(self) -> bool:"""Check if the CancellationToken has been used"""with self._lock:return self._cancelleddef add_callback(self, callback: Callable[[], None]) -> None:"""Attach a callback that will be called when cancel is invoked"""with self._lock:if self._cancelled:callback()else:self._callbacks.append(callback)def link_future(self, future: Future[Any]) -> Future[Any]:"""Link a pending async call to a token to allow its cancellation"""with self._lock:if self._cancelled:future.cancel()else:def _cancel() -> None:future.cancel()self._callbacks.append(_cancel)return future

代码解释

这段Python代码定义了一个名为CancellationToken的类,其主要功能是提供一种机制,用于取消挂起的异步调用。下面详细解释代码的逻辑和功能:

类的初始化

def __init__(self) -> None:self._cancelled: bool = Falseself._lock: threading.Lock = threading.Lock()self._callbacks: List[Callable[[], None]] = []
  • _cancelled:一个布尔类型的私有变量,用于标记该取消令牌是否已经被使用(即是否已经调用了cancel方法),初始值为False
  • _lock:一个线程锁对象,用于确保在多线程环境下对共享资源(如_cancelled_callbacks)的访问是线程安全的。
  • _callbacks:一个列表,用于存储当调用cancel方法时需要执行的回调函数。

取消操作

def cancel(self) -> None:"""Cancel pending async calls linked to this cancellation token."""with self._lock:if not self._cancelled:self._cancelled = Truefor callback in self._callbacks:callback()
  • cancel方法用于取消与该取消令牌关联的所有挂起的异步调用。
  • 使用with self._lock语句确保在修改_cancelled状态和执行回调函数时不会出现竞态条件。
  • 只有当_cancelledFalse时,才会将其设置为True,并依次执行_callbacks列表中的所有回调函数。

检查取消状态

def is_cancelled(self) -> bool:"""Check if the CancellationToken has been used"""with self._lock:return self._cancelled
  • is_cancelled方法用于检查该取消令牌是否已经被使用。
  • 使用with self._lock语句确保在读取_cancelled状态时不会出现竞态条件。
  • 返回_cancelled的值。

添加回调函数

def add_callback(self, callback: Callable[[], None]) -> None:"""Attach a callback that will be called when cancel is invoked"""with self._lock:if self._cancelled:callback()else:self._callbacks.append(callback)
  • add_callback方法用于添加一个回调函数,当调用cancel方法时,该回调函数将被执行。
  • 使用with self._lock语句确保在检查_cancelled状态和修改_callbacks列表时不会出现竞态条件。
  • 如果_cancelledTrue,说明取消操作已经发生,直接执行回调函数;否则,将回调函数添加到_callbacks列表中。

关联异步Future对象

def link_future(self, future: Future[Any]) -> Future[Any]:"""Link a pending async call to a token to allow its cancellation"""with self._lock:if self._cancelled:future.cancel()else:def _cancel() -> None:future.cancel()self._callbacks.append(_cancel)return future
  • link_future方法用于将一个异步Future对象与该取消令牌关联起来,以便可以取消该异步调用。
  • 使用with self._lock语句确保在检查_cancelled状态和修改_callbacks列表时不会出现竞态条件。
  • 如果_cancelledTrue,说明取消操作已经发生,直接取消Future对象;否则,定义一个内部函数_cancel,用于取消Future对象,并将其添加到_callbacks列表中。
  • 最后返回Future对象。

总结

CancellationToken类提供了一种机制,允许用户在需要时取消挂起的异步调用。通过添加回调函数和关联Future对象,当调用cancel方法时,所有与该取消令牌关联的操作都将被取消。同时,使用线程锁确保了在多线程环境下的线程安全。

代码示例

示例 1:基本的取消操作

import asyncio
from typing import Any, Callable, List
import threading
from asyncio import Futureclass CancellationToken:"""A token used to cancel pending async calls"""def __init__(self) -> None:self._cancelled: bool = Falseself._lock: threading.Lock = threading.Lock()self._callbacks: List[Callable[[], None]] = []def cancel(self) -> None:"""Cancel pending async calls linked to this cancellation token."""with self._lock:if not self._cancelled:self._cancelled = Truefor callback in self._callbacks:callback()def is_cancelled(self) -> bool:"""Check if the CancellationToken has been used"""with self._lock:return self._cancelleddef add_callback(self, callback: Callable[[], None]) -> None:"""Attach a callback that will be called when cancel is invoked"""with self._lock:if self._cancelled:callback()else:self._callbacks.append(callback)def link_future(self, future: Future[Any]) -> Future[Any]:"""Link a pending async call to a token to allow its cancellation"""with self._lock:if self._cancelled:future.cancel()else:def _cancel() -> None:future.cancel()self._callbacks.append(_cancel)return futureasync def long_running_task():print("Task started")await asyncio.sleep(5)print("Task completed")async def main():token = CancellationToken()task = asyncio.create_task(long_running_task())token.link_future(task)# 模拟一段时间后取消任务await asyncio.sleep(2)token.cancel()try:await taskexcept asyncio.CancelledError:print("Task was cancelled")await main()
Task started
Task was cancelled

示例 2:添加回调函数

def callback_function():print("Callback function was called")async def main():token = CancellationToken()token.add_callback(callback_function)# 取消令牌token.cancel()await main()
Callback function was called

示例 3:检查令牌是否已取消


async def main():token = CancellationToken()print(f"Is cancelled before cancel: {token.is_cancelled()}")token.cancel()print(f"Is cancelled after cancel: {token.is_cancelled()}")await main()
Is cancelled before cancel: False
Is cancelled after cancel: True

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com