基于ReAct(Reasoning + Acting)框架的自主智能体
import re
from typing import List, Tuplefrom langchain_community.chat_message_histories.in_memory import ChatMessageHistory
from langchain_core.language_models.chat_models import BaseChatModel
from langchain.output_parsers import PydanticOutputParser, OutputFixingParser
from langchain.schema.output_parser import StrOutputParser
from langchain.tools.base import BaseTool
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import render_text_description
from pydantic import ValidationError
from langchain_core.prompts import HumanMessagePromptTemplatefrom Agent.Action import Action
from Utils.CallbackHandlers import *class ReActAgent:"""AutoGPT:基于Langchain实现"""@staticmethoddef __format_thought_observation(thought: str, action: Action, observation: str) -> str:# 将全部JSON代码块替换为空ret = re.sub(r'```json(.*?)```', '', thought, flags=re.DOTALL)ret += "\n" + str(action) + "\n返回结果:\n" + observationreturn ret@staticmethoddef __extract_json_action(text: str) -> str | None:# 匹配最后出现的JSON代码块json_pattern = re.compile(r'```json(.*?)```', re.DOTALL)matches = json_pattern.findall(text)if matches:last_json_str = matches[-1]return last_json_strreturn Nonedef __init__(self,llm: BaseChatModel,tools: List[BaseTool],work_dir: str,main_prompt_file: str,max_thought_steps: Optional[int] = 10,):self.llm = llmself.tools = toolsself.work_dir = work_dirself.max_thought_steps = max_thought_steps# OutputFixingParser: 如果输出格式不正确,尝试修复self.output_parser = PydanticOutputParser(pydantic_object=Action)self.robust_parser = OutputFixingParser.from_llm(parser=self.output_parser,llm=llm)self.main_prompt_file = main_prompt_fileself.__init_prompt_templates()self.__init_chains()self.verbose_handler = ColoredPrintHandler(color=THOUGHT_COLOR)def __init_prompt_templates(self):with open(self.main_prompt_file, 'r', encoding='utf-8') as f:self.prompt = ChatPromptTemplate.from_messages([MessagesPlaceholder(variable_name="chat_history"),HumanMessagePromptTemplate.from_template(f.read()),]).partial(work_dir=self.work_dir,tools=render_text_description(self.tools),tool_names=','.join([tool.name for tool in self.tools]),format_instructions=self.output_parser.get_format_instructions(),)def __init_chains(self):# 主流程的chainself.main_chain = (self.prompt | self.llm | StrOutputParser())def __find_tool(self, tool_name: str) -> Optional[BaseTool]:for tool in self.tools:if tool.name == tool_name:return toolreturn Nonedef __step(self,task,short_term_memory,chat_history,verbose=False) -> Tuple[Action, str]:"""执行一步思考"""inputs = {"input": task,"agent_scratchpad": "\n".join(short_term_memory),"chat_history": chat_history.messages,}config = {"callbacks": [self.verbose_handler]if verbose else []}response = ""for s in self.main_chain.stream(inputs, config=config):response += s# 提取JSON代码块json_action = self.__extract_json_action(response)# 带容错的解析action = self.robust_parser.parse(json_action if json_action else response)return action, responsedef __exec_action(self, action: Action) -> str:# 查找工具tool = self.__find_tool(action.name)if tool is None:observation = (f"Error: 找不到工具或指令 '{action.name}'. "f"请从提供的工具/指令列表中选择,请确保按对顶格式输出。")else:try:# 执行工具observation = tool.run(action.args)except ValidationError as e:# 工具的入参异常observation = (f"Validation Error in args: {str(e)}, args: {action.args}")except Exception as e:# 工具执行异常observation = f"Error: {str(e)}, {type(e).__name__}, args: {action.args}"return observationdef run(self,task: str,chat_history: ChatMessageHistory,verbose=False) -> str:"""运行智能体:param task: 用户任务:param chat_history: 对话上下文(长时记忆):param verbose: 是否显示详细信息"""# 初始化短时记忆: 记录推理过程short_term_memory = []# 思考步数thought_step_count = 0reply = ""# 开始逐步思考while thought_step_count < self.max_thought_steps:if verbose:self.verbose_handler.on_thought_start(thought_step_count)# 执行一步思考action, response = self.__step(task=task,short_term_memory=short_term_memory,chat_history=chat_history,verbose=verbose,)# 如果是结束指令,执行最后一步if action.name == "FINISH":reply = self.__exec_action(action)break# 执行动作observation = self.__exec_action(action)if verbose:self.verbose_handler.on_tool_end(observation)# 更新短时记忆short_term_memory.append(self.__format_thought_observation(response, action, observation))thought_step_count += 1if thought_step_count >= self.max_thought_steps:# 如果思考步数达到上限,返回错误信息reply = "抱歉,我没能完成您的任务。"# 更新长时记忆chat_history.add_user_message(task)chat_history.add_ai_message(reply)return reply
结合LangChain框架和工具调用能力来逐步解决用户任务。
以下是代码的逐模块解析:
1. 核心结构
class ReActAgent:"""AutoGPT:基于Langchain实现"""
- 核心类:实现了ReAct范式(推理+行动循环)的自主智能体
- 核心能力:
- 多步思考推理
- 工具调用
- 长短期记忆管理
- 异常处理与自我修正
2. 关键静态方法
@staticmethod
def __format_thought_observation(...): # 格式化思考记录
@staticmethod
def __extract_json_action(...): # 提取JSON动作
- 功能:
__format_thought_observation
:将思考过程、动作执行和观察结果格式化为可读文本,存入短期记忆__extract_json_action
:用正则表达式提取模型输出中的最后一个JSON代码块(确保获取最新动作)
3. 初始化模块
def __init__(...):# 核心组件初始化self.llm = llm # 大语言模型self.tools = tools # 可用工具列表self.work_dir = work_dir # 工作目录self.max_thought_steps = ... # 最大思考步数# 输出解析系统self.output_parser = PydanticOutputParser(pydantic_object=Action)self.robust_parser = OutputFixingParser.from_llm(...)# 提示工程self.__init_prompt_templates()self.__init_chains()
- 关键技术点:
- 双解析器机制:
OutputFixingParser
可在格式错误时自动修复输出 - Pydantic验证:确保动作符合预定义结构(Action模型)
- 工具描述渲染:
render_text_description
将工具转化为自然语言描述
- 双解析器机制:
4. 提示工程系统
def __init_prompt_templates(self):with open(self.main_prompt_file) as f:self.prompt = ChatPromptTemplate.from_messages(...).partial(tools=..., # 工具描述tool_names=..., # 工具名称列表format_instructions=..., # 格式说明)
- 核心要素:
- 动态加载提示模板文件
- 包含:
- 聊天历史占位符
- 工具使用说明
- 输出格式要求
- 工作目录上下文
5. 执行流程控制
def run(...):while thought_step_count < self.max_thought_steps:# 单步思考action, response = self.__step(...)if action.name == "FINISH":break# 执行动作observation = self.__exec_action(action)# 记忆更新short_term_memory.append(...)
- ReAct循环:
- Reasoning:生成思考与动作(
__step
) - Acting:执行工具调用(
__exec_action
) - Observing:记录执行结果
- Loop:直到达到终止条件
- Reasoning:生成思考与动作(
6. 关键技术实现
6.1 单步推理 (__step
)
def __step(...):inputs = {"input": task,"agent_scratchpad": "\n".join(short_term_memory),"chat_history": chat_history.messages,}# 流式处理LLM输出for s in self.main_chain.stream(inputs):response += s# 提取并解析动作json_action = self.__extract_json_action(response)action = self.robust_parser.parse(...)
- 输入组成:
- 任务目标
- 短期记忆(推理过程)
- 长期记忆(聊天历史)
- 流式处理:实时显示思考过程
- 错误恢复:自动修复格式错误的JSON输出
6.2 动作执行 (__exec_action
)
def __exec_action(...):tool = self.__find_tool(action.name)try:observation = tool.run(action.args)except ValidationError:# 参数验证错误处理except Exception:# 通用错误处理
- 异常处理机制:
- 工具不存在
- 参数验证错误
- 运行时异常
- 观察反馈:将错误信息转化为自然语言,供后续推理使用
7. 记忆系统
# 短期记忆
short_term_memory = [] # 存储格式化的推理过程# 长期记忆
chat_history = ChatMessageHistory() # 保存完整对话记录
- 记忆类型:
- 短期记忆:当前任务的推理过程(最多保留max_thought_steps步)
- 长期记忆:跨会话的完整对话历史
8. 关键设计亮点
-
自愈式输出解析:
- 通过
OutputFixingParser
实现格式错误自动修复 - 示例场景:当LLM返回非法JSON时,自动尝试修正
- 通过
-
渐进式推理:
# 示例输出格式 Thought: 我需要先查找用户信息 Action: {"name": "user_search", "args": {"id": 123}} Observation: 用户张三,年龄30岁
- 通过
agent_scratchpad
维护推理上下文
- 通过
-
工具发现机制:
- 动态渲染工具描述到提示词
- 支持工具的热插拔
-
多级异常处理:
- 工具不存在
- 参数验证错误
- 执行时异常
- 最大步数限制
9. 使用示例
# 初始化组件
llm = ChatOpenAI()
tools = [SearchTool(), Calculator()]
agent = ReActAgent(llm, tools, work_dir="/data")# 执行任务
result = agent.run(task="计算马云当前年龄的平方根",chat_history=ChatMessageHistory(),verbose=True
)
- 典型执行流程:
- 搜索"马云年龄" → 得到60岁
- 调用计算器计算√60 → 约7.746
- 返回最终结果
10. 可扩展性建议
-
增强记忆管理:
- 添加向量数据库长期记忆
- 实现记忆压缩/摘要
-
改进推理质量:
- 添加自我验证步骤
- 实现多路径推理
-
性能优化:
- 添加异步执行
- 实现工具并行调用
该实现展示了如何结合LangChain框架构建复杂的自主智能体系统,平衡了LLM的创造力和结构化工具调用的可靠性。