数据库表结构和部分数据
-- Use utf8mb4 for this session so the Chinese column comments and any
-- multi-byte data are transferred correctly.
SET NAMES utf8mb4;
-- Foreign key checks are switched off while the table is dropped and
-- recreated; they are switched back on at the end of the script.
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for user
-- ----------------------------
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `name` varchar(60) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '用户名',
  `password` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '密码',
  `avatar` varchar(60) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '头像',
  `sex` smallint(6) NULL DEFAULT 0 COMMENT '性别: 0 未知 1 男 2 女',
  `phone` varchar(30) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '手机号',
  `is_deleted` smallint(6) NULL DEFAULT 0 COMMENT '是否删除: 0 未删除 1 已删除',
  `create_time` datetime NULL DEFAULT NULL COMMENT '创建时间',
  -- ON UPDATE keeps the column in step with the last modification of the row;
  -- with only DEFAULT CURRENT_TIMESTAMP it would be set once at insert time
  -- and never change afterwards unless every writer sets it by hand.
  `update_time` datetime NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 31 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = DYNAMIC;

-- ----------------------------
-- Records of user
-- ----------------------------
-- The column list is spelled out so the seed data does not depend on the
-- physical column order of `user` and keeps working if columns are added.
INSERT INTO `user`
  (`id`, `name`, `password`, `avatar`, `sex`, `phone`, `is_deleted`, `create_time`, `update_time`)
VALUES
  (1, 'admin', '30780cc6f2e56945aaf9c9578c932e22', '1', 0, NULL, 0, '2023-03-29 02:34:58', '2023-03-29 02:34:58'),
  (2, 'user', '30780cc6f2e56945aaf9c9578c932e22', '', 1, NULL, 0, '2023-03-29 02:34:58', '2023-03-29 02:34:58'),
  (3, 'guest', '30780cc6f2e56945aaf9c9578c932e22', '2', 2, NULL, 0, '2023-03-29 02:34:58', '2023-03-29 02:34:58'),
  (4, 'guest', '30780cc6f2e56945aaf9c9578c932e22', '2', 2, NULL, 0, '2023-03-29 02:34:58', '2023-03-29 02:34:58'),
  (5, 'guest', '30780cc6f2e56945aaf9c9578c932e22', '2', 2, NULL, 0, '2023-03-29 02:34:58', '2023-03-29 02:34:58');

-- Restore foreign key checking now that the table has been rebuilt and seeded.
SET FOREIGN_KEY_CHECKS = 1;
Python 代码
import logging
import os
from datetime import datetime
from typing import List, Optional

import uvicorn
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
from pydantic import create_model
from sqlalchemy import create_engine, Column, Integer, String, SmallInteger, DateTime, and_
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.future import select
from sqlalchemy.orm import sessionmaker

# Configure SQLAlchemy logging so that the emitted SQL statements are printed
logging.basicConfig()
# INFO on the "sqlalchemy.engine" logger is what makes the SQL statements show up.
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)

# Database connection URL. It can be overridden through the DATABASE_URL
# environment variable so credentials do not have to be edited into the
# source; the literal below is the fallback and should be changed to match
# your own database.
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "mysql://root:wonderful2021@127.0.0.1:3306/demo",
)

# echo is deliberately left at its default (False): SQL statements are already
# printed through the logging configuration above, and enabling echo as well
# makes every statement appear twice.
engine = create_engine(DATABASE_URL)
# Session factory bound to the engine; sessions neither autocommit nor autoflush.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()


class User(Base):
    """ORM mapping for the ``user`` table."""

    __tablename__ = "user"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(60), nullable=False)
    password = Column(String(64), nullable=False)
    avatar = Column(String(60), nullable=True)
    sex = Column(SmallInteger, default=0)
    phone = Column(String(30), nullable=True)
    # Soft-delete flag: the query code treats 0 as "not deleted".
    is_deleted = Column(SmallInteger, default=0)
    create_time = Column(DateTime, default=datetime.utcnow)
    update_time = Column(DateTime, default=datetime.utcnow)


class UserResponse(BaseModel):
    """Response schema exposing every ``User`` column except ``password``.

    All fields whose column is nullable are optional here, so a row holding
    NULL in one of them serialises as ``null`` instead of failing response
    validation.
    """

    id: int
    name: str
    avatar: Optional[str] = None
    sex: Optional[int] = None
    phone: Optional[str] = None
    is_deleted: Optional[int] = None
    create_time: Optional[datetime] = None
    update_time: Optional[datetime] = None

    class Config:
        # Allow the model to be populated from ORM objects (attribute access).
        orm_mode = True
        json_encoders = {
            # Custom datetime format used in the JSON output.
            datetime: lambda v: v.strftime('%Y-%m-%d %H:%M:%S')
        }


app = FastAPI()

# Fixed response fields: mapping of field name to type
field_type_mapping = {
    'id': int,
    'name': str,
    'phone': str,
    'avatar': str,
    # The two timestamps are typed as str because they are formatted to text
    # before being returned.
    'create_time': str,
    'update_time': str,
}
# Dynamically build the Pydantic model
DynamicUserResponse = create_model(
    'DynamicUserResponse',
    # Every entry of field_type_mapping becomes a required field of the model.
    **{field: (field_type, ...) for field, field_type in field_type_mapping.items()}
)


