环境:Windows 11、python 3.12.3、Django 4.2.11、 APScheduler 3.10.4
背景:工作需要使用且用法较为复杂,各种功能基本都使用了
事件:20240920
说明:记录,方便后期自己查找
1、搭建基础环境
文件结构图
蓝色代表文件,黑色代表目录,主要是django自动生成的文件以及apscheduler需要的文件
包括Django、APScheduler两个,代码如下:
新建scheduler文件
创建调度器,并配置启动函数
# scheduleJob\scheduler.pyfrom apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from django_apscheduler.jobstores import DjangoJobStore
from pytz import timezone, utc# 第二种方式,内嵌
jobstores = {"default": DjangoJobStore()}
executors = {"default": ThreadPoolExecutor(20), "processpool": ProcessPoolExecutor(5)}
job_defaults = { # 该参数既可以在创建scheduler对象时使用,也可以用在add_job中,对象范围广、优先级低'coalesce': True, # 是否合并积压的任务。如果设置为 True,当任务运行时间落后时,会只运行一次,而不是运行多次。默认值为 False。'max_instances': 2, # 允许的最大作业实例数。确保同一任务在同一时间不会有多个实例运行。默认值为 1。'misfire_grace_time': 30, # 设置任务错过其执行时间的容忍时间(以秒为单位)。如果任务在这个时间内错过了执行时间,将立即执行。如果设置为 None,则没有时间限制。'replace_existing': True # 如果添加的任务ID已存在,是否替换现有任务。默认值为 False。
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=timezone('Asia/Shanghai'))def start():scheduler.start()
在settings文件中添加应用并配置数据库
# scheduleJob\settings.pyINSTALLED_APPS = [# ...'django_apscheduler','testapscheduler',
]
ROOT_URLCONF = 'scheduleJob.urls'
DATABASES = {'default': {'ENGINE': 'django.db.backends.mysql','NAME': "scheduler",'USER': "root",'PASSWORD': "123456",'HOST': "localhost",'PORT': 3306,}
}
LANGUAGE_CODE = 'en-us'
TIME_ZONE = 'Asia/Shanghai'
USE_I18N = True
USE_TZ = False
在apps中实现启动调度器
# testapscheduler\apps.pyfrom django.apps import AppConfigclass TestapschedulerConfig(AppConfig):default_auto_field = 'django.db.models.BigAutoField'name = 'testapscheduler'print("启动")def ready(self):from scheduleJob import schedulerscheduler.start()
在url中配置路由
# scheduleJob\urls.pyfrom django.contrib import admin
from django.urls import path
from testapscheduler.views import operate_taskurlpatterns = [path('admin/', admin.site.urls),path('operate_task/', operate_task),
]
在views文件中实现真正的逻辑处理
# testapscheduler\views.pyfrom scheduleJob.scheduler import scheduler
from django.http import HttpResponse
from datetime import datetime
from django.views.decorators.csrf import csrf_exemptimport time, json# Create your views here.
@csrf_exemptdef operate_task(request):# 动态任务id, 利用时间戳获取整数,如一秒内添加两个,则会出现bugp = request.POST.get("id")print(p, "============")my_task_id = int(datetime.timestamp(datetime.now()))# job参数job_kwargs = {"func":excute_task, # job的函数"id":str(my_task_id), # 添加的jobid, 必须是字符串"name":f"task_{my_task_id}", # 添加的job名称"kwargs":{"info":"test"}, # job的函数参数,本例中excute_task需要的参数"next_run_time": datetime.now(), # 添加任务成功后立即执行"replace_existing": True, "misfire_grace_time": 10, "coalesce": True, "max_instances": 10, "trigger": "interval", # 任务执行类型,还有date、cron"seconds": 10, # 任务执行间隔时间,代表每10s执行一次}scheduler.add_job(**job_kwargs)# scheduler.remove_all_jobs()return HttpResponse("add task success")def excute_task(info):time.sleep(3)print(info, "--------------------------------", datetime.now())
数据库迁移
运行前,先执行数据库迁移
python manage.py makemigrationspython manage.py migrate# 前者是将model层转为迁移文件migration
# 后者将新版本的迁移文件执行,更新数据库。
会在数据库生成两个表,引用方法如下
from django_apscheduler.models import DjangoJob, DjangoJobExecution
django_apscheduler_djangojob:对应Django中的DjangoJob,共计三个字段,分别为id、next_run_time、job_state,默认排序字段为next_run_time
django_apscheduler_djangojobexecution:对应Django中的DjangoJobExecution,共计八个字段,分别是id、status、run_time、duration、finished、exception、traceback、job_id。
postman请求测试
在postman中请求路由,代码如下:
# postman生成的请求代码import requests
import jsonurl = "localhost:8000/operate_task/"payload = json.dumps({"action": "start","id": 1
})
headers = {'Content-Type': 'application/json'
}response = requests.request("POST", url, headers=headers, data=payload)print(response.text)
实现效果
test -------------------------------- 2024-09-20 15:13:58.165250
test -------------------------------- 2024-09-20 15:13:58.165250
test -------------------------------- 2024-09-20 15:14:04.469140
test -------------------------------- 2024-09-20 15:14:04.469140
任务状态
#: constant indicating a scheduler's stopped state
STATE_STOPPED = 0
#: constant indicating a scheduler's running state (started and processing jobs)
STATE_RUNNING = 1
#: constant indicating a scheduler's paused state (started but not processing jobs)
STATE_PAUSED = 2
2、调度器动态操作
1、查询所有任务
# testapscheduler\views.py# ......@csrf_exempt
def query_all_task(request):# 查看所有任务job_list = scheduler.get_jobs()return JsonResponse([{x.name:x.id} for x in job_list], safe=False)
响应
[{"task_1726812545": "1726812545"},{"task_1726816011": "1726816011"}
]
2、查询某个任务
@csrf_exempt
def get_job(request):# 查询任务是否存在job_id = loads(request.body).get("id")msg = scheduler.get_job(job_id=job_id)print(msg)return JsonResponse({"msg":"success"})
3、移除所有任务
@csrf_exempt
def remove_all_jobs(request):# 移除所有任务, 事件代码是256scheduler.remove_all_jobs()return JsonResponse({"msg":"success"})
4、移除某个任务
@csrf_exempt
def remove_job(request):# 移除某个任务, 事件代码是1024job_id = loads(request.body).get("id")scheduler.remove_job(job_id=job_id)return JsonResponse({"msg":"success"})
5、暂停某个任务
@csrf_exempt
def pause_job(request):# 暂停某个任务, 事件代码是2048job_id = loads(request.body).get("id")scheduler.pause_job(job_id=job_id)return JsonResponse({"msg":"success"})
6、恢复某个任务
@csrf_exempt
def resume_job(request):# 恢复某个任务,仅能恢复已暂停的任务, 事件代码是2048job_id = loads(request.body).get("id")scheduler.resume_job(job_id=job_id)return JsonResponse({"msg":"success"})
7、添加某个任务
@csrf_exempt
def add_job(request):# 添加任务, 事件代码是512kwargs = loads(request.body)scheduler.add_job(**kwargs)return JsonResponse({"msg":"success"})
8、修改某个任务
@csrf_exempt
def modify_job(request):# 恢复某个任务,仅能恢复已暂停的任务, 事件代码是2048job_id = loads(request.body).get("id")changes = loads(request.body).get("changes")scheduler.modify_job(job_id=job_id, changes=changes)return JsonResponse({"msg":"success"})
9、打印所有任务信息
@csrf_exempt
def print_jobs(request):# 打印所有任务信息scheduler.print_jobs()return JsonResponse({"msg":"success"})
10、启动调度器
@csrf_exempt
def start(request):# 调度程序启动, 事件代码是1scheduler.start()return JsonResponse({"msg":"success"})
11、关闭调度器
@csrf_exempt
def shutdown(request):# 调度程序关闭, 事件代码是2scheduler.shutdown()return JsonResponse({"msg":"success"})
12、暂停调度器
@csrf_exempt
def pause(request):# 调度程序暂停, 事件代码是4scheduler.pause()return JsonResponse({"msg":"success"})
13、恢复调度器
@csrf_exempt
def resume(request):# 调度程序恢复, 事件代码是8scheduler.resume()return JsonResponse({"msg":"success"})
14、添加执行器
@csrf_exempt
def add_executor(request):# 调度程序添加执行器, 事件代码是16executors = {"default": ThreadPoolExecutor(20), "processpool": ProcessPoolExecutor(5)}scheduler.add_executor(executor=executors)
15、删除执行器
@csrf_exempt
def remove_executor(request):# 调度程序删除执行器, 事件代码是32alias = loads(request.body).get("alias")scheduler.remove_executor(alias=alias)return JsonResponse({"msg":"success"})
16、添加作业存储器
@csrf_exempt
def add_jobstore(request):# 调度程序添加作业存储器, 事件代码是64jobstores = {"default": DjangoJobStore()}scheduler.add_jobstore(jobstores=jobstores)
17、删除作业存储器
@csrf_exempt
def remove_jobstore(request):# 调度程序删除作业存储器, 事件代码是128alias = loads(request.body).get("alias")scheduler.remove_jobstore(alias=alias)return JsonResponse({"msg":"success"})
18、修改触发器参数
@csrf_exempt
def reschedule_job(request):# 调度程序修改触发器参数job_id = loads(request.body).get("job_id")trigger_args = loads(request.body).get("trigger_args")scheduler.reschedule_job(trigger_args=trigger_args, job_id=job_id)return JsonResponse({"msg":"success"})
未完待续 ······