Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >python apschedule安装使用与源码分析

python apschedule安装使用与源码分析

作者头像
用户1225216
发布于 2018-03-05 06:27:30
发布于 2018-03-05 06:27:30
2.6K00
代码可运行
举报
文章被收录于专栏:扎心了老铁扎心了老铁
运行总次数:0
代码可运行

我们的项目中用apschedule作为核心定时调度模块。所以对apschedule进行了一些调查和源码级的分析。

1、为什么选择apschedule?

听信了一句话,apschedule之于python就像是quartz之于java。实际用起来还是不错的。

2、安装

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# pip安装方式
$ pip install apscheduler
# 源码编译方式
$ wget https://pypi.python.org/pypi/APScheduler/#downloads
$ python setup.py install

3、apschedule有四个主要的组件

1)trigger - 触发器

2)job stores - 任务存储(内存memory和持久化persistence)

3)executor - 执行器(实现是基于concurrent.futures的线程池或者进程池)

4)schedulers - 调度器(控制着其他的组件,最常用的是background方式和blocking方式)

先上一个例子

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# -*- coding:utf-8 -*-
import redis
from datetime import datetime, timedelta
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.events import EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR, EVENT_JOB_MISSED
class ScheduleFactory(object):
    def __init__(self):
        if not hasattr(ScheduleFactory, '__scheduler'):
            __scheduler = ScheduleFactory.get_instance()
        self.scheduler = __scheduler

    @staticmethod
    def get_instance():
        pool = redis.ConnectionPool(
            host='10.94.99.56',
            port=6379,
        )
        r = redis.StrictRedis(connection_pool=pool)
        jobstores = {
            'redis': RedisJobStore(2, r),
            'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
        }
        executors = {
            'default': ThreadPoolExecutor(max_workers=30),
            'processpool': ProcessPoolExecutor(max_workers=30)
        }
        job_defaults = {
            'coalesce': False,
            'max_instances': 3
        }
        scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, daemonic=False)
return scheduler

说明:上例中,scheduleFactory被实现为一个单例模式,保证new出的对象全局唯一

4、对scheduler的选择

这里只给出两个场景:

1)BackgroundScheduler:这种方式在创建scheduler的父进程退出后,任务同时停止调度。适用范围:集成在服务中,例如django。

2)BlockingScheduler:这种方式会阻塞住创建shceduler的进程,适用范围:该程序只干调度这一件事情。

选择完调度器之后

1)scheduler.start() 启动调度器

2)scheduler.shutdown() 停止调度器,调用该方法,调度器等到所有执行中的任务执行完成再退出,可以使用wait=False禁用

程序变为如下样子

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
class ScheduleFactory(object):
    def __init__(self):
        if not hasattr(ScheduleFactory, '__scheduler'):
            __scheduler = ScheduleFactory.get_instance()
        self.scheduler = __scheduler

    @staticmethod
    def get_instance():
        pool = redis.ConnectionPool(
            host='10.94.99.56',
            port=6379,
        )
        r = redis.StrictRedis(connection_pool=pool)
        jobstores = {
            'redis': RedisJobStore(2, r),
            'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
        }
        executors = {
            'default': ThreadPoolExecutor(max_workers=30),
            'processpool': ProcessPoolExecutor(max_workers=30)
        }
        job_defaults = {
            'coalesce': False,
            'max_instances': 3
        }
        scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, daemonic=False)
        # scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, daemonic=False)
        return scheduler

    def start(self):
        self.scheduler.start()

    def shutdown(self):
        self.scheduler.shutdown()

5、对jobstores的选择

大的方向有两个:

1)非持久化

可选的stores:MemoryJobStrore

适用于你不会频繁启动和关闭调度器,而且对定时任务丢失批次不敏感。

2)持久化

可选的stores:SQLAlchemyJobStore, RedisJobStore,MongoDBJobStore,ZooKeeperJobStore

适用于你对定时任务丢失批次敏感的情况

jobStores初始化配置的方式是使用一个字典,例如

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
jobstores = {
            'redis': RedisJobStore(2, r),
            'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
        }

