欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 美食 > Autogen_core源码:_subscription.py

Autogen_core源码:_subscription.py

2025/2/3 1:00:52 来源:https://blog.csdn.net/qq_41472205/article/details/145416446  浏览:    关键词:Autogen_core源码:_subscription.py

目录

    • _subscription.py代码
    • 代码解释
      • 代码逻辑解释
        • 1. 导入模块
        • 2. 定义`Subscription`协议
        • 3. 定义`id`属性
        • 4. 定义`__eq__`方法
        • 5. 定义`is_match`方法
        • 6. 定义`map_to_agent`方法
        • 7. 定义`UnboundSubscription`别名
      • 功能总结
    • 代码示例
      • 1. Implementing the `Subscription` Protocol
      • 2. Using the `UnboundSubscription` Helper Alias
      • 3. Asynchronous `UnboundSubscription` Example

_subscription.py代码

from __future__ import annotationsfrom typing import Awaitable, Callable, Protocol, runtime_checkablefrom ._agent_id import AgentId
from ._topic import TopicId@runtime_checkable
class Subscription(Protocol):"""Subscriptions define the topics that an agent is interested in."""@propertydef id(self) -> str:"""Get the ID of the subscription.Implementations should return a unique ID for the subscription. Usually this is a UUID.Returns:str: ID of the subscription."""...def __eq__(self, other: object) -> bool:"""Check if two subscriptions are equal.Args:other (object): Other subscription to compare against.Returns:bool: True if the subscriptions are equal, False otherwise."""if not isinstance(other, Subscription):return Falsereturn self.id == other.iddef is_match(self, topic_id: TopicId) -> bool:"""Check if a given topic_id matches the subscription.Args:topic_id (TopicId): TopicId to check.Returns:bool: True if the topic_id matches the subscription, False otherwise."""...def map_to_agent(self, topic_id: TopicId) -> AgentId:"""Map a topic_id to an agent. Should only be called if `is_match` returns True for the given topic_id.Args:topic_id (TopicId): TopicId to map.Returns:AgentId: ID of the agent that should handle the topic_id.Raises:CantHandleException: If the subscription cannot handle the topic_id."""...# Helper alias to represent the lambdas used to define subscriptions
UnboundSubscription = Callable[[], list[Subscription] | Awaitable[list[Subscription]]]

代码解释

这段Python代码定义了一个名为Subscription的协议(Protocol),并创建了一个辅助别名UnboundSubscription,下面详细解释其逻辑和功能。

代码逻辑解释

1. 导入模块
from __future__ import annotations
from typing import Awaitable, Callable, Protocol, runtime_checkable
from._agent_id import AgentId
from._topic import TopicId
  • from __future__ import annotations:允许在类型注解中使用前向引用,即可以在定义类型时引用尚未定义的类或类型。
  • AwaitableCallableProtocolruntime_checkable:这些是Python标准库typing模块中的类型和装饰器。Awaitable用于表示可等待对象,Callable用于表示可调用对象,Protocol用于定义协议,runtime_checkable用于将协议标记为可在运行时检查。
  • AgentIdTopicId:从._agent_id._topic模块中导入的自定义类型,分别表示代理ID和主题ID。
2. 定义Subscription协议
@runtime_checkable
class Subscription(Protocol):"""Subscriptions define the topics that an agent is interested in."""
  • @runtime_checkable:将Subscription协议标记为可在运行时检查,这意味着可以使用isinstance()函数来检查一个对象是否实现了该协议。
  • class Subscription(Protocol):定义了一个名为Subscription的协议,协议类似于接口,它定义了一组方法和属性,但不提供具体的实现。
3. 定义id属性
    @propertydef id(self) -> str:"""Get the ID of the subscription.Implementations should return a unique ID for the subscription. Usually this is a UUID.Returns:str: ID of the subscription."""...
  • @property:将id方法转换为属性,使得可以像访问属性一样访问该方法。
  • def id(self) -> str:定义了一个抽象方法,要求实现该协议的类必须提供一个id属性,该属性返回一个字符串类型的订阅ID。
4. 定义__eq__方法
    def __eq__(self, other: object) -> bool:"""Check if two subscriptions are equal.Args:other (object): Other subscription to compare against.Returns:bool: True if the subscriptions are equal, False otherwise."""if not isinstance(other, Subscription):return Falsereturn self.id == other.id
  • def __eq__(self, other: object) -> bool:定义了__eq__方法,用于比较两个Subscription对象是否相等。
  • if not isinstance(other, Subscription): return False:如果other不是Subscription类型的对象,则返回False
  • return self.id == other.id:如果otherSubscription类型的对象,则比较两个对象的id属性是否相等。
