Python定时库APScheduler怎么用,其原理如何理解
Admin 2022-06-21 群英技术资讯 335 次浏览
APscheduler全称Advanced Python Scheduler
作用为在指定的时间规则执行指定的作业。
APScheduler版本 3.6.3
作用
Job作为APScheduler最小执行单位。
创建Job时指定执行的函数,函数中所需参数,Job执行时的一些设置信息。
构建说明
id:指定作业的唯一ID
name:指定作业的名字
trigger:apscheduler定义的触发器,用于确定Job的执行时间,根据设置的trigger规则,计算得到下次执行此job的时间, 满足时将会执行
executor:apscheduler定义的执行器,job创建时设置执行器的名字,根据字符串你名字到scheduler获取到执行此job的 执行器,执行job指定的函数
max_instances:执行此job的最大实例数,executor执行job时,根据job的id来计算执行次数,根据设置的最大实例数来确定是否可执行
next_run_time:Job下次的执行时间,创建Job时可以指定一个时间[datetime],不指定的话则默认根据trigger获取触发时间
misfire_grace_time:Job的延迟执行时间,例如Job的计划执行时间是21:00:00,但因服务重启或其他原因导致21:00:31才执行,如果设置此key为40,则该job会继续执行,否则将会丢弃此job
coalesce:Job是否合并执行,是一个bool值。例如scheduler停止20s后重启启动,而job的触发器设置为5s执行一次,因此此job错过了4个执行时间,如果设置为是,则会合并到一次执行,否则会逐个执行
func:Job执行的函数
args:Job执行函数需要的位置参数
kwargs:Job执行函数需要的关键字参数
Trigger绑定到Job,在scheduler调度筛选Job时,根据触发器的规则计算出Job的触发时间,然后与当前时间比较
确定此Job是否会被执行,总之就是根据trigger规则计算出下一个执行时间。
Trigger有多种种类,指定时间的DateTrigger,指定间隔时间的IntervalTrigger,像Linux的crontab一样的CronTrigger
目前APScheduler支持触发器:
DateTrigger
IntervalTrigger
CronTrigger
Executor在scheduler中初始化,另外也可通过scheduler的add_executor动态添加Executor。
每个executor都会绑定一个alias,这个作为唯一标识绑定到Job,在实际执行时会根据Job绑定的executor找到实际的执行器对象,然后根据执行器对象执行Job
Executor的种类会根据不同的调度来选择,如果选择AsyncIO作为调度的库,那么选择AsyncIOExecutor,如果选择tornado作为调度的库,选择TornadoExecutor,如果选择启动进程作为调度,选择ThreadPoolExecutor或者ProcessPoolExecutor都可以
Executor的选择需要根据实际的scheduler来选择不同的执行器
目前APScheduler支持的Executor:
AsyncIOExecutor
GeventExecutor
ThreadPoolExecutor
ProcessPoolExecutor
TornadoExecutor
TwistedExecutor
Jobstore在scheduler中初始化,另外也可通过scheduler的add_jobstore动态添加Jobstore。每个jobstore都会绑定一个alias,scheduler在Add Job时,根据指定的jobstore在scheduler中找到相应的jobstore,并将job添加到jobstore中。
Jobstore主要是通过pickle库的loads和dumps【实现核心是通过python的__getstate__和__setstate__重写实现】,每次变更时将Job动态保存到存储中,使用时再动态的加载出来,作为存储的可以是redis,也可以是数据库【通过sqlarchemy这个库集成多种数据库】,也可以是mongodb等
目前APScheduler支持的Jobstore:
MemoryJobStore
MongoDBJobStore
RedisJobStore
RethinkDBJobStore
SQLAlchemyJobStore
ZooKeeperJobStore
Event是APScheduler在进行某些操作时触发相应的事件,用户可以自定义一些函数来监听这些事件,当触发某些Event时,做一些具体的操作
常见的比如。Job执行异常事件 EVENT_JOB_ERROR。Job执行时间错过事件 EVENT_JOB_MISSED。
目前APScheduler定义的Event
EVENT_SCHEDULER_STARTED
EVENT_SCHEDULER_START
EVENT_SCHEDULER_SHUTDOWN
EVENT_SCHEDULER_PAUSED
EVENT_SCHEDULER_RESUMED
EVENT_EXECUTOR_ADDED
EVENT_EXECUTOR_REMOVED
EVENT_JOBSTORE_ADDED
EVENT_JOBSTORE_REMOVED
EVENT_ALL_JOBS_REMOVED
EVENT_JOB_ADDED
EVENT_JOB_REMOVED
EVENT_JOB_MODIFIED
EVENT_JOB_EXECUTED
EVENT_JOB_ERROR
EVENT_JOB_MISSED
EVENT_JOB_SUBMITTED
EVENT_JOB_MAX_INSTANCES
Listener表示用户自定义监听的一些Event,当Job触发了EVENT_JOB_MISSED事件时
可以根据需求做一些其他处理。
Scheduler是APScheduler的核心,所有相关组件通过其定义。scheduler启动之后,将开始按照配置的任务进行调度。
除了依据所有定义Job的trigger生成的将要调度时间唤醒调度之外。当发生Job信息变更时也会触发调度。
scheduler可根据自身的需求选择不同的组件,如果是使用AsyncIO则选择AsyncIOScheduler,使用tornado则
选择TornadoScheduler。
目前APScheduler支持的Scheduler:
AsyncIOScheduler
BackgroundScheduler
BlockingScheduler
GeventScheduler
QtScheduler
TornadoScheduler
TwistedScheduler
这里重点挑选两个重要的流程画一个简陋的流程图,来看一下scheduler的工作原理。其一个是添加add job,另一是scheduler每次唤醒调度时的执行过程
AsyncIO调度示例
import asyncio import datetime from apscheduler.events import EVENT_JOB_EXECUTED from apscheduler.executors.asyncio import AsyncIOExecutor from apscheduler.jobstores.redis import RedisJobStore # 需要安装redis from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.interval import IntervalTrigger from apscheduler.triggers.cron import CronTrigger # 定义jobstore 使用redis 存储job信息 default_redis_jobstore = RedisJobStore( db=2, jobs_key="apschedulers.default_jobs", run_times_key="apschedulers.default_run_times", host="127.0.0.1", port=6379, password="test" ) # 定义executor 使用asyncio是的调度执行规则 first_executor = AsyncIOExecutor() # 初始化scheduler时,可以直接指定jobstore和executor init_scheduler_options = { "jobstores": { # first 为 jobstore的名字,在创建Job时直接直接此名字即可 "default": default_redis_jobstore }, "executors": { # first 为 executor 的名字,在创建Job时直接直接此名字,执行时则会使用此executor执行 "first": first_executor }, # 创建job时的默认参数 "job_defaults": { 'coalesce': False, # 是否合并执行 'max_instances': 1 # 最大实例数 } } # 创建scheduler scheduler = AsyncIOScheduler(**init_scheduler_options) # 启动调度 scheduler.start() second_redis_jobstore = RedisJobStore( db=2, jobs_key="apschedulers.second_jobs", run_times_key="apschedulers.second_run_times", host="127.0.0.1", port=6379, password="test" ) scheduler.add_jobstore(second_redis_jobstore, 'second') # 定义executor 使用asyncio是的调度执行规则 second_executor = AsyncIOExecutor() scheduler.add_executor(second_executor, "second") # *********** 关于 APScheduler中有关Event相关使用示例 ************* # 定义函数监听事件 def job_execute(event): """ 监听事件处理 :param event: :return: """ print( "job执行job:\ncode => {}\njob.id => {}\njobstore=>{}".format( event.code, event.job_id, event.jobstore )) # 给EVENT_JOB_EXECUTED[执行完成job事件]添加回调,这里就是每次Job执行完成了我们就输出一些信息 scheduler.add_listener(job_execute, EVENT_JOB_EXECUTED) # *********** 关于 APScheduler中有关Job使用示例 ************* # 使用的是asyncio,所以job执行的函数可以是一个协程,也可以是一个普通函数,AsyncIOExecutor会根据配置的函数来进行调度, # 如果是协程则会直接丢入到loop中,如果是普通函数则会启用线程处理 # 我们定义两个函数来看看执行的结果 def interval_func(message): print("现在时间: {}".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))) print("我是普通函数") print(message) async def async_func(message): print("现在时间: {}".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))) print("我是协程") print(message) # 将上述的两个函数按照不同的方式创造触发器来执行 # *********** 关于 APScheduler中有关Trigger使用示例 ************* # 使用Trigger有两种方式,一种是用类创建使用,另一个是使用字符串的方式 # 使用字符串指定别名, scheduler初始化时已将其定义的trigger加载,所以指定字符串可以直接使用 if scheduler.get_job("interval_func_test", "default"): # 存在的话,先删除 scheduler.remove_job("interval_func_test", "default") # 立马开始 2分钟后结束, 每10s执行一次 存储到first jobstore second执行 scheduler.add_job(interval_func, "interval", args=["我是10s执行一次,存放在jobstore default, executor default"], seconds=10, id="interval_func_test", jobstore="default", executor="default", start_date=datetime.datetime.now(), end_date=datetime.datetime.now() + datetime.timedelta(seconds=240)) # 先创建tigger trigger = IntervalTrigger(seconds=5) if scheduler.get_job("interval_func_test_2", "second"): # 存在的话,先删除 scheduler.remove_job("interval_func_test_2", "second") # 每隔5s执行一次 scheduler.add_job(async_func, trigger, args=["我是每隔5s执行一次,存放在jobstore second, executor = second"], id="interval_func_test_2", jobstore="second", executor="second") # 使用协程的函数执行,且使用cron的方式配置触发器 if scheduler.get_job("cron_func_test", "default"): # 存在的话,先删除 scheduler.remove_job("cron_func_test", "default") # 立马开始 每10s执行一次 scheduler.add_job(async_func, "cron", args=["我是 每分钟 30s 时执行一次,存放在jobstore default, executor default"], second='30', id="cron_func_test", jobstore="default", executor="default") # 先创建tigger trigger = CronTrigger(second='20,40') if scheduler.get_job("cron_func_test_2", "second"): # 存在的话,先删除 scheduler.remove_job("cron_func_test_2", "second") # 每隔5s执行一次 scheduler.add_job(async_func, trigger, args=["我是每分钟 20s 40s时各执行一次,存放在jobstore second, executor = second"], id="cron_func_test_2", jobstore="second", executor="second") # 使用创建trigger对象直接创建 print("启动: {}".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))) asyncio.get_event_loop().run_forever()
输出结果部分截取
启动之后,每隔5s运行一次的JOB
启动: 2019-12-05 14:13:11
【这部分是定义的协程函数输出的内容】
现在时间: 2019-12-05 14:13:16
我是协程
我是每隔5s执行一次,存放在jobstore second, executor = second
【这部分是监听job执行完成之后的回调输出】
job执行job: code => 4096
job.id => interval_func_test_2
jobstore=>second
在20s和40s时各执行一次的Job
现在时间: 2019-12-05 14:13:20
我是协程
我是每分钟 20s 40s时各执行一次,存放在jobstore second, executor = second
job执行
job: code => 4096
job.id => cron_func_test_2
jobstore=>second
每隔10s执行一次的job
现在时间: 2019-12-05 14:13:21
我是普通函数
我是10s执行一次,存放在jobstore default, executor default
现在时间: 2019-12-05 14:13:21
我是协程
我是每隔5s执行一次,存放在jobstore second, executor = second
job执行job: code => 4096
job.id => interval_func_test
jobstore=>default
job执行
job: code => 4096
job.id => interval_func_test_2
jobstore=>second
每隔5s执行一次的Job
现在时间: 2019-12-05 14:13:26
我是协程
我是每隔5s执行一次,存放在jobstore second, executor = second
job执行
job: code => 4096
job.id => interval_func_test_2
jobstore=>second
每分钟30s时执行一次
现在时间: 2019-12-05 14:13:30
我是协程
我是 每分钟 30s 时执行一次,存放在jobstore default, executor default
job执行
job: code => 4096
job.id => cron_func_test
jobstore=>default
apscheduler的工作原理及用法基本这样。
apscheduler强大的地方是可以集成到tornado,django,flask等框架,也可以单独运行。比如CronTrigger还有更强大的用法,可以参照官网的cron用法
上面例子只列举了一些常规用法,其实还有一些更切合实际的用法,利用APSchedulder的特性,动态的添加Job,暂停Job,删除Job,重启Job等。先按照功能性质定义好不同的函数,然后开发一个web服务。在web服务中动态操作各种Job,可以想象在监控系统中根据需求添加一些任务,岂不美哉。
有时间将这部分做一个例子再来分享。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:mmqy2019@163.com进行举报,并提供相关证据,查实之后,将立刻删除涉嫌侵权内容。
猜你喜欢
这篇文章主要为大家详细介绍了python实现简单的飞机大战游戏,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下<BR>
怎样用Python实现文本的滚动播放?对于文本滚动播放的应用场景有很多,我们经常能在网站的顶部看到,文本滚动播放功能也是比较实用的,下面我们就来看看用Python怎样写文本的滚动播放功能。
这篇文章主要为大家详细介绍了如何利用Python清理重复的文件,文中的示例代码讲解详细,对我们学习Python有一定帮助,需要的可以参考一下
PyWebIO提供了一系列命令式的交互函数来在浏览器上获取用户输入和进行输出,将浏览器变成了一个“富文本终端”,可以用于构建简单的Web应用或基于浏览器的GUI应用。本文将利用PyWebIO制作一个网页版的数据查询器,感兴趣的可以学习一下
这篇文章主要为大家介绍了Python上下文管理器,具有一定的参考价值,感兴趣的小伙伴们可以参考一下,希望能够给你带来帮助<BR>
成为群英会员,开启智能安全云计算之旅
立即注册Copyright © QY Network Company Ltd. All Rights Reserved. 2003-2020 群英 版权所有
增值电信经营许可证 : B1.B2-20140078 粤ICP备09006778号 域名注册商资质 粤 D3.1-20240008