key是你配置store的名字,后面在添加任务的使用,可以指定对应的任务使用对应的store,例如这里选用的都是key=default的store。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def add_job(self, job_func, interval, id, job_func_params=None)
    self.scheduler.add_job(job_func, jobstore='default', trigger='interval', seconds=interval, id=id, kwargs=job_func_params, executor='default', next_run_time=next_run_time, misfire_grace_time=30)

6、executor的选择

只说两个,线程池和进程池。默认default是线程池方式。这个数是执行任务的实际并发数,如果你设置的小了而job添加的比较多,可能出现丢失调度的情况。

同时对于python多线程场景,如果是计算密集型任务,实际的并发度达不到配置的数量。所以这个数字要根据具体的要求设置。

一般来说我们设置并发为30,对一般的场景是没有问题的。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
executors = {
            'default': ThreadPoolExecutor(max_workers=30),
            'processpool': ProcessPoolExecutor(max_workers=30)
        }

同样在add_job的时候,我们可以选择对应的执行器

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def add_job(self, job_func, interval, id, job_func_params=None)
    self.scheduler.add_job(job_func, jobstore='default', trigger='interval', seconds=interval, id=id, kwargs=job_func_params, executor='default', next_run_time=next_run_time, misfire_grace_time=30)

7、trigger的选择

这是最简单的一个了,有三种,不用配置

1、date - 每天的固定时间

2、interval - 间隔多长时间执行

3、cron - 正则

8、job的增删改查接口api可以参看手册

http://apscheduler.readthedocs.io/en/latest/userguide.html#choosing-the-right-scheduler-job-store-s-executor-s-and-trigger-s

9、问题fix

1)2017-07-24 14:06:28,480 [apscheduler.executors.default:120] [WARNING]- Run time of job "etl_func (trigger: interval[0:01:00], next run at: 2017-07-24 14:07:27 CST)" was missed by 0:00:01.245424

这个问题对应的源码片段是

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def run_job(job, jobstore_alias, run_times, logger_name):
    """
    Called by executors to run the job. Returns a list of scheduler events to be dispatched by the
    scheduler.

    """
    events = []
    logger = logging.getLogger(logger_name)
    for run_time in run_times:
        # See if the job missed its run time window, and handle
        # possible misfires accordingly
        if job.misfire_grace_time is not None:
            difference = datetime.now(utc) - run_time
            grace_time = timedelta(seconds=job.misfire_grace_time)
            if difference > grace_time:
                events.append(JobExecutionEvent(EVENT_JOB_MISSED, job.id, jobstore_alias,
                                                run_time))
                logger.warning('Run time of job "%s" was missed by %s', job, difference)
                continue

        logger.info('Running job "%s" (scheduled at %s)', job, run_time)
        try:
            retval = job.func(*job.args, **job.kwargs)
        except:
            exc, tb = sys.exc_info()[1:]
            formatted_tb = ''.join(format_tb(tb))
            events.append(JobExecutionEvent(EVENT_JOB_ERROR, job.id, jobstore_alias, run_time,
                                            exception=exc, traceback=formatted_tb))
            logger.exception('Job "%s" raised an exception', job)
        else:
            events.append(JobExecutionEvent(EVENT_JOB_EXECUTED, job.id, jobstore_alias, run_time,
                                            retval=retval))
            logger.info('Job "%s" executed successfully', job)

    return events

这里面有个参数是misfire_grace_time,默认是1s,如果任务的实际执行时间与任务调度时间的时间差>misfire_grace_time,就会warning并且跳过这次任务的调度!!!

为什么会发生这个问题?

1)executor并发度不够,你添加的任务太多

2) misfire_grace_time,还是太小了

2)如果你使用的trigger=interval,并且设置了misfire_grace_time=30这种的话,如果你首次启动的时间是10:50那么调度间隔和实际执行可能有1分钟的误差

怎么解决这个问题呢,你可以通过next_run_time设置首次调度的时间,让这个时间取整分钟。例如

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def add_job(self, job_func, interval, id, job_func_params=None):
        next_minute = (datetime.now() + timedelta(minutes=1)).strftime("%Y-%m-%d %H:%M")
        next_run_time = datetime.strptime(next_minute, "%Y-%m-%d %H:%M")
        self.scheduler.add_job(job_func, jobstore='default', trigger='interval', seconds=interval, id=id, kwargs=job_func_params, executor='default', next_run_time=next_run_time, misfire_grace_time=30)

