Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >python:定时任务模块schedul

python:定时任务模块schedul

作者头像
py3study
发布于 2020-01-22 04:13:53
发布于 2020-01-22 04:13:53
1.2K00
代码可运行
举报
文章被收录于专栏:python3python3
运行总次数:0
代码可运行

1.安装

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
pip install schedule

2.文档

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
https://schedule.readthedocs.io/en/stable/faq.html#how-to-execute-jobs-in-parallel

3.官网使用demo

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

4.我的schedule使用demo

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import sys
import time
import schedule
import os
import logging
if not os.path.exists('/var/log/video_download/'):
    os.makedirs('/var/log/video_download')
log = logging.getLogger()
log.setLevel(logging.DEBUG)
fmt = logging.Formatter("%(asctime)s %(pathname)s %(filename)s %(funcName)s %(lineno)s %(levelname)s - %(message)s",
                        "%Y-%m-%d %H:%M:%S")
stream_handler = logging.FileHandler(
    '/var/log/video_download/debug-%s.log' % (time.strftime('%Y-%m-%d', time.localtime(time.time()))))
stream_handler.setLevel(logging.DEBUG)
stream_handler.setFormatter(fmt)
log.addHandler(stream_handler)
def handler():
    print("this is handler")
def main():
    if len(sys.argv) != 2:
        print('python video_data.py hour')
        sys.exit()
    param = sys.argv[1]
    if param == 'hour':
        log.debug("enter main")
        schedule.every().day.at("00:00").do(handler)
        schedule.every().hour.do(handler)
        while True:
            schedule.run_pending()
            time.sleep(1)
    else:
        print("python video_data.py hour")
        sys.exit()


if __name__ == "__main__":
    main()

5.拓展:

  并行执行任务

  (1)默认情况下,schedule按顺序执行所有作业。这背后的原因是很难找到一个让每个人都开心的并行执行模型

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import threading
import time
import schedule


def job():
    print("I'm running on thread %s" % threading.current_thread())


def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()


schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)


while 1:
    schedule.run_pending()
    time.sleep(1)

  (2)如果需要控制线程数,就需要用queue

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import Queue
import time
import threading
import schedule


def job():
    print("I'm working")


def worker_main():
    while 1:
        job_func = jobqueue.get()
        job_func()
        jobqueue.task_done()

jobqueue = Queue.Queue()

schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)

worker_thread = threading.Thread(target=worker_main)
worker_thread.start()

while 1:
    schedule.run_pending()
    time.sleep(1)

  (3)抛出异常

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import functools

