欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 时评 > Langchain对管道操作符|的重构实现链式流程

Langchain对管道操作符|的重构实现链式流程

2025/2/22 17:00:27 来源:https://blog.csdn.net/qq_43920838/article/details/145613510  浏览:    关键词:Langchain对管道操作符|的重构实现链式流程

目录

  • 一、管道操作符
  • 二、LangChain 的链式流程
  • 三、源码剖析

一、管道操作符

  1. Python 中的管道操作符
    在 Python 3.10 及以上版本中,管道操作符 | 主要用于以下场景:

(1)集合操作
管道操作符可以用于集合的并集操作。

set1 = {1, 2, 3}
set2 = {3, 4, 5}
result = set1 | set2  # 并集: {1, 2, 3, 4, 5}

(2)类型联合
在类型注解中,| 可以表示“或”的关系。

def func(x: int | float) -> int | float:return x * 2

(3)模式匹配
在 match 语句中,| 可以用于匹配多个模式。

value = 3
match value:case 1 | 2 | 3:print("Small number")case _:print("Other number")

二、LangChain 的链式流程

在 LangChain 中,管道操作符 | 用于将多个步骤串联起来,形成一个链式流程。

chain = ({"question": RunnablePassthrough()}| prompt_template| llm| output_parser
)

每一步的输出会作为下一步的输入。

这种设计使得代码更加模块化和可读。

三、源码剖析

在 LangChain 的实际源码中,管道操作符的实现位于 langchain/schema/runnable.py 文件中。以下是一些关键代码片段:

class Runnable:def __or__(self, other: Union["Runnable", Callable]) -> "RunnableSequence":from langchain.schema.runnable import RunnableSequencereturn RunnableSequence([self, other])

or 方法,它是 Python 中的特殊方法(也称为魔术方法或双下方法)。or 方法用于重载 | 操作符的行为。具体来说,这段代码的作用是允许一个对象与另一个对象(或可调用对象)通过 | 操作符连接,并返回一个新的 RunnableSequence 对象。
or 方法定义了 | 操作符的行为。当你在代码中使用 | 操作符连接两个对象时,Python 会调用左侧对象的 or 方法,并将右侧对象作为参数传递。

step1 = Runnable()
step2 = Runnable()# 使用 | 操作符连接两个对象
chain = step1 | step2

Union[“Runnable”, Callable] 的含义
Union 是 Python 类型注解中的一个工具,表示“或”的关系。Union[“Runnable”, Callable] 表示 other 可以是以下两种类型之一:

Runnable:一个实现了 Runnable 接口的对象。

Callable:一个可调用对象(如函数、lambda 表达式等)。

链式流程有递归的思想

from typing import Union, Callableclass Runnable:def __or__(self, other: Union["Runnable", Callable]) -> "RunnableSequence":if callable(other):# 如果 other 是可调用对象,将其包装为 Runnableother = RunnableWrapper(other)return RunnableSequence(self, other)class RunnableSequence:def __init__(self, first: Runnable, second: Runnable):self.first = firstself.second = seconddef run(self, input):# 先运行第一个步骤intermediate = self.first.run(input)# 再运行第二个步骤return self.second.run(intermediate)class RunnableWrapper(Runnable):def __init__(self, func: Callable):self.func = funcdef run(self, input):return self.func(input)

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词