Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >将多个cron表达式结合起来的气流时间表?

将多个cron表达式结合起来的气流时间表?
EN

Stack Overflow用户
提问于 2022-06-02 07:12:59
回答 1查看 736关注 0票数 1

我有几个cron表达式,需要应用到一个DAG中。没有办法用一个cron表达式来表达它们。

气流2.2引入了Timetable。是否有一个包含cron表达式列表的实现?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-06-03 08:09:53

我在找同样的东西,但什么也没找到。如果有标准的气流,那就太好了。

这是我为气流2.2.5编写的0.1版本。

代码语言:javascript
运行
AI代码解释
复制
# This file is <airflow plugins directory>/timetable.py

from typing import Any, Dict, List, Optional
import pendulum
from croniter import croniter
from pendulum import DateTime, Duration, timezone, instance as pendulum_instance
from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from airflow.exceptions import AirflowTimetableInvalid


class MultiCronTimetable(Timetable):
    valid_units = ['minutes', 'hours', 'days']

    def __init__(self,
                 cron_defs: List[str],
                 timezone: str = 'Europe/Berlin',
                 period_length: int = 0,
                 period_unit: str = 'hours'):

        self.cron_defs = cron_defs
        self.timezone = timezone
        self.period_length = period_length
        self.period_unit = period_unit

    def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
        """
        Determines date interval for manually triggered runs.
        This is simply (now - period) to now.
        """
        end = run_after
        if self.period_length == 0:
            start = end
        else:
            start = self.data_period_start(end)
        return DataInterval(start=start, end=end)

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: Optional[DataInterval],
        restriction: TimeRestriction) -> Optional[DagRunInfo]:
        """
        Determines when the DAG should be scheduled.

        """

        if restriction.earliest is None:
            # No start_date. Don't schedule.
            return None

        is_first_run = last_automated_data_interval is None

        if is_first_run:
            if restriction.catchup:
                scheduled_time = self.next_scheduled_run_time(restriction.earliest)

            else:
                scheduled_time = self.previous_scheduled_run_time()
                if scheduled_time is None:
                    # No previous cron time matched. Find one in the future.
                    scheduled_time = self.next_scheduled_run_time()
        else:
            last_scheduled_time = last_automated_data_interval.end

            if restriction.catchup:
                scheduled_time = self.next_scheduled_run_time(last_scheduled_time)

            else:
                scheduled_time = self.previous_scheduled_run_time()

                if scheduled_time is None or scheduled_time == last_scheduled_time:
                    # No previous cron time matched,
                    # or the matched cron time was the last execution time,
                    scheduled_time = self.next_scheduled_run_time()

                elif scheduled_time > last_scheduled_time:
                    # Matched cron time was after last execution time, but before now.
                    # Use this cron time
                    pass

                else:
                    # The last execution time is after the most recent matching cron time.
                    # Next scheduled run will be in the future
                    scheduled_time = self.next_scheduled_run_time()

        if scheduled_time is None:
            return None

        if restriction.latest is not None and scheduled_time > restriction.latest:
            # Over the DAG's scheduled end; don't schedule.
            return None

        start = self.data_period_start(scheduled_time)
        return DagRunInfo(run_after=scheduled_time, data_interval=DataInterval(start=start, end=scheduled_time))

    def data_period_start(self, period_end: DateTime):
        return period_end - Duration(**{self.period_unit: self.period_length})

    def croniter_values(self, base_datetime=None):
        if not base_datetime:
            tz = timezone(self.timezone)
            base_datetime = pendulum.now(tz)

        return [croniter(expr, base_datetime) for expr in self.cron_defs]

    def next_scheduled_run_time(self, base_datetime: DateTime = None):
        min_date = None
        tz = timezone(self.timezone)
        if base_datetime:
            base_datetime_localized = base_datetime.in_timezone(tz)
        else:
            base_datetime_localized = pendulum.now(tz)

        for cron in self.croniter_values(base_datetime_localized):
            next_date = cron.get_next(DateTime)
            if not min_date:
                min_date = next_date
            else:
                min_date = min(min_date, next_date)
        if min_date is None:
            return None
        return pendulum_instance(min_date)

    def previous_scheduled_run_time(self, base_datetime: DateTime = None):
        """
        Get the most recent time in the past that matches one of the cron schedules
        """
        max_date = None
        tz = timezone(self.timezone)
        if base_datetime:
            base_datetime_localized = base_datetime.in_timezone(tz)
        else:
            base_datetime_localized = pendulum.now(tz)

        for cron in self.croniter_values(base_datetime_localized):
            prev_date = cron.get_prev(DateTime)
            if not max_date:
                max_date = prev_date
            else:
                max_date = max(max_date, prev_date)
        if max_date is None:
            return None
        return pendulum_instance(max_date)


    def validate(self) -> None:
        if not self.cron_defs:
            raise AirflowTimetableInvalid("At least one cron definition must be present")

        if self.period_unit not in self.valid_units:
            raise AirflowTimetableInvalid(f'period_unit must be one of {self.valid_units}')

        if self.period_length < 0:
            raise AirflowTimetableInvalid(f'period_length must not be less than zero')

        try:
            self.croniter_values()
        except Exception as e:
            raise AirflowTimetableInvalid(str(e))

    @property
    def summary(self) -> str:
        """A short summary for the timetable.

        This is used to display the timetable in the web UI. A cron expression
        timetable, for example, can use this to display the expression.
        """
        return ' || '.join(self.cron_defs) + f' [TZ: {self.timezone}]'

    def serialize(self) -> Dict[str, Any]:
        """Serialize the timetable for JSON encoding.

        This is called during DAG serialization to store timetable information
        in the database. This should return a JSON-serializable dict that will
        be fed into ``deserialize`` when the DAG is deserialized.
        """
        return dict(cron_defs=self.cron_defs,
                    timezone=self.timezone,
                    period_length=self.period_length,
                    period_unit=self.period_unit)

    @classmethod
    def deserialize(cls, data: Dict[str, Any]) -> "MultiCronTimetable":
        """Deserialize a timetable from data.

        This is called when a serialized DAG is deserialized. ``data`` will be
        whatever was returned by ``serialize`` during DAG serialization.
        """
        return cls(**data)


class CustomTimetablePlugin(AirflowPlugin):
    name = "custom_timetable_plugin"
    timetables = [MultiCronTimetable]

要使用它,您可以提供cron表达式列表,还可以提供时区字符串,还可以提供句点长度和句点单位。

对于我的用例,我实际上不需要周期长度+单位,它用于确定DAG的data_interval。如果您的DAG不关心data_interval,则可以将它们保留在0分钟的默认值中。

我试着模仿标准的schedule_interval行为。例如,如果自上次运行以来,catchup = False和DAG可能已被多次触发(例如,由于任何原因,DAG运行的时间比预期的要长,或者调度程序没有运行,或者是DAG的第一次调度),那么DAG将被调度为运行最近的前一次匹配时间。

我还没有在catchup = True上真正测试过它,但理论上它会运行自DAG的start_date以来的每一个匹配的cron时间(但是每隔一段时间只运行一次,例如,对于*/30 * * * *0 * * * *,DAG将每小时运行两次,而不是三次)。

示例DAG文件:

代码语言:javascript
运行
AI代码解释
复制
from time import sleep
import airflow
from airflow.operators.python import PythonOperator
import pendulum
from timetable import MultiCronTimetable

def sleepy_op():
    sleep(660)


with airflow.DAG(
        dag_id='timetable_test',
        start_date=pendulum.datetime(2022, 6, 2, tz=pendulum.timezone('America/New_York')),
        timetable=MultiCronTimetable(['*/5 * * * *', '*/3 * * * fri,sat', '1 12 3 * *'], timezone='America/New_York', period_length=10, period_unit='minutes'),
        catchup=False,
        max_active_runs=1) as dag:

    sleepy = PythonOperator(
        task_id='sleepy',
        python_callable=sleepy_op
    )
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72478492

