
教程总体简介:2. 目标 1.1产品与开发 1.2环境配置 1.3 运行方式 1.4目录说明 1.5数据库设计 2.用户认证 Json Web Token(JWT) 3.书架 4.1分类列表 5.搜索 5.3搜索-精准&高匹配&推荐 6.小说 6.4推荐-同类热门推荐 7.浏览记录 8.1配置-阅读偏好 8.配置 9.1项目部署uWSGI 配置 启动 9.部署 10.1异常和日志 10.补充 10.2 flask-restful 1.项目目录实现 3.数据库迁移: 1.JWT:json web token 2.jwt工具的封装 4.用户权限校验 5.登录验证装饰器 1.书架列表 2.书架管理 3.最后阅读 2.分类书籍列表 3.热门搜索 7.3小说-详情 2.小说目录 2.阅读偏好 3.阅读设置
完整笔记资料代码:https://gitee.com/yinuo112/Backend/tree/master/Flask/嘿马文学web完整flask项目/note.md
感兴趣的小伙伴可以自取哦~
全套教程部分目录:

部分文件图片:

2.用户认证
2.3用户登录
- 在applet_app/user.py文件中实现业务。
1-1 用户登录接口设计
- 接口名称:用户登录
- 接口路径:/user/login
- 请求方法:POST
- 请求参数:
参数名称 | 参数类型 | 是否必须 | 参数位置 | 参数说明 |
---|
Content-Type | 无 | True | 请求头 | 数据类型application/json |
envryptedData | string | True | body | 包括用户信息的加密数据,见开发文档,[ |
iv | string | True | body | 加密字符的初始向量 |
code | string | True | body | 用户登录凭证。开发者需要在开发者服务器使用code换取openid和session_key 等信息 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | {/** jwt token*/"token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYW1lIjoiSm9obiBEb2UiLCJhZG1pbiI6dHJ1ZX0.OLvs36KmqB9cmsUrMpUutfhV52_iSz4bQMYJjkI_TLQ",/** 用户信息*/"userInfo": {"uid": "199282123", // 用户id"gender": 1, // 1 男 0 女// 头像"avatarUrl": "},/** 阅读配置*/"config": {"preference": "0", // 0 女 1 男"brightness": 90, // 10~100 亮度"fontSize": "18", // 字号"background": "B1", // B1 ~ B6 内置背景"turn": "T1" // T1 仿真 T2 平滑 T3 无 翻页模式}
}
|
1-2 代码实现
1、创建蓝图
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | from flask import Blueprintuser_bp = Blueprint('user', __name__)
|
2、第三方登录
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | @user_bp.route("/login",methods=['POST'])
def login():
""" 第三方登录 """# code 用户登录凭证(有效期五分钟)。开发者需要在开发者服务器后台调用 auth.code2Session,# 使用 code 换取 openid 和 session_key 等信息code = request.json.get('code', '')# iv 加密算法的初始向量iv = request.json.get('iv', '')# encryptedData 包括敏感数据在内的完整用户信息的加密数据encryptedData = request.json.get('encryptedData', '')if not iv or not encryptedData or not code:return jsonify(msg='参数有误'), 403data = wxauth.get_wxapp_session_key(code)print('data', data)if 'session_key' not in data:return jsonify(msg='获取session_key失败', data=data), 500user_info = wxauth.get_user_info(encryptedData, iv, data['session_key'])if 'openId' not in user_info:return jsonify(msg='获取用户信息失败', user_info=user_info), 403user = User.query.filter_by(openId=user_info.get('openId')).first()if not user:user = User(user_info)db.session.add(user)db.session.flush()# 书架增加随机书籍-----后面完成# _add_book_shelf(user.id, user.gender)else:# 更新用户信息user.update_info(user_info)db.session.commit()# 生成jwt tokentoken = _generate_tokens(user.id)ret_data = {"token": token,"userInfo": {"uid": user.id,"gender": user.gender,"avatarUrl": user.avatarUrl},"config": {"preference": user.preference,"brightness": user.brightness,"fontSize": user.fontSize,"background": user.background,"turn": user.turn}}return jsonify(ret_data)
|
3、为了后续项目功能演示,可以添加测试用户,用来测试功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | @user_bp.route('/tmp_add_user', methods=['POST'])
def tmp_add_user():
"""
添加测试用户
:return:
"""data = dict(openId='1'*32,nickName='测试用户002',gender=1,city='广州市',province='广东省',country='中国',avatarUrl='default')user = User(data)db.session.add(user)db.session.commit()# 后面完成# _add_book_shelf(user.id, user.gender)ret_data = {'msg': '添加成功','user_id': user.id,}return jsonify(ret_data)
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | @user_bp.route('/tmp_login')
def tmp_login():
"""
登录测试接口
:return:
"""user_id = request.args.get('user_id')user = User.query.get(user_id)# 生成jwt tokentoken = _generate_tokens(user_id)ret_data = {"token": token,"userInfo": {"uid": user.id,"gender": user.gender,"avatarUrl": user.avatarUrl},"config": {"preference": user.preference,"brightness": user.brightness,"fontSize": user.fontSize,"background": user.background,"turn": user.turn}}return jsonify(ret_data)
|
5、使用postman对接口进行测试:

2.4用户认证
1-1 校验用户
- 用户登录完成后,在项目的后续接口中,有些需要用户登录才能访问的接口,所以,需要校验用户身份信息。
- 我们需要在每次请求前,从请求的JWTtoken的payload中取出用户id,用来校验用户权限,可以通过请求钩子实现,在项目目录/lib/middlewares.py文件,具体见代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | def before_request():g.user_id = Noneauthorization = request.headers.get('Authorization')print(authorization)payload = verify_jwt(authorization)print(payload)if payload:g.user_id = payload['user_id']
|
1-2 登录验证装饰器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | from functools import wraps
from flask import g, jsonifydef login_required(func):# wraps的作用让被装饰器装饰的函数的属性不发生变化。@wraps(func)def wrapper(*args, **kwargs):if not g.user_id:return jsonify({'msg': 'token error'}), 401return func(*args, **kwargs)return wrapper
|
2.用户认证
3.1书架列表
- 在applet_app/mybooks.py文件中实现业务。
1-1 书架列表的基本业务:
- 用户必须登录,可以添加我们前面实现登录验证装饰器。
- 默认查询书架所有书籍,倒序排序。
- 如果书架没有书籍,查询书籍数据库,随机挑选5本书籍,存入书架中。
- 如果书架有书籍,返回书籍数据。
2-2 代码实现
1、创建蓝图
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | from flask import Blueprintmybooks_bp = Blueprint('mybooks', __name__)
|
2、定义视图
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | @mybooks_bp.route('/')
@login_required
def mybooks_list():
"""
我的书架-列表
:return:
"""mybooks = BookShelf.query.filter_by(user_id=g.user_id).order_by(BookShelf.created.desc()).all()data = []if not mybooks:books = Book.query.all()bs_books = random.sample(books, 5)for book in bs_books:bs = BookShelf(user_id=g.user_id,book_id=book.book_id,book_name=book.book_name,cover=book.cover)db.session.add(bs)data.append({'id': book.book_id,'imgURL': ' book.cover),'title': book.book_name})db.session.commit()return jsonify(data)else:for bs in mybooks:data.append({'id': bs.book_id,'imgURL': ' bs.cover),'title': bs.book_name})return jsonify(data)
|
3、使用postman对接口进行测试:
3.2书架管理
- 在applet_app/mybooks.py文件中实现业务。
- 书架的业务,主要是添加和删除书籍。
1-1 添加书籍的基本业务
- 添加登录验证装饰器。
- 需要传入参数书籍id
- 查询书籍表,确认书籍的存在。
- 查询书架表,如果书架上没有该书籍,添加书籍。
- 否则,返回该书籍已在书架中。
1-2 代码实现
1、定义蓝图
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | from flask import Blueprintmybooks_bp = Blueprint('mybooks', __name__)
|
2、定义视图类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | # 定义路由,书架管理--添加书籍@login_required
@mybooks_bp.route("/<book_id>",methods=['POST'])
def add_book(book_id):
"""
book_id:url固定参数,必须作为视图参数直接传入,Flask中使用转换器进行处理,默认的数据类型是str;
:return:
"""# 1.添加登录验证装饰器user_id = g.user_id# 2.接收参数,书籍id# 3.根据书籍id,查询书籍表,确认数据的存在book = Book.query.filter(Book.book_id==book_id).first()# 确认查询结果if not book:return jsonify(msg='书籍不存在'),404# 4.查询书架表,确认该书在书架中是否存在book_shelf = BookShelf.query.filter(BookShelf.user_id==user_id,BookShelf.book_id==book_id).first()# 判断书架的查询结果if not book_shelf:# 5.如果书架中不存在,添加书籍bk_shelf = BookShelf(user_id=user_id,book_id=book.book_id,book_name=book.book_name,cover=book.cover)db.session.add(bk_shelf)db.session.commit()# 返回添加成功的信息return jsonify(msg='添加成功')else:# 否则,书架中该书籍已经存在return jsonify(msg='书架中该书籍已经存在'),400
|
3、使用postman对接口进行测试:

1-3 删除书籍的基本业务
- 根据传入的参数书籍id,查询书架表
- 判断查询结果,如果书籍不存在,返回信息
- 否则,删除书架中的该书籍数据
1、具体见代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | @login_required
@mybooks_bp.route("/<book_id>",methods=['DELETE'])
def del_book(book_id):# - 1.添加登录验证装饰器user_id = g.user_id# - 2.接收参数,书籍idbk_shelf = BookShelf.query.filter_by(user_id=user_id,book_id=book_id).first()# - 3.根据书籍id,查询书籍表,确认数据的存在if not bk_shelf:return jsonify(msg='该书籍在书架中不存在'),400# - 4.删除书籍db.session.delete(bk_shelf)db.session.commit()return jsonify(msg='删除成功')pass
|
2、使用postman对接口进行测试:
3.3最后阅读
- 在applet_app/mybooks.py文件中实现业务。
1-1 最后阅读的基本业务
1-2 代码实现
1、定义蓝图
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | from flask import Blueprintmybooks_bp = Blueprint('mybooks', __name__)
|
2、定义视图
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | # 定义路由,书架管理--最后阅读@login_required
@mybooks_bp.route("/last")
def book_last_reading():# 1.使用登录验证装饰器,获取用户信息user_id = g.user_iduser = User.query.get(user_id)read_rate = None# 2.判断用户没有阅读书籍if not user.last_read:# 3.如果用户没有阅读,默认查询第一本书籍,当做用户的阅读书籍# -----也可以查询书架的第一本书book = Book.query.first()# 保存用户的阅读书籍的iduser.last_read = book.book_id# 4.查询该书籍的章节信息,默认升序排序,bk_chapter = BookChapters.query.filter_by(book_id=book.book_id).order_by(BookChapters.chapter_id.asc()).first()# 保存用户的阅读书籍的章节信息user.last_read_chapter_id = bk_chapter.chapter_id# - 把查询结果,存入阅读进度表read_rate = ReadRate(user_id=user.id,book_id=book.book_id,chapter_id=bk_chapter.chapter_id,chapter_name=bk_chapter.chapter_name)# 保存数据db.session.add(read_rate)db.session.add(user)# 保存两个对象,参数必须列表# db.session.add_all([read_rate,user])db.session.commit()# 5.如果用户阅读书籍,查询用户阅读的书籍else:book = Book.query.get(user.last_read)# 6.判断是否有阅读进度,如果没有,查询阅读进度表if not read_rate:read_rate = ReadRate.query.filter_by(user_id=user.id,book_id=book.book_id,chapter_id=user.last_read_chapter_id).first()# 7.返回查询结果data = {'id':book.book_id,'title':book.book_name,'chapter':read_rate.chapter_name,'progress':read_rate.rate,'imgURL':'}# 转成json格式返回数据return jsonify(data)
|
3、使用postman对接口进行测试:
3.书架
4.1分类列表
- 在applet_app/category.py文件中实现业务。
1-1 分类列表的基本业务:
- 获取参数,性别
- 根据用户性别,查询书籍大分类
- 遍历查询结果,保存大分类数据
- 使用关系引用,获取大分类下面的二级分类数据
- 返回结果
1-2 代码实现
1、使用分类蓝图
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | from flask import Blueprintcategory_bp = Blueprint('category',__name__,url_prefix='/categories')
|
2、定义视图
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | @category_bp.route('/')
def category_list():# 1.用户参数,性别参数,查询字符串# 如果未传入参数,默认值为1表示男,int表示类型转换# gender = int(gender)gender = request.args.get("gender",1,int)# 2.根据性别,查询大分类数据bk_big_category = BookBigCategory.query.filter(BookBigCategory.channel==gender).all()data = []# 3.遍历大分类列表数据,保存for book_category in bk_big_category:big_temp = {'id':book_category.cate_id,'title':book_category.cate_name,'imgURL':''subCategory':[]}# 4.可以通过关系引用,获取大分类下面的二级分类数据for category in book_category.second_cates:temp = {'id':category.cate_id,'title':category.cate_name}big_temp['subCategory'].append(temp)data.append(big_temp)# 5.返回分类数据return jsonify(data)
|
3、使用postman对接口进行测试:
