The following is a complete implementation for deploying the QWQ-32B large language model with Python + Flask + vLLM, including detailed comments and tuned configuration:

```python
# -*- coding: utf-8 -*-
# qwq32b_deploy.py
import os
import time
from typing import Dict
from flask import Flask, request, jsonify
from vllm import AsyncLLMEngine, SamplingParams
from vllm.engine.arg_utils import AsyncEngineArgs

# ========== Configuration ==========
class Config:
    MODEL_PATH = "/path/to/QWQ-32B"  # model path (must contain config.json and the weights)
    HOST = "0.0.0.0"
    PORT = 8000
    MAX_TOKENS = 1024       # default max tokens to generate
    TEMPERATURE = 0.7       # default temperature
    TOP_P = 0.9             # default top-p sampling
    GPU_MEM_UTIL = 0.85     # target GPU memory utilization
    TENSOR_PARALLEL = 2     # tensor parallel degree (adjust to your GPU count)
    MAX_CONCURRENT = 100    # max concurrent requests

# ========== Initialization ==========
def create_engine():
    """Initialize the vLLM async inference engine."""
    engine_args = AsyncEngineArgs(
        model=Config.MODEL_PATH,
        tensor_parallel_size=Config.TENSOR_PARALLEL,
        gpu_memory_utilization=Config.GPU_MEM_UTIL,
        max_num_seqs=Config.MAX_CONCURRENT,
        max_model_len=4096,          # max context length
        trust_remote_code=True,      # allow custom model code
        quantization="awq",          # AWQ quantization (only if the checkpoint supports it)
        enable_prefix_caching=True,  # enable prefix caching
        block_size=16,               # KV cache block size
        swap_space=8,                # CPU-GPU swap space (GB)
    )
    return AsyncLLMEngine.from_engine_args(engine_args)

# ========== Flask app ==========
app = Flask(__name__)
engine = create_engine()

# ========== Utilities ==========
def generate_request_id():
    """Generate a unique request ID."""
    return f"{int(time.time())}-{os.urandom(4).hex()}"

def validate_input(data: Dict) -> Dict:
    """Validate input parameters and fill in defaults."""
    defaults = {
        "prompt": "",
        "max_tokens": Config.MAX_TOKENS,
        "temperature": Config.TEMPERATURE,
        "top_p": Config.TOP_P,
        "stop": None,
    }
    return {k: data.get(k, v) for k, v in defaults.items()}

# ========== Core endpoint ==========
# Note: this endpoint could also be swapped for the standard OpenAI-compatible
# API style (see the alternative after this listing).
@app.route('/generate', methods=['POST'])
async def generate():
    """Main text-generation endpoint (requires flask[async])."""
    request_id = None
    try:
        # Validate parameters
        data = validate_input(request.json)
        # Build sampling parameters
        sampling_params = SamplingParams(
            n=1,
            max_tokens=data["max_tokens"],
            temperature=data["temperature"],
            top_p=data["top_p"],
            stop=data["stop"],
            frequency_penalty=0.5,
            presence_penalty=0.5,
        )
        # Run inference asynchronously
        request_id = generate_request_id()
        start_time = time.time()
        output_generator = engine.generate(
            data["prompt"], sampling_params, request_id=request_id
        )
        # Drain the stream; the last item carries the complete output
        final_output = None
        async for output in output_generator:
            final_output = output
        # Return the formatted result
        return jsonify({
            "status": "success",
            "output": final_output.outputs[0].text,
            "request_id": request_id,
            "metrics": {
                "total_tokens": len(final_output.outputs[0].token_ids),
                "generation_time": time.time() - start_time,
            },
        })
    except Exception as e:
        return jsonify({
            "status": "error",
            "message": str(e),
            "request_id": request_id,
        }), 500

# ========== Management endpoints ==========
@app.route('/health', methods=['GET'])
def health_check():
    """Health-check endpoint."""
    return jsonify({"status": "healthy"})

@app.route('/stats', methods=['GET'])
def get_stats():
    """Basic load statistics.

    Note: vLLM does not expose a public `statistics` object; this reaches
    into engine internals, which may differ across vLLM versions.
    """
    return jsonify({
        "pending_requests": engine.engine.get_num_unfinished_requests(),
        "max_concurrent": Config.MAX_CONCURRENT,
    })

# ========== Server startup ==========
if __name__ == '__main__':
    # Print the configuration
    print(f"""===== QWQ-32B Deployment Config =====
Model Path: {Config.MODEL_PATH}
Tensor Parallel: {Config.TENSOR_PARALLEL}
Max Concurrent: {Config.MAX_CONCURRENT}
Server: {Config.HOST}:{Config.PORT}
""")
    # Start the Flask app
    app.run(
        host=Config.HOST,
        port=Config.PORT,
        threaded=True,
        debug=False,
    )
```
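As the comment on the core endpoint notes, the hand-rolled `/generate` route can be swapped for the standard OpenAI-compatible API, and vLLM ships such a server out of the box. This also sidesteps a caveat with the Flask approach: Flask's async views run requests through per-call event loops, which can conflict with `AsyncLLMEngine`'s background loop under concurrency, so an ASGI-native server is often the more robust choice. A minimal launch command (flag values are illustrative, matching the Config above):

```bash
python -m vllm.entrypoints.openai.api_server \
    --model /path/to/QWQ-32B \
    --tensor-parallel-size 2 \
    --gpu-memory-utilization 0.85 \
    --max-model-len 4096 \
    --port 8000 \
    --trust-remote-code
```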
## Deployment Notes
- Environment setup (a quick install check follows the commands below):
```bash
# Base environment ("flask[async]" is required for the async view above)
pip install "flask[async]" "vllm>=0.3.3" "torch>=2.1.0" transformers

# Optional monitoring tooling
pip install prometheus_client  # if you want to add metrics
```
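Before loading a 32B model, it is worth confirming the stack imports cleanly and sees a GPU; a quick one-liner:

```bash
python -c "import torch, vllm; print(torch.__version__, vllm.__version__, torch.cuda.is_available())"
```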
- Model preparation:
  - Make sure the model directory contains the following (a load sanity-check sketch follows this section):

```
QWQ-32B/
├── config.json
├── model.safetensors
├── tokenizer.model
└── special_tokens_map.json
```
  - If you are using a quantized checkpoint, adjust the `quantization` parameter to match.
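The sanity check mentioned above can be as simple as loading the config and tokenizer with transformers before paying the full vLLM startup cost; a minimal sketch (the path is a placeholder for your actual model directory):

```python
# Sanity-check the model directory before a full vLLM load (hypothetical helper).
from transformers import AutoConfig, AutoTokenizer

model_dir = "/path/to/QWQ-32B"  # same path as Config.MODEL_PATH
config = AutoConfig.from_pretrained(model_dir, trust_remote_code=True)
tokenizer = AutoTokenizer.from_pretrained(model_dir, trust_remote_code=True)
print(f"model_type={config.model_type}, vocab_size={tokenizer.vocab_size}")
```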
- Launch commands:
```bash
# Single-GPU launch (set Config.TENSOR_PARALLEL = 1 first)
CUDA_VISIBLE_DEVICES=0 python qwq32b_deploy.py

# Multi-GPU launch (example with 2 cards; the script needs the CLI flag
# wired up first, see the sketch below, or edit Config.TENSOR_PARALLEL)
CUDA_VISIBLE_DEVICES=0,1 python qwq32b_deploy.py --tensor_parallel 2
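```

The script as written has no argument parser, so `--tensor_parallel` would be ignored. A minimal sketch of wiring it up (hypothetical addition; it must execute above the `engine = create_engine()` line, because the engine is constructed at import time):

```python
# Hypothetical: parse --tensor_parallel so the multi-GPU launch command works.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--tensor_parallel", type=int,
                    default=Config.TENSOR_PARALLEL,
                    help="number of GPUs to shard the model across")
cli_args, _ = parser.parse_known_args()  # ignore unrelated args (e.g. from gunicorn)
Config.TENSOR_PARALLEL = cli_args.tensor_parallel

engine = create_engine()  # now picks up the CLI value
```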
- Performance tuning suggestions:
  - Tune `block_size` to optimize GPU memory use.
  - Set `quantization="awq"` in the engine arguments to enable 4-bit quantization (the script has no `--quantization` CLI flag).
  - Watch the `/stats` endpoint and adjust `MAX_CONCURRENT` accordingly (a polling sketch follows this list).
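Watching `/stats` can be automated with a small polling loop; a minimal sketch (the interval and threshold are illustrative, not tuned values):

```python
# Hypothetical monitor: poll /stats and warn when the request queue builds up.
import time
import requests

STATS_URL = "http://localhost:8000/stats"
QUEUE_WARN_THRESHOLD = 50  # illustrative threshold

while True:
    stats = requests.get(STATS_URL, timeout=5).json()
    pending = stats.get("pending_requests", 0)
    if pending > QUEUE_WARN_THRESHOLD:
        print(f"WARNING: {pending} requests queued; consider lowering load "
              f"or raising MAX_CONCURRENT")
    time.sleep(10)
```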
- Production recommendations:
  - Deploy behind Gunicorn with a single worker; the vLLM engine holds the model in GPU memory, so multiple workers would each load their own copy of the 32B weights:

```bash
pip install gunicorn
gunicorn -w 1 -b :8000 qwq32b_deploy:app
```

  - Add an Nginx reverse proxy and SSL termination.
  - Implement API-key authentication (a sketch follows this list).
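For the API-key point, a minimal Flask `before_request` guard added to `qwq32b_deploy.py` could look like the following sketch (the header name and key storage are illustrative; use a real secret store in production):

```python
# Hypothetical API-key guard for qwq32b_deploy.py.
import os
from flask import request, jsonify

API_KEYS = {os.environ.get("QWQ_API_KEY", "change-me")}  # illustrative storage

@app.before_request
def require_api_key():
    if request.path == "/health":  # keep health checks open for load balancers
        return None
    if request.headers.get("X-API-Key") not in API_KEYS:
        return jsonify({"status": "error", "message": "invalid API key"}), 401
```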
## API Call Example
```python
import requests

url = "http://localhost:8000/generate"
headers = {"Content-Type": "application/json"}
data = {
    "prompt": "Explain the basic principles of quantum computing",
    "max_tokens": 256,
    "temperature": 0.8,
}

response = requests.post(url, json=data, headers=headers)
print(response.json())
```
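The same request from the shell, for quick smoke tests:

```bash
curl -X POST http://localhost:8000/generate \
  -H "Content-Type: application/json" \
  -d '{"prompt": "Explain the basic principles of quantum computing", "max_tokens": 256, "temperature": 0.8}'
```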
## Key Features
- Dynamic batching: concurrent requests are merged automatically for higher throughput
- Memory optimization: PagedAttention manages the KV cache
- Async processing: non-blocking request handling
- Monitoring endpoints: real-time service status and performance metrics
- Input validation: request parameters are validated automatically
Adjust the configuration to your actual needs; in particular, `tensor_parallel_size` must match the number of GPUs you actually have. For larger-scale deployments, consider pairing this with Kubernetes for automatic scaling.