3)2017-07-25 11:02:00,003 [apscheduler.scheduler:962] [WARNING]- Execution of job "rule_func (trigger: interval[0:01:00], next run at: 2017-07-25 11:02:00 CST)" skipped: maximum number of running instances reached (1)

对应的源码为

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
         for job in due_jobs:
                    # Look up the job's executor
                    try:
                        executor = self._lookup_executor(job.executor)
                    except:
                        self._logger.error(
                            'Executor lookup ("%s") failed for job "%s" -- removing it from the '
                            'job store', job.executor, job)
                        self.remove_job(job.id, jobstore_alias)
                        continue

                    run_times = job._get_run_times(now)
                    run_times = run_times[-1:] if run_times and job.coalesce else run_times
                    if run_times:
                        try:
                            executor.submit_job(job, run_times)
                        except MaxInstancesReachedError:
                            self._logger.warning(
                                'Execution of job "%s" skipped: maximum number of running '
                                'instances reached (%d)', job, job.max_instances)
                            event = JobSubmissionEvent(EVENT_JOB_MAX_INSTANCES, job.id,
                                                       jobstore_alias, run_times)
                            events.append(event)
                       

submit_job的源码

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    with self._lock:
            if self._instances[job.id] >= job.max_instances:
                raise MaxInstancesReachedError(job)

            self._do_submit_job(job, run_times)
            self._instances[job.id] += 1
代码语言:javascript
代码运行次数:0
运行
复制

这是什么意思呢,当对一个job的一次调度的任务数>max_instances,会触发这个异常,并终止调度。例如对一个批次的调度,比如job1,在10:00这次的调度,执行的时候发现有两个任务被添加了。这怎么会发生呢?会。可能09:59分的调度没有成功执行,但是持久化了下来,那么在10:00会尝试再次执行。

max_instances默认是1,如果想让这种异常放过的话,你可以设置max_instances大一些,比如max_instances=3

10、如果你想监控你的调度,那么apschedule提供了listener机制,可以监听一些异常。只需要注册监听者就好

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
  def add_err_listener(self):
        self.scheduler.add_listener(err_listener, EVENT_JOB_MAX_INSTANCES|EVENT_JOB_MISSED|EVENT_JOB_ERROR)

def err_listener(ev):
    msg = ''
    if ev.code == EVENT_JOB_ERROR:
        msg = ev.traceback
    elif ev.code == EVENT_JOB_MISSED:
        msg = 'missed job, job_id:%s, schedule_run_time:%s' % (ev.job_id, ev.scheduled_run_time)
    elif ev.code == EVENT_JOB_MAX_INSTANCES:
        msg = 'reached maximum of running instances, job_id:%s' %(ev.job_id)
    rs = RobotSender()
    rs.send(
        "https://oapi.dingtalk.com/robot/send?access_token=499ca69a2b45402c00503acea611a6ae6a2f1bacb0ca4d33365595d768bb2a58",
        u"[apscheduler调度异常] 异常信息:%s" % (msg),
        '15210885002',
        False
    )

最后的代码

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# -*- coding:utf-8 -*-
import redis
from datetime import datetime, timedelta
from apscheduler.schedulers.background import BackgroundScheduler, BlockingScheduler
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.events import EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR, EVENT_JOB_MISSED
from alarmkits.send_robot import RobotSender


