欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 教育 > 培训 > APScheduler、Django实现定时任务,以及任务动态操作

APScheduler、Django实现定时任务,以及任务动态操作

2024/12/1 0:44:44 来源:https://blog.csdn.net/forevercui/article/details/142384341  浏览:    关键词:APScheduler、Django实现定时任务,以及任务动态操作

环境:Windows 11、python 3.12.3、Django 4.2.11、 APScheduler 3.10.4

背景:工作需要使用且用法较为复杂,各种功能基本都使用了

事件:20240920

说明:记录,方便后期自己查找

 1、搭建基础环境

文件结构图

蓝色代表文件,黑色代表目录,主要是django自动生成的文件以及apscheduler需要的文件

包括Django、APScheduler两个,代码如下:

新建scheduler文件

创建调度器,并配置启动函数

# scheduleJob\scheduler.pyfrom apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from django_apscheduler.jobstores import DjangoJobStore
from pytz import timezone, utc# 第二种方式,内嵌
jobstores = {"default": DjangoJobStore()}
executors = {"default": ThreadPoolExecutor(20), "processpool": ProcessPoolExecutor(5)}
job_defaults = {                    # 该参数既可以在创建scheduler对象时使用,也可以用在add_job中,对象范围广、优先级低'coalesce': True,               # 是否合并积压的任务。如果设置为 True,当任务运行时间落后时,会只运行一次,而不是运行多次。默认值为 False。'max_instances': 2,             # 允许的最大作业实例数。确保同一任务在同一时间不会有多个实例运行。默认值为 1。'misfire_grace_time': 30,       # 设置任务错过其执行时间的容忍时间(以秒为单位)。如果任务在这个时间内错过了执行时间,将立即执行。如果设置为 None,则没有时间限制。'replace_existing': True        # 如果添加的任务ID已存在,是否替换现有任务。默认值为 False。
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=timezone('Asia/Shanghai'))def start():scheduler.start()

 在settings文件中添加应用并配置数据库

# scheduleJob\settings.pyINSTALLED_APPS = [# ...'django_apscheduler','testapscheduler',
]
ROOT_URLCONF = 'scheduleJob.urls'
DATABASES = {'default': {'ENGINE': 'django.db.backends.mysql','NAME': "scheduler",'USER': "root",'PASSWORD': "123456",'HOST': "localhost",'PORT': 3306,}
}
LANGUAGE_CODE = 'en-us'
TIME_ZONE = 'Asia/Shanghai'
USE_I18N = True
USE_TZ = False

 在apps中实现启动调度器

# testapscheduler\apps.pyfrom django.apps import AppConfigclass TestapschedulerConfig(AppConfig):default_auto_field = 'django.db.models.BigAutoField'name = 'testapscheduler'print("启动")def ready(self):from scheduleJob import schedulerscheduler.start()

 在url中配置路由

# scheduleJob\urls.pyfrom django.contrib import admin
from django.urls import path
from testapscheduler.views import operate_taskurlpatterns = [path('admin/', admin.site.urls),path('operate_task/', operate_task),
]

 在views文件中实现真正的逻辑处理

# testapscheduler\views.pyfrom scheduleJob.scheduler import scheduler
from django.http import HttpResponse
from datetime import datetime
from django.views.decorators.csrf import csrf_exemptimport time, json# Create your views here.
@csrf_exemptdef operate_task(request):# 动态任务id, 利用时间戳获取整数,如一秒内添加两个,则会出现bugp = request.POST.get("id")print(p, "============")my_task_id = int(datetime.timestamp(datetime.now()))# job参数job_kwargs = {"func":excute_task,                     # job的函数"id":str(my_task_id),                   # 添加的jobid, 必须是字符串"name":f"task_{my_task_id}",            # 添加的job名称"kwargs":{"info":"test"},               # job的函数参数,本例中excute_task需要的参数"next_run_time": datetime.now(),        # 添加任务成功后立即执行"replace_existing": True,               "misfire_grace_time": 10,               "coalesce": True,                       "max_instances": 10,                    "trigger": "interval",                  # 任务执行类型,还有date、cron"seconds": 10,                          # 任务执行间隔时间,代表每10s执行一次}scheduler.add_job(**job_kwargs)# scheduler.remove_all_jobs()return HttpResponse("add task success")def excute_task(info):time.sleep(3)print(info, "--------------------------------", datetime.now())

数据库迁移 

运行前,先执行数据库迁移

python manage.py makemigrationspython manage.py migrate# 前者是将model层转为迁移文件migration
# 后者将新版本的迁移文件执行,更新数据库。

 会在数据库生成两个表,引用方法如下

from django_apscheduler.models import DjangoJob, DjangoJobExecution

django_apscheduler_djangojob:对应Django中的DjangoJob,共计三个字段,分别为id、next_run_time、job_state,默认排序字段为next_run_time

django_apscheduler_djangojobexecution:对应Django中的DjangoJobExecution,共计八个字段,分别是id、status、run_time、duration、finished、exception、traceback、job_id。

postman请求测试

在postman中请求路由,代码如下:

# postman生成的请求代码import requests
import jsonurl = "localhost:8000/operate_task/"payload = json.dumps({"action": "start","id": 1
})
headers = {'Content-Type': 'application/json'
}response = requests.request("POST", url, headers=headers, data=payload)print(response.text)

实现效果

test -------------------------------- 2024-09-20 15:13:58.165250
test -------------------------------- 2024-09-20 15:13:58.165250
test -------------------------------- 2024-09-20 15:14:04.469140
test -------------------------------- 2024-09-20 15:14:04.469140

任务状态

#: constant indicating a scheduler's stopped state
STATE_STOPPED = 0
#: constant indicating a scheduler's running state (started and processing jobs)
STATE_RUNNING = 1
#: constant indicating a scheduler's paused state (started but not processing jobs)
STATE_PAUSED = 2

2、调度器动态操作

1、查询所有任务

# testapscheduler\views.py# ......@csrf_exempt
def query_all_task(request):# 查看所有任务job_list = scheduler.get_jobs()return JsonResponse([{x.name:x.id} for x in job_list], safe=False)

响应

[{"task_1726812545": "1726812545"},{"task_1726816011": "1726816011"}
]

2、查询某个任务

@csrf_exempt
def get_job(request):# 查询任务是否存在job_id = loads(request.body).get("id")msg = scheduler.get_job(job_id=job_id)print(msg)return JsonResponse({"msg":"success"})

 3、移除所有任务

@csrf_exempt
def remove_all_jobs(request):# 移除所有任务, 事件代码是256scheduler.remove_all_jobs()return JsonResponse({"msg":"success"})

4、移除某个任务

@csrf_exempt
def remove_job(request):# 移除某个任务, 事件代码是1024job_id = loads(request.body).get("id")scheduler.remove_job(job_id=job_id)return JsonResponse({"msg":"success"})

5、暂停某个任务

@csrf_exempt
def pause_job(request):# 暂停某个任务, 事件代码是2048job_id = loads(request.body).get("id")scheduler.pause_job(job_id=job_id)return JsonResponse({"msg":"success"})

6、恢复某个任务

@csrf_exempt
def resume_job(request):# 恢复某个任务,仅能恢复已暂停的任务, 事件代码是2048job_id = loads(request.body).get("id")scheduler.resume_job(job_id=job_id)return JsonResponse({"msg":"success"})

7、添加某个任务

@csrf_exempt
def add_job(request):# 添加任务, 事件代码是512kwargs = loads(request.body)scheduler.add_job(**kwargs)return JsonResponse({"msg":"success"})

8、修改某个任务

@csrf_exempt
def modify_job(request):# 恢复某个任务,仅能恢复已暂停的任务, 事件代码是2048job_id = loads(request.body).get("id")changes = loads(request.body).get("changes")scheduler.modify_job(job_id=job_id, changes=changes)return JsonResponse({"msg":"success"})

9、打印所有任务信息

@csrf_exempt
def print_jobs(request):# 打印所有任务信息scheduler.print_jobs()return JsonResponse({"msg":"success"})

10、启动调度器

@csrf_exempt
def start(request):# 调度程序启动, 事件代码是1scheduler.start()return JsonResponse({"msg":"success"})

11、关闭调度器

@csrf_exempt
def shutdown(request):# 调度程序关闭, 事件代码是2scheduler.shutdown()return JsonResponse({"msg":"success"})

12、暂停调度器

@csrf_exempt
def pause(request):# 调度程序暂停, 事件代码是4scheduler.pause()return JsonResponse({"msg":"success"})

13、恢复调度器

@csrf_exempt
def resume(request):# 调度程序恢复, 事件代码是8scheduler.resume()return JsonResponse({"msg":"success"})

14、添加执行器

@csrf_exempt
def add_executor(request):# 调度程序添加执行器, 事件代码是16executors = {"default": ThreadPoolExecutor(20), "processpool": ProcessPoolExecutor(5)}scheduler.add_executor(executor=executors)

15、删除执行器

@csrf_exempt
def remove_executor(request):# 调度程序删除执行器, 事件代码是32alias = loads(request.body).get("alias")scheduler.remove_executor(alias=alias)return JsonResponse({"msg":"success"})

16、添加作业存储器

@csrf_exempt
def add_jobstore(request):# 调度程序添加作业存储器, 事件代码是64jobstores = {"default": DjangoJobStore()}scheduler.add_jobstore(jobstores=jobstores)

17、删除作业存储器

@csrf_exempt
def remove_jobstore(request):# 调度程序删除作业存储器, 事件代码是128alias = loads(request.body).get("alias")scheduler.remove_jobstore(alias=alias)return JsonResponse({"msg":"success"})

18、修改触发器参数

@csrf_exempt
def reschedule_job(request):# 调度程序修改触发器参数job_id = loads(request.body).get("job_id")trigger_args = loads(request.body).get("trigger_args")scheduler.reschedule_job(trigger_args=trigger_args, job_id=job_id)return JsonResponse({"msg":"success"})

 未完待续 ······

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com