欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 房产 > 家装 > Autogen_core: ClosureAgent使用与测试

Autogen_core: ClosureAgent使用与测试

2025/2/1 3:06:10 来源:https://blog.csdn.net/qq_41472205/article/details/145400326  浏览:    关键词:Autogen_core: ClosureAgent使用与测试

目录

      • 第一个示例
      • 第二个示例
      • 完成的功能

下面两个示例展示了如何使用 AutoGen 库中的 ClosureAgent 来创建和使用代理。 ClosureAgent 允许你使用闭包(即一个没有定义类的函数)来定义代理,并从运行时中提取值。代码中展示了两个示例:

第一个示例

import asyncio
from dataclasses import dataclassfrom autogen_core import (ClosureAgent,ClosureContext,DefaultSubscription,DefaultTopicId,MessageContext,SingleThreadedAgentRuntime,
)
"""
The closure agent allows you to define an agent using a closure, or function without needing to define a class. It allows values to be extracted out of the runtime.The closure can define the type of message which is expected, or `Any` can be used to accept any type of message."""
@dataclass
class MyMessage:content: strasync def main():queue = asyncio.Queue[MyMessage]()async def output_result(_ctx: ClosureContext, message: MyMessage, ctx: MessageContext) -> None:await queue.put(message)runtime = SingleThreadedAgentRuntime()await ClosureAgent.register_closure(runtime, "output_result", output_result, subscriptions=lambda: [DefaultSubscription()])runtime.start()await runtime.publish_message(MyMessage("Hello, world!"), DefaultTopicId())await runtime.stop_when_idle()result = await queue.get()print(result)await main()
MyMessage(content='Hello, world!')

这个示例定义了一个简单的代理,它接收一个消息并将其放入一个 asyncio 队列中。

  1. 定义了一个名为 MyMessage 的数据类,用于存储消息内容。
  2. 创建了一个名为 output_result 的异步函数,它接收一个 ClosureContext、一个 MyMessage 和一个 MessageContext,然后将消息放入队列中。
  3. 创建了一个 SingleThreadedAgentRuntime 实例。
  4. 使用 register_closure 方法注册了 output_result 函数作为一个代理。它订阅了默认的主题。
  5. 启动运行时,发布一个 MyMessage 消息,并在空闲时停止运行时。
  6. 从队列中获取结果并打印。

这个示例展示了如何注册一个闭包代理,并在运行时中发送和接收消息。

第二个示例

@dataclass
class Message:content: strasync def test_register_receives_publish() -> None:runtime = SingleThreadedAgentRuntime()queue = asyncio.Queue[tuple[str, str]]()async def log_message(closure_ctx: ClosureContext, message: Message, ctx: MessageContext) -> None:key = closure_ctx.id.keyawait queue.put((key, message.content))await ClosureAgent.register_closure(runtime, "name", log_message, subscriptions=lambda: [DefaultSubscription()])runtime.start()await runtime.publish_message(Message("first message"), topic_id=DefaultTopicId())await runtime.publish_message(Message("second message"), topic_id=DefaultTopicId())await runtime.publish_message(Message("third message"), topic_id=DefaultTopicId())await runtime.stop_when_idle()assert queue.qsize() == 3assert queue.get_nowait() == ("default", "first message")assert queue.get_nowait() == ("default", "second message")assert queue.get_nowait() == ("default", "third message")assert queue.empty()test_register_receives_publish()
<coroutine object test_register_receives_publish at 0x00000150ABEAA640>

这个示例定义了一个测试函数,用于测试注册代理、接收和发布消息的功能。

  1. 定义了一个名为 Message 的数据类,用于存储消息内容。
  2. 创建了一个名为 test_register_receives_publish 的异步测试函数。
  3. 在这个函数中,创建了一个 SingleThreadedAgentRuntime 实例和一个 asyncio 队列。
  4. 定义了一个名为 log_message 的异步函数,它接收一个 ClosureContext、一个 Message 和一个 MessageContext,然后将代理的 ID 和消息内容作为一个元组放入队列中。
  5. 使用 register_closure 方法注册了 log_message 函数作为一个代理,并订阅了默认的主题。
  6. 启动运行时,并发布三个 Message 消息。
  7. 等待运行时空闲后,检查队列中的消息数量和内容,确保有三个消息,并且它们的顺序和内容与发布的消息一致。

这个示例展示了如何测试代理的注册、消息的接收和发布功能。

完成的功能

这两个示例展示了如何使用 ClosureAgentSingleThreadedAgentRuntime 来创建、注册和使用闭包代理。它们展示了如何发送和接收消息,以及如何测试代理的功能。这些示例是理解和学习如何使用 AutoGen 库来构建代理和消息传递系统的基础。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com