class ScheduleFactory(object):
    def __init__(self):
        if not hasattr(ScheduleFactory, '__scheduler'):
            __scheduler = ScheduleFactory.get_instance()
        self.scheduler = __scheduler

    @staticmethod
    def get_instance():
        pool = redis.ConnectionPool(
            host='10.94.99.56',
            port=6379,
        )
        r = redis.StrictRedis(connection_pool=pool)
        jobstores = {
            'redis': RedisJobStore(2, r),
            'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
        }
        executors = {
            'default': ThreadPoolExecutor(max_workers=30),
            'processpool': ProcessPoolExecutor(max_workers=30)
        }
        job_defaults = {
            'coalesce': False,
            'max_instances': 3
        }
        scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, daemonic=False)
        # scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, daemonic=False)
        return scheduler

    def start(self):
        self.scheduler.start()

    def shutdown(self):
        self.scheduler.shutdown()

    def add_job(self, job_func, interval, id, job_func_params=None):
        next_minute = (datetime.now() + timedelta(minutes=1)).strftime("%Y-%m-%d %H:%M")
        next_run_time = datetime.strptime(next_minute, "%Y-%m-%d %H:%M")
        self.scheduler.add_job(
                job_func,
                jobstore='default',
                trigger='interval',
                seconds=interval,
                id=id,
                kwargs=job_func_params,
                executor='default',
                next_run_time=next_run_time,
                misfire_grace_time=30,
                max_instances=3
        )

    def remove_job(self, id):
        self.scheduler.remove_job(id)

    def modify_job(self, id, interval):
        self.scheduler.modify_job(job_id=id, seconds=interval)

    def add_err_listener(self):
        self.scheduler.add_listener(err_listener, EVENT_JOB_MAX_INSTANCES|EVENT_JOB_MISSED|EVENT_JOB_ERROR)

def err_listener(ev):
    msg = ''
    if ev.code == EVENT_JOB_ERROR:
        msg = ev.traceback
    elif ev.code == EVENT_JOB_MISSED:
        msg = 'missed job, job_id:%s, schedule_run_time:%s' % (ev.job_id, ev.scheduled_run_time)
    elif ev.code == EVENT_JOB_MAX_INSTANCES:
        msg = 'reached maximum of running instances, job_id:%s' %(ev.job_id)
    rs = RobotSender()
    rs.send(
        "https://oapi.dingtalk.com/robot/send?access_token=499ca69a2b45402c00503acea611a6ae6a2f1bacb0ca4d33365595d768bb2a58",
        u"[apscheduler调度异常] 异常信息:%s" % (msg),
        '15210885002',
        False
    )
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2017-07-25 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
python定时任务最强框架APScheduler详细教程
上次测试女神听了我的建议,已经做好了要给项目添加定时任务的决定了。但是之前提供的四种方式中,她不知道具体选择哪一个。为了和女神更近一步,我把我入行近10年收藏的干货免费拿出来分享给女神,希望女神凌晨2点再找我的时候,不再是因为要给他调程序了。
无涯WuYa
2020/07/15
8.3K0
python定时任务最强框架APScheduler详细教程
【Python】APScheduler简介
APScheduler,全称是_Advanced Python Scheduler_,具体的介绍可以看PyPI或者readthedocs的文档介绍,这篇 blog 主要是翻译User Guide一节的主要内容,不过惯例还是先简单介绍一下这个库特别的地方。
用户5522200
2020/05/11
2.8K0
Python 实现定时任务的八种方案!
来源:https://www.biaodianfu.com/python-schedule.html
Python编程与实战
2021/10/12
34.3K0
Python任务调度模块APSched
官网文档:http://apscheduler.readthedoc... API:http://apscheduler.readthedoc...
py3study
2020/01/06
1.1K0
八种用Python实现定时执行任务的方案,一定有你用得到的!
我们在日常工作中,常常会用到需要周期性执行的任务。 一种方式是采用 Linux 系统自带的 crond 结合命令行实现; 一种方式是直接使用Python; 于是我把常见的Python定时任务实现方法整理了一下,希望对大家有所帮助。
全栈程序员站长
2022/09/07
3K0
八种用Python实现定时执行任务的方案,一定有你用得到的!
轻量级python定时器apscheduler
cron也有缺点: 1、不适合复杂的定时任务 2、定时任务修改,需要重启crontab管理 3、定时任务,没有状态存储,也不是知道是否执行了
测试加
2022/04/27
1.1K0
轻量级python定时器apscheduler
Python基于APScheduler实现定时任务
Python这个语言的优势就在于有丰富的第三方库,既然原生实现有这样那样的缺点,我们可以借助第三方库来实现定时任务。
Steve Wang
2021/12/20
2.6K0
Python定时任务神器-APScheduler
但是这些定时任务库都只是提供了简答的,或者只支持静态的定时任务。而对于需要复杂定时功能,或者动态注册定时任务的场景,则无法满足。
上帝De助手
2019/09/17
3.1K0
Python下定时任务框架APSched
今天准备实现一个功能需要用到定时执行任务,所以就看到了Python的一个定时任务框架APScheduler,试了一下感觉还不错。
py3study
2020/01/08
1.5K0
Python实现定时任务的几种方法
该方法能够执行固定间隔时间的任务,时间间隔由time.sleep()的睡眠时间指定。
Steve Wang
2021/12/20
3.9K0
Python Apscheduler源
最近公司有项目需要使用到定时任务,其定时逻辑类似于linux的Cron,就使用了Apscheduler这个类库。基于公司的业务,需要修改Apshceduler,故而研究了一下Apscheduler的代码。
py3study
2020/01/06
7140
高效定时任务处理:深入学习Python中APScheduler库的奥秘
APScheduler是Python中一个强大的第三方库,用于在后台执行定时任务。它允许我们根据设定的时间间隔、日期规则或特定时间来执行任务,适用于定时执行脚本、定时发送邮件、定时处理数据等场景。APScheduler的功能使得在Python中实现定时任务变得非常简单和高效。本文将从入门到精通地介绍APScheduler库的使用方法,带你掌握在Python中实现定时任务的技巧。
子午Python
2023/08/14
3.3K0
Python 定时任务的实现方式
目前所在的项目组需要经常执行一些定时任务,之前都是用 Node.JS 的 cron来实现 schedule job。可是这次需要连接不同的 DB,而且实现的逻辑也有些许不同,于是选择使用 Python 的定时器。
李振
2021/11/26
1.6K0
Python 定时任务的实现方式
Python 调度相关包的使用
Job store:如果任务调度信息存在内存中,当程序退出后会丢失,可以其他存储器进行持久化存储
dandelion1990
2024/03/09
1640
Python定时任务框架之Apscheduler 案例分享
  前面已经讲过Celery做定时任务的场景,现在分享另一个框架Apscheduler。Apscheduler的全称是Advanced Python Scheduler。它是一个轻量级的 Python 定时任务调度框架。同时,它还支持异步执行、后台执行调度任务。本人小小的建议是一般项目用APScheduler,因为不用像Celery那样再单独启动worker、beat进程,而且API也很简洁。