5. 定义is_match方法
    def is_match(self, topic_id: TopicId) -> bool:"""Check if a given topic_id matches the subscription.Args:topic_id (TopicId): TopicId to check.Returns:bool: True if the topic_id matches the subscription, False otherwise."""...
  • def is_match(self, topic_id: TopicId) -> bool:定义了一个抽象方法,要求实现该协议的类必须提供一个is_match方法,该方法接受一个TopicId类型的参数,并返回一个布尔值,表示该主题ID是否与订阅匹配。
6. 定义map_to_agent方法
    def map_to_agent(self, topic_id: TopicId) -> AgentId:"""Map a topic_id to an agent. Should only be called if `is_match` returns True for the given topic_id.Args:topic_id (TopicId): TopicId to map.Returns:AgentId: ID of the agent that should handle the topic_id.Raises:CantHandleException: If the subscription cannot handle the topic_id."""...
  • def map_to_agent(self, topic_id: TopicId) -> AgentId:定义了一个抽象方法,要求实现该协议的类必须提供一个map_to_agent方法,该方法接受一个TopicId类型的参数,并返回一个AgentId类型的结果,表示应该处理该主题ID的代理ID。
  • Raises: CantHandleException: If the subscription cannot handle the topic_id.:如果订阅无法处理该主题ID,则抛出CantHandleException异常。
7. 定义UnboundSubscription别名
UnboundSubscription = Callable[[], list[Subscription] | Awaitable[list[Subscription]]]
  • UnboundSubscription:定义了一个辅助别名,表示一个可调用对象,该对象不接受任何参数,返回一个Subscription对象列表或一个可等待的Subscription对象列表。

功能总结

这段代码的主要功能是定义了一个Subscription协议,用于描述订阅的行为和属性。订阅表示一个代理对某些主题感兴趣,通过实现Subscription协议的类可以创建具体的订阅对象,并使用这些对象来检查主题ID是否匹配、将主题ID映射到代理等。同时,代码还定义了一个辅助别名UnboundSubscription,用于表示生成订阅列表的可调用对象。

代码示例

1. Implementing the Subscription Protocol

from uuid import uuid4
from typing import Awaitable, Callable, Protocol, runtime_checkable,List# Assume these are simple string-based representations for now
class AgentId(str):passclass TopicId(str):pass@runtime_checkable
class Subscription(Protocol):@propertydef id(self) -> str:...def __eq__(self, other: object) -> bool:if not isinstance(other, Subscription):return Falsereturn self.id == other.iddef is_match(self, topic_id: TopicId) -> bool:...def map_to_agent(self, topic_id: TopicId) -> AgentId:...class ConcreteSubscription:def __init__(self, agent_id: AgentId, topic_pattern: str):self._id = str(uuid4())self._agent_id = agent_idself._topic_pattern = topic_pattern@propertydef id(self) -> str:return self._iddef is_match(self, topic_id: TopicId) -> bool:return topic_id.startswith(self._topic_pattern)def map_to_agent(self, topic_id: TopicId) -> AgentId:if self.is_match(topic_id):return self._agent_idraise ValueError("CantHandleException: Subscription cannot handle the topic_id.")# Usage example
agent_id = AgentId("agent_1")
subscription = ConcreteSubscription(agent_id, "topic_prefix_")
topic_id = TopicId("topic_prefix_123")
if subscription.is_match(topic_id):mapped_agent_id = subscription.map_to_agent(topic_id)print(f"Topic {topic_id} is mapped to agent {mapped_agent_id}")
Topic topic_prefix_123 is mapped to agent agent_1

2. Using the UnboundSubscription Helper Alias

UnboundSubscription = Callable[[], List[Subscription] | Awaitable[List[Subscription]]]def create_subscriptions() -> List[Subscription]:agent_id = AgentId("agent_1")subscription1 = ConcreteSubscription(agent_id, "topic_prefix_1_")subscription2 = ConcreteSubscription(agent_id, "topic_prefix_2_")return [subscription1, subscription2]unbound_sub: UnboundSubscription = create_subscriptions
subscriptions = unbound_sub()
for sub in subscriptions:print(f"Subscription ID: {sub.id}")
Subscription ID: 9b25f5f0-b162-4e21-bf11-c777313b0110
Subscription ID: ebdcd6d7-79ec-42d9-8121-c89dc28d781c

3. Asynchronous UnboundSubscription Example

import asyncio
async def create_subscriptions_async() -> List[Subscription]:await asyncio.sleep(1)  # Simulate some async operationagent_id = AgentId("agent_1")subscription1 = ConcreteSubscription(agent_id, "topic_prefix_1_")subscription2 = ConcreteSubscription(agent_id, "topic_prefix_2_")return [subscription1, subscription2]unbound_sub_async: UnboundSubscription = create_subscriptions_asyncasync def main():subscriptions = await unbound_sub_async()for sub in subscriptions:print(f"Subscription ID: {sub.id}")await main()
Subscription ID: 644e7731-1bf8-4202-b7dc-58fe2feb8ca3
Subscription ID: 2cae55f8-4163-4dbe-8c26-30f7ec80b581

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com