def get_selected_fields(model, fields):
    """Return the attributes of ``model`` named in ``fields``.

    Each name is stripped of surrounding whitespace before the lookup.
    """
    return [getattr(model, field.strip()) for field in fields]


def handle_none_values(data: dict) -> dict:
    """Return a copy of ``data`` in which every ``None`` value is replaced by ``""``."""
    return {k: (v if v is not None else "") for k, v in data.items()}


def format_datetime(dt: Optional[datetime]) -> str:
    """Format ``dt`` as ``YYYY-MM-DD HH:MM:SS``; return ``""`` when ``dt`` is empty."""
    return dt.strftime('%Y-%m-%d %H:%M:%S') if dt else ""


def _build_user_filters(name: Optional[str], sex: Optional[int], phone: Optional[str]) -> list:
    """Build the WHERE conditions shared by both list endpoints."""
    filters = [User.is_deleted == 0]  # default condition: not deleted
    if name:
        filters.append(User.name.like(f"%{name}%"))  # fuzzy match
    if sex is not None:
        filters.append(User.sex == sex)  # exact match
    if phone:
        filters.append(User.phone.like(f"%{phone}%"))  # fuzzy match
    return filters


def _row_to_payload(row) -> dict:
    """Turn one selected-columns result row into a response dict.

    Values are paired with the keys of ``field_type_mapping`` in order;
    datetime values are rendered as text and ``None`` values become ``""``.
    """
    payload = {
        key: format_datetime(value) if isinstance(value, datetime) else value
        for key, value in zip(field_type_mapping.keys(), row)
    }
    return handle_none_values(payload)


# The handlers below are plain ``def`` functions on purpose: they make
# blocking, synchronous database calls, and FastAPI runs ``def`` handlers in a
# worker thread instead of on the event loop.

@app.get("/users/", response_model=List[DynamicUserResponse])
def read_users(
    skip: int = Query(0, ge=0),
    limit: int = Query(10, ge=1),
    name: Optional[str] = None,
    sex: Optional[int] = None,
    phone: Optional[str] = None,
):
    """List non-deleted users, returning only the fixed set of fields.

    ``name`` and ``phone`` are substring filters, ``sex`` is an exact filter;
    ``skip``/``limit`` page through the result.
    """
    with SessionLocal() as session:
        selected_fields = get_selected_fields(User, field_type_mapping.keys())
        stmt = (
            select(*selected_fields)
            .where(and_(*_build_user_filters(name, sex, phone)))
            # A stable order is required for OFFSET/LIMIT paging to be repeatable.
            .order_by(User.id)
            .offset(skip)
            .limit(limit)
        )
        rows = session.execute(stmt).all()
        return [_row_to_payload(row) for row in rows]


@app.get("/users/{user_id}", response_model=DynamicUserResponse)
def read_user(user_id: int):
    """Return the fixed set of fields for one non-deleted user.

    Raises:
        HTTPException: 404 when no matching, non-deleted user exists.
    """
    with SessionLocal() as session:
        selected_fields = get_selected_fields(User, field_type_mapping.keys())
        stmt = select(*selected_fields).where(User.id == user_id, User.is_deleted == 0)
        row = session.execute(stmt).first()
        if row is None:
            raise HTTPException(status_code=404, detail="User not found")
        return _row_to_payload(row)


# NOTE: the two handlers below reuse the names ``read_users`` / ``read_user``.
# Each route is registered when its decorator runs, so all four routes stay
# reachable; only the module-level names end up bound to the later definitions.

@app.get("/usersallfiled/", response_model=List[UserResponse])
def read_users(
    skip: int = Query(0, ge=0),
    limit: int = Query(10, ge=1),
    name: Optional[str] = None,
    sex: Optional[int] = None,
    phone: Optional[str] = None,
):
    """List non-deleted users as full ``UserResponse`` objects.

    Accepts the same filters and paging parameters as the ``/users/`` list.
    """
    with SessionLocal() as session:
        stmt = (
            select(User)
            .where(and_(*_build_user_filters(name, sex, phone)))
            # A stable order is required for OFFSET/LIMIT paging to be repeatable.
            .order_by(User.id)
            .offset(skip)
            .limit(limit)
        )
        return session.execute(stmt).scalars().all()


@app.get("/usersallfiled/{user_id}", response_model=UserResponse)
def read_user(user_id: int):
    """Return one non-deleted user as a full ``UserResponse``.

    Raises:
        HTTPException: 404 when no matching, non-deleted user exists.
    """
    with SessionLocal() as session:
        stmt = select(User).where(User.id == user_id, User.is_deleted == 0)
        user = session.execute(stmt).scalars().first()
        if user is None:
            raise HTTPException(status_code=404, detail="User not found")
        return user


if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=8000)
调用方式
GET 请求:http://127.0.0.1:8000/usersallfiled/?name=admin&skip=0&limit=10
返回结果:[{"id":1,"name":"admin","avatar":"1","sex":0,"phone":null,"is_deleted":0,"create_time":"2023-03-29 02:34:58","update_time":"2023-03-29 02:34:58"}]