高效的 Web 应用认证与授权:基于 FastAPI 和 JWT 的实现与优化
目录
- 🛡️ 1. JWT 认证概述与工作原理
- 🔐 2. 使用 JWT 进行用户认证
- ⚡ 3. 在 FastAPI 中实现 JWT 登录与认证
- ⏳ 4. Token 验证与自动过期机制
🛡️ 1. JWT 认证概述与工作原理
JSON Web Token(JWT)是一种开放标准(RFC 7519),用于在网络应用环境间传递声明,并且以 JSON 对象的形式安全地表示。JWT 常用于实现 Web 应用中的认证与授权。它将用户的身份信息通过签名加密后存储在客户端,并且能够在每次请求时进行校验,从而减少服务器的负担。
JWT 结构
JWT 由三部分组成:
- Header:头部,通常包含两部分信息:令牌的类型(JWT)和所使用的签名算法(如 HMAC SHA256 或 RSA)。
- Payload:有效负载,包含声明(Claims),可以是用户的身份信息或其他必要的信息。声明分为三类:
- 注册声明(Registered Claims):预定义的标准声明,如
iss
(发行者)、exp
(过期时间)等。 - 公共声明(Public Claims):可以自由定义的声明,如用户名、用户ID等。
- 私有声明(Private Claims):客户端和服务端之间约定使用的声明,不同应用之间不能互通。
- 注册声明(Registered Claims):预定义的标准声明,如
- Signature:签名部分,用于验证 JWT 的完整性,并确认消息未被篡改。签名通过 Header 和 Payload 以及一个密钥生成。
JWT 的工作流程通常如下:
- 用户登录时,服务器验证用户名和密码的正确性。
- 验证通过后,服务器生成 JWT,并将其返回给客户端。
- 客户端在后续的每个请求中将 JWT 放在 HTTP 头部中,服务器通过密钥验证 JWT 是否有效,确保请求合法。
JWT 的优势
JWT 的一个主要优势是其无状态性。JWT 本身包含了用户身份的所有信息,因此服务器无需存储会话数据,可以显著减轻后端存储的负担。这对于分布式系统尤其重要,因为在多台服务器间共享会话数据会非常复杂和低效。JWT 还支持跨域认证,适合微服务架构和单页面应用(SPA)等场景。
安全性
虽然 JWT 提供了一定的安全性,但需要注意以下几点:
- 使用 HTTPS:JWT 的内容通常是明文传输的,因此需要通过 HTTPS 来加密传输,防止被中间人攻击。
- 密钥的管理:签名密钥必须保密,泄露密钥将导致 JWT 被伪造。
- Token 的过期时间:JWT 默认没有失效机制,需要手动设置过期时间,并确保及时处理过期的 Token。
🔐 2. 使用 JWT 进行用户认证
JWT 认证的核心功能是通过携带 JWT Token 来验证用户的身份。实现 JWT 认证的关键在于如何正确地颁发和验证 Token。
用户登录与 Token 生成
JWT 认证的第一步是用户登录。当用户通过用户名和密码进行身份验证时,服务端需要生成一个 JWT Token,并将其返回给客户端。这个 Token 中通常包含用户的基本信息以及过期时间等。
以下是使用 pyjwt
库生成 JWT Token 的代码示例:
import jwt
from datetime import datetime, timedelta
from typing import OptionalSECRET_KEY = "your_secret_key" # 用于签名的密钥
ALGORITHM = "HS256" # 签名算法
ACCESS_TOKEN_EXPIRE_MINUTES = 30 # Token 过期时间(单位:分钟)# 创建 JWT Token 的函数
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):to_encode = data.copy()if expires_delta:expire = datetime.utcnow() + expires_deltaelse:expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)to_encode.update({"exp": expire}) # 设置过期时间encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)return encoded_jwt
在上面的代码中,create_access_token
函数通过 jwt.encode
方法生成一个 JWT,其中 exp
字段设置为 Token 的过期时间。SECRET_KEY
用于签名,ALGORITHM
指定了 HMAC SHA256 算法。
用户登录与 Token 返回
from fastapi import FastAPI, Depends, HTTPException, status
from pydantic import BaseModelapp = FastAPI()class UserLogin(BaseModel):username: strpassword: str@app.post("/login")
def login(user: UserLogin):# 假设用户验证通过(实际应用中应使用数据库进行验证)if user.username == "test" and user.password == "password":# 生成访问令牌access_token = create_access_token(data={"sub": user.username})return {"access_token": access_token, "token_type": "bearer"}raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Invalid credentials",)
在上面的示例中,/login
路由接收用户名和密码,在验证通过后返回 JWT Token,客户端可以将其存储在本地(如浏览器的 localStorage 或 sessionStorage)并在后续请求中带上该 Token。
⚡ 3. 在 FastAPI 中实现 JWT 登录与认证
FastAPI 提供了与 JWT 认证集成的便捷方法,通过依赖注入系统,能够简洁地实现 Token 的验证和保护路由。通过 Depends
可以轻松地对 API 路由进行授权控制。
JWT Token 认证依赖
首先,需要定义一个用于验证 JWT Token 的依赖项。这个依赖项会检查请求中的 Authorization 头部,解析其中的 Token,并验证其有效性。
from fastapi import Security, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwtoauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")# JWT Token 校验函数
def verify_token(token: str = Depends(oauth2_scheme)):try:payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])username: str = payload.get("sub")if username is None:raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Token has no subject",)return usernameexcept JWTError:raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Token is invalid",)
在这个例子中,verify_token
函数会解码传入的 JWT Token,检查其是否有效。如果有效,则返回 Token 中存储的用户名,否则抛出 HTTP_401_UNAUTHORIZED
错误。
保护路由
通过将 verify_token
函数作为依赖项传递给 FastAPI 路由,可以保护那些需要用户认证的路由。只有携带有效 Token 的用户才能访问这些受保护的资源。
@app.get("/protected")
def protected_route(username: str = Depends(verify_token)):return {"message": f"Hello, {username}! This is a protected route."}
通过上面的代码,只有提供有效 Token 的用户才能访问 /protected
路由。如果 Token 无效或缺失,FastAPI 会自动返回 401 未授权错误。
⏳ 4. Token 验证与自动过期机制
JWT 的一个重要特性是过期时间。为了增强安全性,生成的 Token 会设置一个过期时间(exp
),一旦 Token 过期,用户需要重新登录以获取新的 Token。FastAPI 可以在每个请求时自动验证 Token 是否过期,并根据需要处理 Token 的刷新。
Token 过期处理
在验证 Token 时,可以检查其是否已过期,如果过期则抛出异常。JWT 库会自动处理过期时间并抛出 JWTError
错误。
def verify_token(token: str = Depends(oauth2_scheme)):try:payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])username: str = payload.get("sub")if username is None:raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Token has no subject",)# 检查 Token 是否过期if payload.get("exp") < datetime.utcnow().timestamp():raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Token has expired",)return usernameexcept JWTError:raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Token is invalid",)
通过 payload.get("exp")
获取 Token 的过期时间,检查当前时间是否已超过该时间,如果超时则返回 401 错误。
刷新 Token
为了避免频繁登录,可以实现 Token 刷新功能。通过提供一个刷新接口,用户可以通过旧 Token 获取新的有效 Token。
@app.post("/refresh")
def refresh_token(token: str = Depends(oauth2_scheme)):try:# 解码旧 Tokenpayload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])username = payload.get("sub")if username is None:raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Invalid token",)# 创建新 Tokennew_token = create_access_token(data={"sub": username})return {"access_token": new_token, "token_type": "bearer"}except JWTError:raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Token is invalid or expired",)
用户可以在旧 Token 过期前通过 /refresh
路由请求新的 Token,从而避免频繁登录。