全栈测试开发日记
2023/02/02
1.8K0
Python定时任务框架之Apscheduler 案例分享
Flask 学习-87.Flask-APScheduler 持久化定时任务保存到mysql数据库
APScheduler 有四种组件,分别是:调度器(scheduler),作业存储(job store),触发器(trigger),执行器(executor)。
上海-悠悠
2023/01/03
2.5K0
Flask 学习-87.Flask-APScheduler 持久化定时任务保存到mysql数据库
Python定时任务(下)
上篇文章,我们了解到有三种办法能实现定时任务,但是都无法做到循环执行定时任务。因此,需要一个能够担当此重任的库。它就是 APScheduler。
猴哥yuri
2018/08/16
2K0
django-apschedule定时任务异常停止
在django项目中使用django-apschedule来实现定时任务,使用的是BackgroundScheduler调度类,该调度的实现是通过后台线程的方式执行定时任务。其中任务都是持久化到数据库中的。
编程黑洞
2023/11/17
5610
Python定时框架 Apscheduler 详解
在我们的日常工作自动化测试当中,几乎超过一半的功能都需要利用定时的任务来推动触发,例如在我们项目中有一个定时监控模块,根据自己设置的频率定时跑测试用例,定时检测是否存在线上紧急任务等等,这些都涉及到了有关定时任务的问题,很多情况下,大多数人会选择window的任务计划程序,但如果程序不在window平台下运行,就不能定时启动了;当然也可利用time模块的time.sleep()方法使程序休眠来达到定时任务的目的,但定时任务多了,代码可能看起来不太那么友好且有很大的局限性,因此,此时的Apscheduler
腾讯移动品质中心TMQ
2019/06/14
1.9K0
Python定时框架 Apscheduler 详解
Python定时任务
1、第一种办法是最简单又最暴力。那就是在一个死循环中,使用线程睡眠函数 sleep()。
周小董
2019/03/25
5.8K0
Python定时任务
相关推荐
python定时任务最强框架APScheduler详细教程
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验