欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 财经 > 金融 > 构建LangChain应用程序的示例代码:10、如何使用LangChain的标准聊天功能,并通过Apache Kafka来回传递聊天消息教程

构建LangChain应用程序的示例代码:10、如何使用LangChain的标准聊天功能,并通过Apache Kafka来回传递聊天消息教程

2025/2/23 1:59:48 来源:https://blog.csdn.net/wangjiansui/article/details/139421610  浏览:    关键词:构建LangChain应用程序的示例代码:10、如何使用LangChain的标准聊天功能,并通过Apache Kafka来回传递聊天消息教程

使用Apache Kafka路由消息

本示例向您展示了如何使用LangChain的标准聊天功能,并通过Apache Kafka来回传递聊天消息。

目标是模拟一个架构,其中聊天前端和LLM作为需要通过内部网络相互通信的独立服务运行。

这是一种替代通过REST API请求模型响应的典型模式(本文末尾有更多信息,解释了为什么您可能想要这样做)。

1. 安装主要依赖项

依赖项包括:

  • Quix Streams库,用于以"Pandas-like"的方式管理与Apache Kafka(或Kafka-like工具,如Redpanda)的交互。
  • LangChain库,用于管理与Llama-2的交互并存储对话状态。
!pip install quixstreams==2.1.2a langchain==0.0.340 huggingface_hub==0.19.4 langchain-experimental==0.0.42 python-dotenv

2. 构建并安装llama-cpp-python库(启用CUDA以利用Google Colab GPU)

llama-cpp-python库是一个Python包装器,围绕llama-cpp库,使您能够高效地仅使用CPU运行量化的LLM。

使用标准的pip安装命令llama-cpp-python默认不支持GPU。如果在Google Colab中仅依赖CPU,生成可能会非常慢,所以下面的命令添加了一个额外的选项来构建并安装带有GPU支持的llama-cpp-python(确保您在Google Colab中选择了GPU支持的运行环境)。

!CMAKE_ARGS="-DLLAMA_CUBLAS=on" FORCE_CMAKE=1 pip install llama-cpp-python

3. 下载并设置Kafka和Zookeeper实例

从Apache网站下载Kafka二进制文件,并以守护进程方式启动服务器。我们将使用Apache Kafka提供的默认配置来启动实例。

!curl -sSOL https://dlcdn.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz
!tar -xzf kafka_2.13-3.6.1.tgz!./kafka_2.13-3.6.1/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.6.1/config/zookeeper.properties
!./kafka_2.13-3.6.1/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.6.1/config/server.properties
!echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
!sleep 10

4. 检查Kafka守护进程是否正在运行

显示正在运行的进程,并过滤Java进程(您应该看到两个——每个服务器一个)。

!ps aux | grep -E '[j]ava'

5. 导入所需的依赖项并初始化所需的变量

导入与Kafka交互的Quix Streams库,以及运行ConversationChain所需的LangChain组件。

# 导入实用程序库
import json
import random
import re
import time
import uuid
from os import environ
from pathlib import Path
from random import choice, randint, randomfrom dotenv import load_dotenv# 从Hugging Face hub直接下载模型的Hugging Face实用程序:
from huggingface_hub import hf_hub_download
from langchain.chains import ConversationChain# 导入Langchain模块以管理提示和对话链:
from langchain.llms import LlamaCpp
from langchain.memory import ConversationTokenBufferMemory
from langchain.prompts import PromptTemplate, load_prompt
from langchain_core.messages import SystemMessage
from langchain_experimental.chat_models import Llama2Chat
from quixstreams import Application, State, message_key# 导入Quix依赖项
from quixstreams.kafka import Producer# 初始化全局变量。
AGENT_ROLE = "AI"
chat_id = ""# 将当前角色设置为角色常量,并为补充客户元数据初始化变量:
role = AGENT_ROLE

6. 下载"llama-2-7b-chat.Q4_K_M.gguf"模型

从Hugging Face下载量化的Llama-2 7B模型,我们将使用它作为本地LLM(而不是依赖于外部服务的REST API调用)。

model_name = "llama-2-7b-chat.Q4_K_M.gguf"
model_path = f"./state/{model_name}"if not Path(model_path).exists():print("The model path does not exist in state. Downloading model...")hf_hub_download("TheBloke/Llama-2-7b-Chat-GGUF", model_name, local_dir="state")
else:print("Loading model from state...")

7. 加载模型并初始化对话记忆

加载Llama 2,并使用ConversationTokenBufferMemory将对话缓冲区设置为300个token。这个值用于在仅CPU容器中运行Llama,所以如果在Google Colab中运行,您可以提高它。它防止了托管模型的容器内存不足。

在这里,我们覆盖了默认的系统角色,以便聊天机器人具有《银河系漫游指南》中Marvin The Paranoid Android的个性。

# 使用适当的参数加载模型:llm = LlamaCpp(model_path=model_path,max_tokens=250,top_p=0.95,top_k=150,temperature=0.7,repeat_penalty=1.2,n_ctx=2048,streaming=False,n_gpu_layers=-1,
)model = Llama2Chat(llm=llm,system_message=SystemMessage(content="您是一个非常无聊的机器人,具有《银河系漫游指南》中Marvin the Paranoid Android的个性。"),
)# 定义在每次交流中给模型的对话历史量(300个token,或者略多于300个单词)# 该函数自动修剪超出token范围的对话历史中最旧的消息。memory = ConversationTokenBufferMemory(llm=llm,max_token_limit=300,ai_prefix="AGENT",human_prefix="HUMAN",return_messages=True,
)# 定义自定义提示prompt_template = PromptTemplate(input_variables=["history", "input"],template=""" 以下文本是您和需要您智慧的谦逊人类之间聊天的历史。请回复人类最近的消息。当前对话:{history}HUMAN: {input}\:nANDROID:""",
)chain = ConversationChain(llm=model, prompt=prompt_template, memory=memory)print("--------------------------------------------")
print(f"Prompt={chain.prompt}")
print("--------------------------------------------")

