如果日常工作需求对定时器功能的依赖,
比如:
1、自动化脚本定时采集性能
2、在flask后端服务中,需要定时同步数据
3、定时启动某些程序
基于上述需要或者痛点,需要找一个定时器功能实现.
定时器功能第一时间想到的是linux自带的cron功能.
例子: crontab -e命令,新增一个如下命令,保存即可.
*/1 * * * * ~/pythonProject/dist/PingLocalMac >> ~/Desktop/PingLocalMac.txt
cron也有缺点: 1、不适合复杂的定时任务 2、定时任务修改,需要重启crontab管理 3、定时任务,没有状态存储,也不是知道是否执行了
如果你的需求正好是cron不能实现的,那给你推荐一款python轻量级定时器"apscheduler"
APScheduler是python的一个定时任务调度框架,能实现类似linux下crontab类型的任务,使用起来比较方便。它提供基于固定时间间隔、日期以及crontab配置类似的任务调度。
pip install apscheduler
触发器包含调度逻辑,描述一个任务何时被触发,按日期或按时间间隔或按 cronjob 表达式三种方式触发。每个作业都有它自己的触发器,除了初始配置之外,触发器是完全无状态的。
作业存储器指定了作业被存放的位置,默认情况下作业保存在内存,也可将作业保存在各种数据库中,当作业被存放在数据库中时,它会被序列化,当被重新加载时会反序列化。作业存储器充当保存、加载、更新和查找作业的中间商。在调度器之间不能共享作业存储。
执行器是将指定的作业(调用函数)提交到线程池或进程池中运行,当任务完成时,执行器通知调度器触发相应的事件。
调度器(schedulers):任务调度器,属于控制角色,通过它配置作业存储器、执行器和触发器,添加、修改和删除任务。调度器协调触发器、作业存储器、执行器的运行,通常只有一个调度程序运行在应用程序中,开发人员通常不需要直接处理作业存储器、执行器或触发器,配置作业存储器和执行器是通过调度器来完成的。
启动一个阻塞性脚本,每隔3s打印一次
# -*- coding: utf-8 -*-
# Time: 2018/10/13 19:01:30
# File Name: ex_interval.py
from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingScheduler
def tick():
print('Tick! The time is: %s' % datetime.now())
if __name__ == '__main__':
scheduler = BlockingScheduler()
scheduler.add_job(tick, 'interval', seconds=3)
print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C '))
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
pass
print("#######################")
jobstores = {
#'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite'),
# 本地数据库地址
# 使用内存作为数据库
'default': MemoryJobStore()
}
executors = {
'default': ThreadPoolExecutor(20), # 线程池
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3 # 实力个数
}
定时 cron 任务也非常简单,直接给触发器 trigger 传入 'cron' 即可。hour =19 ,minute =23 这里表示每天的19:23 分执行任务。这里可以填写数字,也可以填写字符串
# -*- coding: utf-8 -*-
# Time: 2018/10/13 19:21:09
# File Name: ex_cron.py
from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingScheduler
def tick():
print('Tick! The time is: %s' % datetime.now())
if __name__ == '__main__':
scheduler = BlockingScheduler()
scheduler.add_job(tick, 'cron', hour=19,minute=23)
print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C '))
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
pass
hour =19 , minute =23
hour ='19', minute ='23'
minute = '*/3' 表示每 5 分钟执行一次
hour ='19-21', minute= '23' 表示 19:23、 20:23、 21:23 各执行一次任务
"""
后台性定时器
"""
import os
import time
import datetime
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import logging
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
def my_job(id='my_job_id'):
print(id, '-->', datetime.datetime.now())
#scheduler.pause() # 暂停
#scheduler.resume() # 恢复
#scheduler.start(paused=True)
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), id)
scheduler = BackgroundScheduler()
scheduler.add_job(my_job, trigger='interval',seconds=1, id='my_job_id_test')
scheduler.start()
jobstores = {
# 'mongo': MongoDBJobStore(),
#'mongo': {'type': 'mongodb'},
#'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite'),
# 使用数据库作为存储器
'default': MemoryJobStore()
}
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
scheduler.remove_job('my_job_id')
# 移除任务
print(scheduler.get_jobs())
# 获取任务列表
print(scheduler.get_job(job_id="my_job_id"))
# 获取任务详情
temp_dict = {"seconds": 5}
temp_trigger = scheduler._create_trigger(trigger='interval', trigger_args=temp_dict)
result = scheduler.modify_job(job_id='my_job_id_test', trigger=temp_trigger)
# 修改任务
result = scheduler.reschedule_job(job_id='my_job_id_test', trigger='interval', seconds=4)
# 修改任务
scheduler.pause()
# 暂停作业
scheduler.resume()
# 恢复作业
def my_listener(event):
if event.exception:
print(event.exception)
# 打印异常信息
print('The job crashed')
else:
print('The job worked'
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有