复制
相关文章
传统@ServerEndpoint方式开发WebSocket应用和SpringBoot构建WebSocket应用程序
通过websocket的两种使用方式,让你更加深入理解用法。很多人不懂websocket怎么辨别是谁发送过来的,文中说到实现WebSocketConfigurer接口,定义拦截器可以绑定用户信息,还有其他很多,细细品,对比看比较好!
java思维导图
2020/05/20
9.4K0
微服务体系结构——学习、构建和部署应用程序
更好地理解微服务架构,并举例这种架构好处,以及Uber如何将它们的单体应用变成微型服务。
程序你好
2018/07/23
5510
Spring国际认证指南:使用 WebSocket 构建交互式 Web 应用程序
本指南将引导您完成创建“Hello, world”应用程序的过程,该应用程序在浏览器和服务器之间来回发送消息。WebSocket 是 TCP 之上的一个轻量级的薄层。这使得它适合使用“子协议”来嵌入消息。在本指南中,我们使用带有 Spring 的STOMP消息传递来创建交互式 Web 应用程序。STOMP 是在较低级别的 WebSocket 之上运行的子协议。
IT胶囊
2022/04/08
2K0
webSocket
WebSocket介绍、使用 webSocket是一种新的传输协议,HTML5新增的协议,相较于http协议,webSocket更像是一台对讲机,可以实现实时通信,双向传输,即服务器也可以主动发送请求到前端,打破了以往只能前端发送请求,然后服务器被动响应的传统方式 node.js socket.io socket.io是一个node.js平台上的webSocket封装框架,使用难度比较容易 在使用前必须先安装: npm|cnpm i socket.io -D 语法格式: /* socket.emit('na
jinghong
2020/05/09
2.1K0
websocket
拉的方式比较耗费资源,因为http是无状态且单向的通讯协议,后端无法主动xia向前端发送信息,一般拉为前端不间断的向服务端发送http请求,这种方式前端和后端都比较头疼。没有特殊需求的话,一般使用推的方式。HTML5开始提供websocket解决方式,基于TCP实现客户端与服务端全双工通信。websocket只使用了一个连接,避免了连接的多次建立;且只有连接初次建立比较复杂,后期通信成本较低。
wo.
2021/06/15
1.4K0
WebSocket
WebSocket 对象提供了用于创建和管理 WebSocket 连接,以及可以通过该连接发送和接收数据的 API。
用户1418987
2023/10/16
3230
WebSocket
websocket
短轮询(Polling)的实现思路就是 浏览器端 每隔几秒钟向 服务器端 发送http请求,服务端在收到请求后,不论是否有数据更新,都直接进行响应。 在服务端响应完成,就会关闭这个Tcp连接 ,如下图所示:
用户10106350
2022/10/28
2.7K0
WebSocket
我很高兴地提出报告,JEP-222 从 Jenkins 每周更新版开始落地。此改进为 Jenkins 带来了实验性的 WebSocket 支持,可在连接入站代理程序或运行 CLI 时使用。WebSocket 协议允许通过 HTTP(S)端口进行双向交互式通信.
LinuxSuRen
2020/02/25
2.2K0
WebSocket
WebSocket 是 HTML5 开始提供的一种在单个 TCP 连接上进行全双工通讯的协议。
星哥玩云
2022/09/14
1.3K0
WebSocket
websocket
这时启动django项目会报错CommandError: You have not set ASGI_APPLICATION, which is needed to run the server.
GH
2020/03/19
2.9K0
WebSocket
HTTP:HTTP是应用层协议(在传输层使用 TCP,在网络层使用 IP 协议),是一种无状态(即每个请求都是一个新的请求)、无连接(每次连接只处理一个请求)协议,但是HTTP仍然可以借助Cookie(客户端数据储存)和Session(服务端数据储存)实现长连接(HTTP的长连接需要在请求头中加入Connection:keep-alive )。整个通信过程,客户端(浏览器)向服务端发送请求,服务端接收请求并返回响应信息。
田小檬
2023/08/24
3110
WebSocket
WebSocket
HTTP:HTTP是应用层协议(在传输层使用 TCP,在网络层使用 IP 协议),是一种无状态(即每个请求都是一个新的请求)、无连接(每次连接只处理一个请求)协议,但是HTTP仍然可以借助Cookie(客户端数据储存)和Session(服务端数据储存)实现长连接(HTTP的长连接需要在请求头中加入Connection:keep-alive )。整个通信过程,客户端(浏览器)向服务端发送请求,服务端接收请求并返回响应信息。
十玖八柒
2022/08/01
1.5K0
WebSocket
websocket
websocket是为了解决HTTP协议中的一些问题。因为HTTP是无状态,短连接。
zy010101
2022/07/30
1.3K0
websocket
WebSocket
HTTP协议是一种无状态协议,服务器端本身不具有识别客户端的能力,必须借助外部机制,比如session和cookie,才能与特定客户端保持对话。
奋飛
2019/08/15
1.9K0
Mysql体系结构
客户端连接器 mysql为外部程序提供的客户端connector,例如 PHP JAVA .NET RUBY 连接管理 管理客户端连接的相关操作,例如 连接线程池、权限验证、线程重用、连接限制 SQL层 SQL接口 接收客户端的SQL命令,并返回命令结果 SQL 解析器 SQL命令传递到解析器的时候会被解析器验证和解析 将SQL语句分解成数据结构——分析树,并将这个结构传递到后续步骤,以后SQL语句的传递和处理就是基于这个结构的 如果在解析中遇到错误,那么就说明这个sql语句是不
dys
2018/04/02
1.7K0
Mysql体系结构
MySQL体系结构
连接者:不同语言的代码程序和mysql的交互(SQL交互) 1、连接池 管理、缓冲用户的连接,线程处理等需要缓存的需求 2、管理服务和工具组件 系统管理和控制工具,例如备份恢复、Mysql复制、集群等  3、sql接口 接受用户的SQL命令,并且返回用户需要查询的结果 4、查询解析器 SQL命令传递到解析器的时候会被解析器验证和解析(权限、语法结构) 5、查询优化器 SQL语句在查询之前会使用查询优化器对查询进行优化 select id,name from user where age = 40;  a、这个select 查询先根据where 语句进行选取,而不是先将表全部查询出来以后再进行age过滤  b、这个select查询先根据id和name进行属性投影,而不是将属性全部取出以后再进行过滤  c、将这两个查询条件联接起来生成最终查询结果 6、缓存 如果查询缓存有命中的查询结果,查询语句就可以直接去查询缓存中取数据 7、插入式存储引擎 存储引擎说白了就是如何管理操作数据(存储数据、如何更新、查询数据等)的一种方法。因为在关系数据库 中数据的存储是以表的形式存储的,所以存储引擎也可以称为表类型(即存储和操作此表的类型)
星哥玩云
2022/08/18
5150
MySQL体系结构
CPU体系结构
在微指令架构的 CPU 里面,编译器编译出来的机器码和汇编代码并没有发生什么变化。但在指令译码的阶段,指令译码器“翻译”出来的,不再是某一条 CPU 指令。译码器会把一条机器码,“翻译”成好几条“微指令”。这里的一条条微指令,就不再是 CISC 风格的了,而是变成了固定长度的 RISC 风格的了。
斯武丶风晴
2019/12/16
1.4K0
CPU体系结构
HBase体系结构
HBase的服务器体系结构遵从简单的主从服务器架构,它由HRegion服务器(HRegion Service)群和HBase Master服务器(HBase Master Server)构成。Hbase Master服务器负责管理所有的HRegion服务器,而Hbase中所有的服务器是通过Zookeeper来进行协调,并处理HBase服务器运行期间可能遇到的错误的。
用户3003813
2018/09/06
9250
HBase体系结构
mysql体系结构
2用户名密码验证(通过授权表做的验证数据库一启动,会把授权表加载到内存中 mysql.user mysql.db mysql.table_priv mysql.column_priv)
萧晚歌
2021/11/17
1.1K0
Mysql体系结构
最上层是一些客户端和链接服务,包含本地sock 通信和大多数基于客户端/服务端工具实现的类似于 TCP/IP的通信。主要完成一些类似于连接处理、授权认证、及相关的安全方案。在该层上引入了线程池的概念,为通过认证安全接入的客户端提供线程。同样在该层上可以实现基于SSL的安全链接。服务器也会为安全接入的每个客户端验证它所具有的操作权限。
Devops海洋的渔夫
2022/01/17
6160
Mysql体系结构

相似问题

identityserver3 & MVC (Websocket)&websocket服务器体系结构

12

具有可扩展(多层)体系结构的WebSocket服务器应用程序

13

如何使用WebSocket API创建Spring WebSocket应用程序?

12

如何用简洁的体系结构和整体库来实现WebSocket?

12

Spring 4 WebSocket应用程序

28
添加站长 进交流群

领取专属 10元无门槛券

AI混元助手 在线答疑

扫码加入开发者社群
关注 腾讯云开发者公众号

洞察 腾讯核心技术

剖析业界实践案例

扫码关注腾讯云开发者公众号
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档