Task Scheduling with Python schedule

2025/2/8 7:28:30 Source: https://blog.csdn.net/lilongsy/article/details/145500488

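schedule is a third-party job scheduling library for Python, used to run periodic tasks. Its API is simple: jobs can run every few seconds, minutes, hours, days or weeks, or at a custom time of day. It runs in-process, so no separate daemon or service is needed. It is very lightweight, has no external dependencies, and is compatible with Python 3.7, 3.8, 3.9, 3.10 and 3.11.

Installation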

$ pip install schedule

Run a job at regular intervals

import schedule
import time

def job():
    print("I'm working...")

# Run job every 3 second/minute/hour/day/week,
# Starting 3 second/minute/hour/day/week from now
schedule.every(3).seconds.do(job)
schedule.every(3).minutes.do(job)
schedule.every(3).hours.do(job)
schedule.every(3).days.do(job)
schedule.every(3).weeks.do(job)

# Run job every minute at the 23rd second
schedule.every().minute.at(":23").do(job)

# Run job every hour at the 42nd minute
schedule.every().hour.at(":42").do(job)

# Run jobs every 5th hour, 20 minutes and 30 seconds in.
# If current time is 02:00, first execution is at 06:20:30
schedule.every(5).hours.at("20:30").do(job)

# Run job every day at specific HH:MM and next HH:MM:SS
schedule.every().day.at("10:30").do(job)
schedule.every().day.at("10:30:42").do(job)
schedule.every().day.at("12:42", "Europe/Amsterdam").do(job)

# Run job on a specific day of the week
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)


Schedule a job with a decorator

Use the @repeat decorator to set the schedule for a function.

from schedule import every, repeat, run_pending
import time

@repeat(every(10).minutes)
def job():
    print("I am a scheduled job")

while True:
    run_pending()
    time.sleep(1)

Pass arguments to a job

do() passes any extra arguments on to the job function.

import schedule

def greet(name):
    print('Hello', name)

schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')

from schedule import every, repeat

@repeat(every().second, "World")
@repeat(every().day, "Mars")
def hello(planet):
    print("Hello", planet)

Cancel a job

import schedule

def some_task():
    print('Hello world')

job = schedule.every().day.at('22:30').do(some_task)
schedule.cancel_job(job)

schedule.cancel_job(job) removes the given job from the scheduler.

Run a job once

Return schedule.CancelJob from the job function. The scheduler then cancels the job after its first run, so it executes only once.

import schedule
import time

def job_that_executes_once():
    # Do some work that only needs to happen once...
    return schedule.CancelJob

schedule.every().day.at('22:30').do(job_that_executes_once)

while True:
    schedule.run_pending()
    time.sleep(1)

Get all jobs

schedule.get_jobs() returns all scheduled jobs.

import schedule

def hello():
    print('Hello world')

schedule.every().second.do(hello)

all_jobs = schedule.get_jobs()

Cancel all jobs

schedule.clear() cancels all jobs.

import schedule

def greet(name):
    print('Hello {}'.format(name))

# greet() needs a name, so pass one through do()
schedule.every().second.do(greet, 'Andrea')

schedule.clear()

Get jobs by tag

Attach tags to a job with tag(), then call schedule.get_jobs('friend') to retrieve the jobs that carry that tag.

import schedule

def greet(name):
    print('Hello {}'.format(name))

schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')

friends = schedule.get_jobs('friend')

Cancel jobs by tag

import schedule

def greet(name):
    print('Hello {}'.format(name))

schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')

schedule.clear('daily-tasks')

Run a job at random intervals within a range

import schedule

def my_job():
    print('Foo')

# Run every 5 to 10 seconds.
schedule.every(5).to(10).seconds.do(my_job)

Run a job until a certain time (deadline)

import schedule
from datetime import datetime, timedelta, time

def job():
    print('Boo')

# Note: every deadline below must still lie in the future when the job is scheduled

# Run job until 18:30 today
schedule.every(1).hours.until("18:30").do(job)

# Run job until 2030-01-01 18:33
schedule.every(1).hours.until("2030-01-01 18:33").do(job)

# Schedule a job to run for the next 8 hours
schedule.every(1).hours.until(timedelta(hours=8)).do(job)

# Run job until today 11:33:42
schedule.every(1).hours.until(time(11, 33, 42)).do(job)

# Run job until a specific datetime (a future date is used here)
schedule.every(1).hours.until(datetime(2030, 5, 17, 11, 36, 20)).do(job)

Time until the next job

schedule.idle_seconds() returns the number of seconds until the next job is due to run. If no jobs are scheduled, it returns None.

import schedule
import time

def job():
    print('Hello')

schedule.every(5).seconds.do(job)

while 1:
    n = schedule.idle_seconds()
    if n is None:
        # no more jobs
        break
    elif n > 0:
        # sleep exactly the right amount of time
        time.sleep(n)
    schedule.run_pending()

Run all jobs immediately

import schedule

def job_1():
    print('Foo')

def job_2():
    print('Bar')

schedule.every().monday.at("12:40").do(job_1)
schedule.every().tuesday.at("16:40").do(job_2)

schedule.run_all()

# Add the delay_seconds argument to run the jobs with a number
# of seconds delay in between.
schedule.run_all(delay_seconds=10)

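schedule.run_all() runs every job immediately, regardless of its schedule; afterwards each job is rescheduled as usual. schedule.run_all(delay_seconds=10) does the same but waits 10 seconds between jobs.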

Run in the background

import threading
import time

import schedule

def run_continuously(interval=1):
    """Continuously run, while executing pending jobs at each
    elapsed time interval.
    @return cease_continuous_run: threading.Event which can
    be set to cease continuous run. Please note that it is
    *intended behavior that run_continuously() does not run
    missed jobs*. For example, if you've registered a job that
    should run every minute and you set a continuous run
    interval of one hour then your job won't be run 60 times
    at each interval but only once.
    """
    cease_continuous_run = threading.Event()

    class ScheduleThread(threading.Thread):
        @classmethod
        def run(cls):
            while not cease_continuous_run.is_set():
                schedule.run_pending()
                time.sleep(interval)

    continuous_thread = ScheduleThread()
    continuous_thread.start()
    return cease_continuous_run

def background_job():
    print('Hello from the background thread')

schedule.every().second.do(background_job)

# Start the background thread
stop_run_continuously = run_continuously()

# Do some other things...
time.sleep(10)

# Stop the background thread
stop_run_continuously.set()

Run jobs in parallel

import threading
import time
import schedule

def job():
    print("I'm running on thread %s" % threading.current_thread())

def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()

schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)

while 1:
    schedule.run_pending()
    time.sleep(1)

Here each job is started in its own thread, so the jobs run in parallel.

import time
import threading
import schedule
import queue

def job():
    print("I'm working")

def worker_main():
    while 1:
        job_func = jobqueue.get()
        job_func()
        jobqueue.task_done()

jobqueue = queue.Queue()

schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)

worker_thread = threading.Thread(target=worker_main)
worker_thread.start()

while 1:
    schedule.run_pending()
    time.sleep(1)

Here a shared job queue feeds a single worker thread, which gives strict control over the number of threads.

Set the time zone

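import schedule

def job():
    print("I'm working...")

# Time zone support requires the pytz package (pip install pytz)

# Pass a timezone as a string
schedule.every().day.at("12:42", "Europe/Amsterdam").do(job)

# Pass a pytz timezone object
from pytz import timezone
schedule.every().friday.at("12:42", timezone("Africa/Lagos")).do(job)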

Related links

https://schedule.readthedocs.io/en/stable/
https://github.com/dbader/schedule
https://schedule.readthedocs.io/en/stable/examples.html#run-a-job-every-x-minute