def catch_exceptions(cancel_on_failure=False):
    def catch_exceptions_decorator(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            try:
                return job_func(*args, **kwargs)
            except:
                import traceback
                print(traceback.format_exc())
                if cancel_on_failure:
                    return schedule.CancelJob
        return wrapper
    return catch_exceptions_decorator

@catch_exceptions(cancel_on_failure=True)
def bad_task():
    return 1 / 0

schedule.every(5).minutes.do(bad_task)

  (4)只运行一次

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def job_that_executes_once():
    # Do some work ...
    return schedule.CancelJob

schedule.every().day.at('22:30').do(job_that_executes_once)

  (5)一次取消多个任务

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def greet(name):
    print('Hello {}'.format(name))

schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')

schedule.clear('daily-tasks')

  (6)在任务中加入日志功能

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import functools
import time

import schedule


# This decorator can be applied to
def with_logging(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        print('LOG: Running job "%s"' % func.__name__)
        result = func(*args, **kwargs)
        print('LOG: Job "%s" completed' % func.__name__)
        return result
    return wrapper

@with_logging
def job():
    print('Hello, World.')

schedule.every(3).seconds.do(job)

while 1:
    schedule.run_pending()
    time.sleep(1)

  (7)随机开展工作

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def my_job():
    # This job will execute every 5 to 10 seconds.
    print('Foo')

schedule.every(5).to(10).seconds.do(my_job)

  (8)传参给作业函数

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def greet(name):
    print('Hello', name)

schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019/05/15 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Python 实现定时任务的八种方案!
来源:https://www.biaodianfu.com/python-schedule.html
Python编程与实战
2021/10/12
34.3K0
Python设置定时任务
如果需要用Python实现定时任务,包括多长时间之后执行、每隔多久循环执行、每天的几点执行等,都算定时任务。实现定时任务的方法很多,python自身的库也有多种方式可以实现。其中最简单好用的一个库我感觉是threading库中的Timer。
Cloudox
2021/11/23
2.4K0
Schedule轻量化的定时任务框架
定时任务,在我们实际开发中经常会用到,比如 Linux 的 Corntab,Django 的 Django-celery,Django-corntab 等。但是这些工具和框架总有某些不合适的地方,比如不灵活,笨重等。今天我们就来介绍一款轻量化的定时任务框架 Schedule。
Python研究所
2022/06/17
5730
Schedule轻量化的定时任务框架
python-schedule模块(定时任务)基于官方文档总结
一.模块安装 pip3 install schedule 官方文档 二.常用的使用案例 #基本格式 #创建方法 def func(): print("方法") #创建定时 schedule.every().seconds.do(func) #每秒运行一次 #如果方法需要传参的话do(func,参数1) #开循环 while True: schedule.run_pending() #如果值运行一次schedule.every().day.do(func).run()
小小咸鱼YwY
2020/06/19
10.6K0
Python3学习(八):使用sched
python中有一个轻量级的定时任务调度的库:schedule。他可以完成每分钟,每小时,每天,周几,特定日期的定时任务。因此十分方便我们执行一些轻量级的定时任务。
py3study
2020/01/13
1.1K0
Python中使用定时调度任务(Schedule Jobs)的5种方式
今天构建的大多数应用程序都需要某种方式的调度机制。轮询 API 或数据库、不断检查系统健康状况、将日志存档等是常见的例子。 Kubernetes和Apache Mesos等使用自动伸缩扩容技术(Auto-scaling)的软件需要检查部署的应用程序的状态,为此它们使用定期运行的存活探针(Liveness Probe)。调度任务需要与业务逻辑解耦,因此我们要使用解耦的执行队列,例如Redis队列。
Regan Yue
2021/10/12
2.4K0
python发送邮件(单发/群发)——yagmail模块
一、有关邮件的基本概念 POP3: Post Office Protocol3 的简称,即邮局协议的第3个版本,它规定了怎样将个人计算机连接到 Internet 的邮件服务器和下载电子邮件的电子协议 SMTP: Simple Mail Transfer Protocol , 即简单邮件传输协议 IMAP: Internet Mail Access Protocol , 即交互式邮件存取协议 二、运用yagmail模块发送邮件 1、以163邮箱为例开启POP3/SMTP/IMAP服务,(QQ邮箱也是类似
Elsa_阿尼
2021/07/26
3.1K0
python发送邮件(单发/群发)——yagmail模块
Python3-定时任务四种实现方式
老猫最近做一个小程序开发任务,主要负责后台部分开发;根据项目需求老猫需要实现三个定时任务:
py3study
2020/01/06
2.3K0
解密python实现定时任务的8种方式
定时任务是编程中常见的需求,它可以按照预定的时间表执行特定的任务或操作。在Python中,有多种方法可以实现定时任务。
雷子
2023/12/20
8780
解密python实现定时任务的8种方式
python正则循环提取、替换等操作
获取httpserver 资源,展示在邮件中 #!/bin/bash # coding=utf-8 """ 作者:gaojs 功能: 新增功能: schedule 是一个第三方轻量级的任务调度模块,可以按照秒,分,小时,日期或者自定义事件执行时间。 schedule 允许用户使用简单、人性化的语法以预定的时间间隔定期运行 Python 函数(或其它可调用函数) demo如下: schedule.
懿曲折扇情
2022/08/24
1.6K0
python正则循环提取、替换等操作
绝了!Python定时爬取微博热搜+pyecharts动态图展示
python中有一个轻量级的定时任务调度的库:schedule。他可以完成每分钟,每小时,每天,周几,特定日期的定时任务。因此十分方便我们执行一些轻量级的定时任务。
朱小五
2020/09/24
1.8K1
绝了!Python定时爬取微博热搜+pyecharts动态图展示
自动化运维?看看Python怎样完成自动任务调度⛵
计划任务是几乎每个开发人员都会用到的功能,在服务器上可以用 Cron 作业来进行任务调度,它也是一种稳定的方式。但我们也可以完全程序化,全部使用 Python 来完成调度程序,而且可以有更简单的配置方式。
ShowMeAI
2022/11/08
7510
自动化运维?看看Python怎样完成自动任务调度⛵
python定时任务demo,实现工作日发送群消息
# _*_ coding:utf_8 _*_ import requests import json import schedule import time import datetime num=[
JQ实验室
2022/02/17
3090
Python爬虫定时计划任务的几种常见方法
记得以前的Windows任务定时是可以正常使用的,今天试了下,发现不能正常使用了,任务计划总是挂起。接下来记录下Python爬虫定时任务的几种解决方法。
Python进阶者
2021/03/11
1.8K0
Python爬虫定时计划任务的几种常见方法
别再说学习无趣了,跟你分享这 8 条 Python 奇技淫巧!
这篇文章主要和大家分享一些 Python 不一样的技巧,感受 Python 带给你的乐趣吧。
小小詹同学
2019/08/20
5190
Python 实现定时任务的九种方案
在现代软件开发中,定时任务(Cron Jobs)是确保应用程序按时执行某些操作的重要组成部分。本文将介绍九种在 Python 中实现后台服务定时任务的方案,帮助开发者选择适合自己需求的方法。
Michel_Rolle
2024/09/27
2.8K0
Python 调度相关包的使用
Job store:如果任务调度信息存在内存中,当程序退出后会丢失,可以其他存储器进行持久化存储
dandelion1990
2024/03/09
1690
利用Python和Selenium实现定时任务爬虫
定时爬虫是指能够按照预设的时间周期性地执行网络爬取任务的程序。这种类型的爬虫通常用于需要定期更新数据的场景,比如新闻网站、股票信息等。使用定时爬虫可以减轻人工操作的负担,保证数据的及时性和准确性。
小白学大数据
2024/06/08
4210
使用python获取热搜榜单,每小时发送一次到钉钉群消息
#! coding=utf-8 import requests from bs4 import BeautifulSoup import json import schedule,time def s
JQ实验室
2022/03/21
7580
使用python获取热搜榜单,每小时发送一次到钉钉群消息
Python 模块与包
# Python 模块与包 # 什么叫模块 模块 模块英文为Modules 函数与模块的关系 一个模块中可以包含N多个函数 在Python中一个扩展名为.py的文件就是一个模块 使用模块的好处 方便其他程序和脚本的导入并使用 避免函数名和变量名冲突 提高代码的可维护性 提高代码的可重复性 # 自定义模块 创建模块 新建一个.py文件,名称尽量不要与Python自带的标准模块名称相同 导入模块 import 模块名称 [as别名] from 模块名称 import 函数/变量/类 """
用户9615083
2022/12/25
4510
推荐阅读
相关推荐
Python 实现定时任务的八种方案!
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验