7.3. 数据监控与生成本地知识库
- 目的:监控新生成的小红书文案,记录每一次生成的小红书文案风格。后续根据输入topic,检索与某一topic有关的文案,可以根据先前的文案风格,生成类似风格的文案。
- 实现思路:
- 1.要实现文件监控功能,需要使用
watchdog
库。watchdog
是一个Python库,用于监控文件系统的变化。它提供了多种事件类型,如文件创建、修改、删除等,可以用来监控文件的变化。启动一个线程,实时监控xiaohongshu_drafts
目录下的文件变化,当有新文件生成时,调用process_new_file(file_path)
函数生成知识库。 - 2.
process_new_file(file_path)
函数读取新文件中的内容,并调用generate_knowledge_base()
函数对新生成的文案进行文本分割、对象转换、向量化等一系列操作来生成知识库。
- 代码实现:
'''
Author: yeffky
Date: 2025-02-12 13:29:31
LastEditTime: 2025-02-17 14:28:11
'''
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from langchain_community.document_loaders import TextLoader
from langchain_community.vectorstores import FAISS
from langchain.embeddings.huggingface import HuggingFaceEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from transformers import AutoTokenizer
from langchain.schema import Document
from text2vec import SentenceModel
import time
import osclass NewFileHandler(FileSystemEventHandler):def on_created(self, event):if not event.is_directory:file_path = event.src_pathprocess_new_file(file_path) print(f"新增文件已加载: {file_path}")def process_new_file(file_path):loader = TextLoader(file_path, encoding="utf-8")documents = loader.load()print(type(documents[0]))tokenizer = AutoTokenizer.from_pretrained("bert-base-chinese")text_splitter = RecursiveCharacterTextSplitter.from_huggingface_tokenizer(tokenizer,chunk_size=256,chunk_overlap=0,separators=['---'])text_chunks = text_splitter.split_text(documents[0].page_content)chunk_docs = [Document(page_content=chunk) for chunk in text_chunks]embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-base-zh")vector_store_dir = "./vector_store"if os.path.exists(vector_store_dir):vector_store = FAISS.load_local(vector_store_dir, embeddings, allow_dangerous_deserialization=True)vector_store.add_documents(chunk_docs)else:vector_store = FAISS.from_documents(chunk_docs, embeddings)vector_store.save_local(vector_store_dir)def start_observer():observer = Observer()observer.schedule(NewFileHandler(), path="./xiaohongshu_drafts", recursive=False)observer.start()try:while True:time.sleep(1)except KeyboardInterrupt:observer.stop()observer.join()if __name__ == "__main__":start_observer()
7.4. 数据分析
- 目的:根据先前爬取数据,与本地知识库联动,调用deepseek api完成小红书文案生成。
- 实现思路:
- 要通过deepseek生成文案首先需要构建prompt和提问词,首先从文件中加载prompt和提问词模板,然后将前期爬取的商品数据以及本地知识库中的文案数据作为输入,构建prompt和提问词。
- 代码实现:
'''
Author: yeffky
Date: 2025-02-11 11:17:04
LastEditTime: 2025-02-17 15:35:13
'''
import json
import os
import requests
from datetime import datetime
import random
from langchain import FAISS
from langchain.embeddings.huggingface import HuggingFaceEmbeddings
from text2vec import SentenceModelos.environ['HF_ENDPOINT'] = 'hf-mirror.com'
today_date = datetime.now().strftime('%Y-%m-%d')
topic = "手机推荐"
def read_json_file(filename):with open(f'data/{filename}', 'r', encoding='utf-8') as f:return json.load(f)
def build_prompt(item):with open('./docs/prompt.txt', 'r', encoding='utf-8') as f:prompt = f.read()embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-base-zh")vector_store = FAISS.load_local("./vector_store", embeddings, allow_dangerous_deserialization=True)retrieved_docs = vector_store.similarity_search(topic, k=5) random.shuffle(retrieved_docs)selected_docs = retrieved_docs[:3]return f"""{prompt},
{json.dumps(item, ensure_ascii=False, indent=2)}
**根据以下文案风格,做出创新**:{selected_docs}**注意**:- 在结尾加入提示,数据截至当前日期:{today_date}- 每一段内容使用 --- 进行分割
"""def build_preset():with open('./docs/preset.txt', 'r', encoding='utf-8') as f:preset = f.read()embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-base-zh")print("embeddings加载完毕")vector_store = FAISS.load_local("./vector_store", embeddings, allow_dangerous_deserialization=True)retrieved_docs = vector_store.similarity_search(topic, k=5) random.shuffle(retrieved_docs)selected_docs = retrieved_docs[:3]preset += f"""\n **主题**:{topic}**创新要求**:- 使用{random.choice(["轻松幽默", "专业严谨", "犀利吐槽"])}的语气- 加入{["emoji表情", "热门梗", "互动提问"]}元素"""print(preset)return preset
def get_deepseek_response(preset, prompt, api_key):url = "https://api.deepseek.com/chat/completions"headers = {"Authorization": f"Bearer {api_key}",'Content-Type': 'application/json','Accept': 'application/json',}payload = json.dumps({"messages": [{"content": preset, "role": "system"},{"content": prompt,"role": "user"}],"model": "deepseek-reasoner","frequency_penalty": 0,"max_tokens": 2048,"presence_penalty": 0,"response_format": {"type": "text"},"stop": None,"stream": False,"stream_options": None,"temperature": 1,"top_p": 1,"tools": None,"tool_choice": "none","logprobs": False,"top_logprobs": None})response = Nonewhile not response:try:response = requests.post(url, data=payload, headers=headers, timeout=100)response.raise_for_status()if not response.json():response = Noneprint("没有收到响应,重试中...")else:print("收到响应,内容为:\n" + response.json()['choices'][0]['message']['content'])except requests.exceptions.RequestException as e:print(f"请求失败:{str(e)}")response = Nonereturn response.json()['choices'][0]['message']['content']
def save_copywriting(content):base_path = f'./xiaohongshu_drafts/'filename = f"小红书_推广文案_千战系列" + today_date + ".txt"print(content)with open(base_path + filename, 'w', encoding='utf-8') as f:f.write(content)print(f"文案已保存至:{filename}")
def analysis_data():API_KEY = os.getenv("DEEPSEEK_API_KEY") JSON_FILE = f'goods_{today_date}.json'items = read_json_file(JSON_FILE)print(f"正在处理:{JSON_FILE}")prompt = build_prompt(items)preset = build_preset()try:response = get_deepseek_response(preset, prompt, API_KEY)save_copywriting(response)except Exception as e:print(f"处理失败:{str(e)}")if __name__ == "__main__":analysis_data()