21. 中间件开发
from flask import request@app.before_request
def log_request_info():app.logger.debug(f"Request Headers: {request.headers}")app.logger.debug(f"Request Body: {request.get_data()}")@app.after_request
def add_custom_header(response):response.headers["X-Powered-By"] = "My API Server"return response
解析:
@app.before_request
在请求处理前执行(日志记录/权限校验)@app.after_request
在响应返回前修改响应头- 中间件适用于全局逻辑处理(如跨域/限流)
22. 微服务通信(gRPC)
# protobuf 定义(user.proto)
service UserService {rpc GetUser (UserRequest) returns (UserResponse) {}
}# 服务端实现
class UserServicer(user_pb2_grpc.UserServiceServicer):def GetUser(self, request, context):return user_pb2.UserResponse(id=1, name="Alice")server = grpc.server(futures.ThreadPoolExecutor())
user_pb2_grpc.add_UserServiceServicer_to_server(UserServicer(), server)
server.add_insecure_port('[::]:50051')
server.start()
解析:
- Protocol Buffers 定义接口契约
- 高性能二进制通信(比 REST 快 5-7 倍)
- 支持双向流式通信
23. 性能压测(Locust)
from locust import HttpUser, taskclass ApiUser(HttpUser):@taskdef get_users(self):self.client.get("/api/users")@task(3) # 权重3倍def create_user(self):self.client.post("/api/users", json={"name": "test"})
解析:
- 模拟高并发用户行为(可视化压测报告)
- 定义任务权重控制请求比例
- 找出系统瓶颈(数据库/网络/CPU)
24. 配置热更新
import configparser
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandlerclass ConfigHandler(FileSystemEventHandler):def on_modified(self, event):if event.src_path.endswith("config.ini"):reload_config()def reload_config():config = configparser.ConfigParser()config.read('config.ini')app.config.update(config['DEFAULT'])
解析:
- 使用 watchdog 监听配置文件变化
- 避免重启服务实现配置更新
- 适用场景:功能开关/日志级别调整
25. 分布式锁(Redis)
import redis
from redis.lock import Lockr = redis.Redis()
lock = Lock(r, "order_lock", timeout=10)def process_order():if lock.acquire(blocking_timeout=5):try:# 关键业务逻辑finally:lock.release()
解析:
- 防止并发操作导致数据不一致
timeout
防止死锁(自动释放锁)- 需处理网络分区带来的脑裂问题
26. 定时任务(APScheduler)
from apscheduler.schedulers.background import BackgroundSchedulerscheduler = BackgroundScheduler()
scheduler.add_job(clean_expired_tokens, 'cron', hour=2)
scheduler.start()@app.teardown_appcontext
def shutdown_scheduler(exception=None):scheduler.shutdown()
解析:
- 后台线程执行定时任务
- 支持 cron 表达式/间隔触发
- 集群环境下需配合分布式锁
27. 服务健康检查
@app.route('/health')
def health_check():try:db.session.execute('SELECT 1')return "OK", 200except Exception as e:return str(e), 500
解析:
- Kubernetes/Docker 依赖此接口做存活检测
- 检查核心依赖状态(数据库/缓存/第三方服务)
- 返回详细错误信息便于诊断
28. 请求限流
from flask_limiter import Limiterlimiter = Limiter(app=app, key_func=get_remote_address)
limiter.limit("100/hour")(app.route('/api/data'))@limiter.request_filter
def ip_whitelist():return request.remote_addr == "192.168.1.1"
解析:
- 基于 IP/用户ID 的速率限制
- 白名单机制排除特定请求
- 防止 API 被滥用/拒绝服务攻击
29. 数据分页优化
@app.route('/articles')
def get_articles():page = request.args.get('page', 1, type=int)per_page = 20pagination = Article.query.paginate(page=page, per_page=per_page,error_out=False)return {"data": [article.to_dict() for article in pagination.items],"total": pagination.total}
解析:
paginate()
方法自动计算分页参数error_out=False
无效页码返回空而非404- 前端需处理分页元数据展示
30. 对象存储集成
import boto3
from botocore.exceptions import ClientErrors3 = boto3.client('s3', aws_access_key_id=os.getenv('AWS_KEY'),aws_secret_access_key=os.getenv('AWS_SECRET')
)def upload_file(file_stream, bucket, object_name):try:s3.upload_fileobj(file_stream, bucket, object_name)return Trueexcept ClientError as e:logger.error(e)return False
解析:
- 使用 AWS S3 协议兼容的存储服务
upload_fileobj
支持流式上传大文件- 生产环境需配置 CDN 加速访问
31. 服务监控(Prometheus)
from prometheus_client import Counter, generate_latestREQUEST_COUNT = Counter('http_requests_total', 'Total HTTP Requests')@app.route('/metrics')
def metrics():REQUEST_COUNT.inc()return generate_latest()@app.route('/api/data')
def get_data():REQUEST_COUNT.inc()return "Data"
解析:
- 暴露
/metrics
端点供 Prometheus 抓取 - 定义业务指标(请求数/耗时/错误率)
- Grafana 可视化监控仪表盘
32. 消息队列(RabbitMQ)
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)def callback(ch, method, properties, body):print(f"Received: {body}")ch.basic_ack(delivery_tag=method.delivery_tag)channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()
解析:
durable=True
持久化队列防止消息丢失basic_ack
手动确认消息处理完成- 实现解耦和流量削峰
33. 数据库迁移(Alembic)
# 初始化迁移仓库
alembic init migrations# 生成迁移脚本
alembic revision --autogenerate -m "create user table"# 执行迁移
alembic upgrade head
解析:
- 版本化数据库 schema 变更
- 自动对比模型与数据库差异
- 支持回滚到历史版本
34. 文档生成(Swagger)
from flasgger import Swaggerswagger = Swagger(app)@app.route('/api/user/<int:user_id>')
def get_user(user_id):"""获取用户信息---parameters:- name: user_idin: pathtype: integerrequired: trueresponses:200:description: 用户对象"""user = User.query.get(user_id)return jsonify(user.to_dict())
解析:
- 自动生成交互式 API 文档
- 通过代码注释定义接口规范
- 支持在线测试接口
35. 国际化和本地化
from flask_babel import Babel, _babel = Babel(app)@babel.localeselector
def get_locale():return request.accept_languages.best_match(['zh', 'en'])@app.route('/greeting')
def greeting():return _("Hello World")
解析:
_()
函数实现文本翻译- 根据请求头自动选择语言
- 需要维护多语言翻译文件
36. 全链路追踪(Jaeger)
from jaeger_client import Configconfig = Config(config={'sampler': {'type': 'const', 'param': 1}}, service_name='user-service')
tracer = config.initialize_tracer()@app.route('/order')
def create_order():with tracer.start_span('order-creation') as span:span.log_kv({'event': 'start processing'})# 业务逻辑return "Order created"
解析:
- 追踪跨服务调用链路
- 分析系统性能瓶颈
- 结合 OpenTelemetry 标准
37. 服务熔断(Hystrix)
from pyhystrix import Commandclass PaymentCommand(Command):def run(self):return call_payment_service() # 可能失败的操作def fallback(self):return {"status": "payment system busy"}result = PaymentCommand().execute()
解析:
- 当失败率超过阈值时触发熔断
fallback
提供降级响应- 防止雪崩效应扩散
38. 服务发现(Consul)
import consulc = consul.Consul()# 注册服务
c.agent.service.register('user-service',service_id='user-service-1',address='10.0.0.1',port=5000,check={'HTTP': 'http://10.0.0.1:5000/health','Interval': '10s'}
)# 发现服务
_, services = c.catalog.service('user-service')
解析:
- 服务实例自动注册/注销
- 健康检查机制剔除故障节点
- 客户端负载均衡基础
39. 特征开关(Feature Flag)
from flask import request
from UnleashClient import UnleashClientunleash = UnleashClient(url="http://unleash:4242/api",app_name="my-api",environment="prod"
)@app.route('/new-feature')
def new_feature():if unleash.is_enabled("new-ui"):return render_template('new-ui.html')else:return render_template('old-ui.html')
解析:
- 灰度发布新功能
- 动态控制功能可见性
- 支持用户分群测试
40. 实时日志分析
import logging
from logging.handlers import HTTPHandlerhttp_handler = HTTPHandler(host='logstash:8080',url='/logs',method='POST'
)
logger.addHandler(http_handler)logger.error("Payment failed", extra={"user_id": 123,"order_id": "ABC-123"
})
解析:
- 将日志发送到 ELK 等分析系统
- 结构化日志便于检索分析
- 快速定位生产环境问题
架构设计原则总结
- 解耦原则:服务间通过明确定义的接口通信
- 弹性设计:具备容错、限流、熔断能力
- 可观测性:日志/监控/追踪三位一体
- 自动化优先:测试/部署/扩缩容自动化
- 持续演进:拥抱云原生技术栈
每个知识点对应实际生产场景中的关键需求,理解原理后可根据业务特点灵活组合使用。
声明:本文通过AI及自己整理,如有雷同纯属巧合