8. 使用聊天机器人初始化聊天对话

我们配置聊天机器人通过向"chat" Kafka主题发送固定问候来初始化对话。当我们发送第一条消息时,"chat"主题会自动创建。

def chat_init():chat_id = str(uuid.uuid4())  # 为有效的消息键控给对话一个IDprint("======================================")print(f"Generated CHAT_ID = {chat_id}")print("======================================")chat_init()

9. 初始化回复功能

这个函数定义了聊天机器人应该如何回复传入的消息。与之前的单元格不同,我们不是发送一个固定的消息,而是使用Llama-2生成一个回复,并将该回复发回"chat" Kafka主题。

def reply(row: dict, state: State):print("-------------------------------")print("Received:")print(row)print("-------------------------------")print(f"Thinking about the reply to: {row['text']}...")

10. 检查Kafka主题以获取新的人类消息,并让模型生成回复

如果您第一次运行这个单元格,请运行它并等待在控制台输出中看到Marvin的问候(“Hello my name is Marvin…”)。在收到LLM的回复后,手动停止这个单元格,并继续执行下一个单元格,在那里您将被提示输入您的回复。

一旦您输入了您的消息,请回到这个单元格。您的回复也发送到了同一个"chat"主题。Kafka消费者检查新消息,并过滤掉来自聊天机器人本身的消息,只留下最新的人类消息。

一旦检测到新的人类消息,就会触发回复功能。

在输出中收到LLM的回复后,手动停止这个单元格。

# 定义您的应用程序和设置app = Application(broker_address="127.0.0.1:9092",consumer_group="aichat",auto_offset_reset="earliest",consumer_extra_config={"allow.auto.create.topics": "true"},
)# 定义一个带有JSON反序列化的输入主题input_topic = app.topic("chat", value_deserializer="json")# 定义一个带有JSON序列化的输出主题output_topic = app.topic("chat", value_serializer="json")# 基于输入主题的消息流初始化一个流数据帧:sdf = app.dataframe(topic=input_topic)# 过滤SDF,只包括角色与机器人当前角色不匹配的传入行sdf = sdf.update(lambda val: print(f"Received update: {val}\n\nSTOP THIS CELL MANUALLY TO HAVE THE LLM REPLY OR ENTER YOUR OWN FOLLOWUP RESPONSE")
)# 以便它不会回复自己的消息sdf = sdf[sdf["role"] != role]# 为过滤后的SDF中检测到的任何新消息(行)触发回复功能sdf = sdf.apply(reply, stateful=True)# 再次检查SDF并过滤掉任何空行sdf = sdf[sdf.apply(lambda row: row is not None)]# 更新时间戳列到当前时间的纳秒sdf["Timestamp"] = sdf["Timestamp"].apply(lambda row: time.time_ns())# 将处理过的SDF发布到由output_topic对象指定的Kafka主题。sdf = sdf.to_topic(output_topic)app.run(sdf)

11. 输入人类消息

运行这个单元格以输入您想要发送给模型的消息。它使用另一个Kafka生产者将您的文本发送到"chat" Kafka主题,供模型获取(需要再次运行上一个单元格)。

chat_input = input("Please enter your reply: ")
myreply = chat_inputmsgvalue = {"uuid": chat_id,  # 现在留空"role": "human","text": myreply,"conversation_id": chat_id,"Timestamp": time.time_ns(),
}with Producer(broker_address="127.0.0.1:9092",extra_config={"allow.auto.create.topics": "true"},
) as producer:value = msgvalueproducer.produce(topic="chat",headers=[("uuid", str(uuid.uuid4()))],  # 这里也允许使用字典key=chat_id,  # 现在留空value=json.dumps(value),  # 需要是一个字符串)print("Replied to chatbot with message:")
print("--------------------------------------------")
print(value)
print("--------------------------------------------")
print("\n\nRUN THE PREVIOUS CELL TO HAVE THE CHATBOT GENERATE A REPLY")

为什么要通过Kafka路由聊天消息?

使用LangChain内置的对话管理功能直接与LLM交互更容易。此外,您还可以使用REST API从外部托管的模型生成响应。那么为什么要费心使用Apache Kafka呢?

有几个原因,例如:

  • 集成:许多企业希望运行自己的LLM,以便他们可以将数据保留在内部。这需要将LLM支持的组件集成到可能已经使用某种消息总线进行解耦的现有架构中。
  • 可扩展性:Apache Kafka旨在进行并行处理,因此许多团队更喜欢使用它来更有效地将工作分配给可用的工作者(在这种情况下,“工作者”是运行LLM的容器)。
  • 持久性:Kafka旨在允许服务在另一项服务遇到内存问题或下线时继续进行。这可以防止在多个系统相互通信的复杂分布式架构中发生数据丢失(LLM只是许多相互依赖的系统之一,还包括向量数据库和传统数据库)。

有关事件流为何适用于Gen AI应用程序架构的更多背景信息,请参阅Kai Waehner的文章“Apache Kafka + Vector Database + LLM = Real-Time GenAI”。


本文介绍了一种使用Apache Kafka进行消息路由以模拟独立聊天前端和LLM服务之间通信的架构。通过安装依赖库、设置Kafka环境、初始化模型和对话记忆,构建了一个聊天机器人。这种方法利用了Kafka的集成性、可扩展性和持久性优势,适合需要将LLM集成到现有解耦架构中的企业使用。此外,它还提供了一种比直接REST API调用更健壮和可靠的解决方案。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词