为了让Flask应用在网页刷新或关闭后仍能保持后台任务运行,关键在于将任务执行与客户端请求分离。以下是两种实现方案:
方案一:使用后台线程(适合简单场景/开发环境)
通过Python的threading
模块启动独立线程执行任务,任务不受客户端请求生命周期影响。
from flask import Flask from threading import Threadapp = Flask(__name__) task_thread = None # 存储线程实例def long_running_task():import timewhile True:print("任务执行中...")time.sleep(5)@app.route('/start_task') def start_task():global task_threadif not task_thread or not task_thread.is_alive():task_thread = Thread(target=long_running_task)task_thread.daemon = True # 主进程退出时自动结束线程(按需设置)task_thread.start()return "任务已启动!"return "任务已在运行中"if __name__ == '__main__':app.run(debug=False, use_reloader=False) # 禁用调试重载器
注意事项:
-
线程管理:使用全局变量存储线程实例,避免重复启动。
-
上下文问题:若任务涉及Flask上下文(如数据库操作),需手动推送:
from flask import current_appdef long_running_task():with current_app.app_context():# 访问数据库等操作
-
生产部署:避免使用多线程处理高并发,考虑WSGI服务器如Gunicorn。
方案二:使用Celery任务队列(推荐生产环境)
通过分布式任务队列管理后台任务,支持持久化、重试和监控。
步骤1:安装依赖
pip install celery redis # 启动Redis服务(确保已安装)
步骤2:配置Celery
from flask import Flask from celery import Celeryapp = Flask(__name__) app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0' app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL']) celery.conf.update(app.config)@celery.task(bind=True) def long_running_task(self):import timewhile True:print("Celery任务执行中...")time.sleep(5)
步骤3:在Flask中触发任务
@app.route('/start_celery_task') def start_celery_task():long_running_task.delay()return "Celery任务已提交!"
管理任务:
-
监控任务状态:使用Flower(
celery -A your_app.celery flower
) -
获取结果:通过
AsyncResult
查询任务状态:from celery.result import AsyncResult@app.route('/task_status/<task_id>') def task_status(task_id):task = AsyncResult(task_id, app=celery)return {'status': task.status}
关键区别与选择建议:
特性 | 后台线程 | Celery |
---|---|---|
可靠性 | 进程崩溃则任务丢失 | 支持持久化,任务中断后可恢复 |
并发能力 | 适合轻量级任务 | 支持分布式、多Worker |
复杂度 | 简单,无需额外组件 | 需Redis/RabbitMQ作为消息代理 |
适用场景 | 开发/测试环境、简单任务 | 生产环境、复杂或关键任务 |
常见问题解决:
-
任务重复启动:通过全局变量或数据库记录任务状态。
-
线程无法停止:添加停止标志:
task_running = True def long_task():while task_running:# 执行操作 @app.route('/stop_task') def stop_task():global task_runningtask_running = False
-
Celery Worker离线:使用监控工具(如Supervisor)保持Worker进程存活。
选择适合你项目规模和需求的方案,即可实现Flask后台任务的持久化运行。