欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 财经 > 创投人物 > Autogen_core 测试代码:test_cancellation.py

Autogen_core 测试代码:test_cancellation.py

2025/2/8 6:16:01 来源:https://blog.csdn.net/qq_41472205/article/details/145396565  浏览:    关键词:Autogen_core 测试代码:test_cancellation.py

目录

      • 第一段代码:定义 `LongRunningAgent` 类
        • 主要逻辑:
        • 完成的功能:
      • 第二段代码:定义 `NestingLongRunningAgent` 类
        • 主要逻辑:
        • 完成的功能:
      • 第三段代码:测试取消功能
        • 主要逻辑:
        • 完成的功能:
      • 第四段代码:测试嵌套取消功能(仅外层调用)
        • 主要逻辑:
        • 完成的功能:

第一段代码:定义 LongRunningAgent

import asyncio
from dataclasses import dataclassimport pytest
from autogen_core import (AgentId,AgentInstantiationContext,CancellationToken,MessageContext,RoutedAgent,SingleThreadedAgentRuntime,message_handler,
)@dataclass
class MessageType: ...# Note for future reader:
# To do cancellation, only the token should be interacted with as a user
# If you cancel a future, it may not work as you expect.
class LongRunningAgent(RoutedAgent):def __init__(self) -> None:super().__init__("A long running agent")self.called = Falseself.cancelled = False@message_handlerasync def on_new_message(self, message: MessageType, ctx: MessageContext) -> MessageType:self.called = Truesleep = asyncio.ensure_future(asyncio.sleep(100))ctx.cancellation_token.link_future(sleep)try:await sleepreturn MessageType()except asyncio.CancelledError:self.cancelled = Trueraiseclass NestingLongRunningAgent(RoutedAgent):def __init__(self, nested_agent: AgentId) -> None:super().__init__("A nesting long running agent")self.called = Falseself.cancelled = Falseself._nested_agent = nested_agent@message_handlerasync def on_new_message(self, message: MessageType, ctx: MessageContext) -> MessageType:self.called = Trueresponse = self.send_message(message, self._nested_agent, cancellation_token=ctx.cancellation_token)try:val = await responseassert isinstance(val, MessageType)return valexcept asyncio.CancelledError:self.cancelled = Trueraise

这段代码定义了一个名为 LongRunningAgent 的类,它继承自 RoutedAgent。这个类的主要功能是处理长时间运行的任务,并支持取消操作。

主要逻辑:
  1. 初始化:在 __init__ 方法中,设置了代理的名称,并初始化了两个标志 calledcancelled,用于跟踪任务是否被调用和是否被取消。

  2. 消息处理:通过装饰器 @message_handler 定义了一个异步方法 on_new_message,用于处理接收到的消息。

    • 设置长时间运行任务:使用 asyncio.sleep(100) 创建一个长时间运行的任务,并将其封装为 asyncio.Future 对象。

    • 链接取消令牌:通过 ctx.cancellation_token.link_future(sleep) 将取消令牌与长时间运行的任务链接起来,这样当令牌被取消时,任务也会被取消。

    • 等待任务完成:使用 await sleep 等待任务完成。

    • 处理取消:如果任务在等待过程中被取消(即接收到 asyncio.CancelledError),则设置 cancelled 标志为 True 并重新抛出异常。

完成的功能:
  • 定义了一个能够处理长时间运行任务的代理。

  • 支持通过取消令牌取消长时间运行的任务。

第二段代码:定义 NestingLongRunningAgent

async def test_cancellation_with_token() -> None:runtime = SingleThreadedAgentRuntime()await LongRunningAgent.register(runtime, "long_running", LongRunningAgent)agent_id = AgentId("long_running", key="default")token = CancellationToken()response = asyncio.create_task(runtime.send_message(MessageType(), recipient=agent_id, cancellation_token=token))assert not response.done()while runtime.unprocessed_messages_count == 0:await asyncio.sleep(0.01)await runtime._process_next()  # type: ignoretoken.cancel()with pytest.raises(asyncio.CancelledError):await responseassert response.done()long_running_agent = await runtime.try_get_underlying_agent_instance(agent_id, type=LongRunningAgent)assert long_running_agent.calledassert long_running_agent.cancelled
await test_cancellation_with_token()

这段代码定义了一个名为 NestingLongRunningAgent 的类,它也继承自 RoutedAgent。这个类的主要功能是嵌套调用另一个代理,并处理长时间运行的任务。

主要逻辑:
  1. 初始化:在 __init__ 方法中,设置了代理的名称,并初始化了两个标志 calledcancelled,同时接收一个嵌套代理的 AgentId

  2. 消息处理:通过装饰器 @message_handler 定义了一个异步方法 on_new_message,用于处理接收到的消息。

    • 调用嵌套代理:使用 self.send_message 向嵌套代理发送消息,并将取消令牌传递给嵌套代理。

    • 等待响应:使用 await response 等待嵌套代理的响应。

    • 处理取消:如果嵌套代理在等待过程中被取消(即接收到 asyncio.CancelledError),则设置 cancelled 标志为 True 并重新抛出异常。

完成的功能:
  • 定义了一个能够嵌套调用另一个代理的代理。

  • 支持通过取消令牌取消嵌套代理的任务。

第三段代码:测试取消功能

async def test_nested_cancellation_only_outer_called() -> None:runtime = SingleThreadedAgentRuntime()await LongRunningAgent.register(runtime, "long_running", LongRunningAgent)await NestingLongRunningAgent.register(runtime,"nested",lambda: NestingLongRunningAgent(AgentId("long_running", key=AgentInstantiationContext.current_agent_id().key)),)long_running_id = AgentId("long_running", key="default")nested_id = AgentId("nested", key="default")token = CancellationToken()response = asyncio.create_task(runtime.send_message(MessageType(), nested_id, cancellation_token=token))assert not response.done()while runtime.unprocessed_messages_count == 0:await asyncio.sleep(0.01)await runtime._process_next()  # type: ignoretoken.cancel()with pytest.raises(asyncio.CancelledError):await responseassert response.done()nested_agent = await runtime.try_get_underlying_agent_instance(nested_id, type=NestingLongRunningAgent)assert nested_agent.calledassert nested_agent.cancelledlong_running_agent = await runtime.try_get_underlying_agent_instance(long_running_id, type=LongRunningAgent)assert long_running_agent.called is Falseassert long_running_agent.cancelled is Falseawait test_nested_cancellation_only_outer_called()

这段代码定义了一个异步函数 test_cancellation_with_token,用于测试 LongRunningAgent 的取消功能。

主要逻辑:
  1. 初始化运行时环境:创建一个 SingleThreadedAgentRuntime 实例。

  2. 注册代理:注册 LongRunningAgent

  3. 创建取消令牌:创建一个 CancellationToken 实例。

  4. 发送消息:使用 runtime.send_message 向代理发送消息,并将取消令牌传递给代理。

  5. 等待代理开始处理消息:使用 while 循环等待代理开始处理消息。

  6. 取消任务:调用 token.cancel() 取消任务。

  7. 检查结果:检查任务是否被取消,并验证代理的 calledcancelled 标志。

完成的功能:
  • 测试 LongRunningAgent 的取消功能。

  • 验证代理在取消操作后,能够正确地抛出 asyncio.CancelledError 异常。

  • 验证代理的 calledcancelled 标志是否正确设置。

第四段代码:测试嵌套取消功能(仅外层调用)


async def test_nested_cancellation_inner_called() -> None:runtime = SingleThreadedAgentRuntime()await LongRunningAgent.register(runtime, "long_running", LongRunningAgent)await NestingLongRunningAgent.register(runtime,"nested",lambda: NestingLongRunningAgent(AgentId("long_running", key=AgentInstantiationContext.current_agent_id().key)),)long_running_id = AgentId("long_running", key="default")nested_id = AgentId("nested", key="default")token = CancellationToken()response = asyncio.create_task(runtime.send_message(MessageType(), nested_id, cancellation_token=token))assert not response.done()while runtime.unprocessed_messages_count == 0:await asyncio.sleep(0.01)await runtime._process_next()  # type: ignore# allow the inner agent to processawait runtime._process_next()  # type: ignoretoken.cancel()with pytest.raises(asyncio.CancelledError):await responseassert response.done()nested_agent = await runtime.try_get_underlying_agent_instance(nested_id, type=NestingLongRunningAgent)assert nested_agent.calledassert nested_agent.cancelledlong_running_agent = await runtime.try_get_underlying_agent_instance(long_running_id, type=LongRunningAgent)assert long_running_agent.calledassert long_running_agent.cancelledawait test_nested_cancellation_inner_called()

这段代码定义了一个异步函数 test_nested_cancellation_only_outer_called,用于测试 NestingLongRunningAgent 的取消功能,仅取消外层代理。

主要逻辑:
  1. 初始化运行时环境:创建一个 SingleThreadedAgentRuntime 实例。

  2. 注册代理:注册 LongRunningAgentNestingLongRunningAgent

  3. 创建取消令牌:创建一个 CancellationToken 实例。

  4. 发送消息:使用 runtime.send_message 向嵌套代理发送消息,并将取消令牌传递给代理。

  5. 等待代理开始处理消息:使用 while 循环等待代理开始处理消息。

  6. 取消任务:调用 token.cancel() 取消任务。

  7. 检查结果:检查任务是否被取消,并验证代理的 calledcancelled 标志。

完成的功能:
  • 嵌套代理的取消传播
    测试当一个父任务(NestingLongRunningAgent)被取消时,其内部的子任务(LongRunningAgent)是否也会被正确取消。

    • 父任务(nested)启动后触发子任务(long_running)。

    • 通过 CancellationToken 取消父任务,观察子任务是否同步取消。

  • 异步任务的状态验证
    确保任务取消后,所有相关代理(Agent)的状态正确更新:

    • called:标记代理是否被调用。

    • cancelled:标记代理是否被取消。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com