我有一个非常简单的dag,它应该在每周一19:10运行。dag如下: from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash_operator import BashOperator
args = {'owner': 'AirFlow'}
with DAG(dag_id='schedule_test_weekly', default_args=args, schedule_interval=
当我安排DAG在每天的特定时间运行时,DAG根本不会执行。但是,当我重新启动Airflow when服务器和调度程序时,DAG在该特定日期的计划时间执行一次,并且从第二天起不再执行。我使用的是带有python 2.7.6的Airflow版本v1.7.1.3。下面是DAG代码:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
import time
n=time.strftime("%Y,%m
我希望一个进程在完成另一个进程之后启动。一个解决方案是使用外部传感器功能,下面您可以找到我的解决方案。我遇到的问题是依赖的守护进程陷入了戳,我检查了这个并确保这两个dag按照相同的时间表运行,我的简化代码如下所示:任何帮助都将不胜感激。领袖达格:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow
我已经查看了气流subDAG部分,并试图在网上找到任何可能有帮助的东西,但是我没有找到任何详细解释如何使subDAG工作的东西。运行subDAG的要求之一是应该启用它。如何启用/禁用subdag?
我编写了一些示例代码,这些代码在气流中不显示任何错误,但是当我尝试运行它时,subDAG中的任何一个操作符都不会被执行。
这是我的主要后台代码:
import os
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
from airfl
我有一个dag,我想安排一个基于某些来自外部来源的输入的未来日期。
{
"run_id":"run-eventId109bnfghjasdmajjsd1basdasdaaasdsdasdsk2",
"conf":{
"softwareId":"something"
},
"execution_date": "11-03T09:10:30" //could be any future dat
我正在与virtualenv合作。我正在尝试使用DAG文件夹中的包。airflow_home目录的当前状态为:
airflow_home/airflow.cfg
airflow_home/airflow.db
airflow_home/dags/__init__.py
airflow_home/dags/hello_world.py
airflow_home/dags/support/inner.py
airflow_home/dags/support/__init__.py
hello_world.py有代码:
from datetime import datetime
from air
我似乎不了解如何将模块导入到apache airflow DAG定义文件中。例如,我想这样做是为了能够创建一个库,使具有类似设置的声明任务不那么繁琐。 这是我能想到的重复这个问题的最简单的例子:我修改了airflow教程(https://airflow.apache.org/tutorial.html#recap),只需导入一个模块并从该模块运行一个定义。如下所示: 目录结构: - dags/
-- __init__.py
-- lib.py
-- tutorial.py tutorial.py: """
Code that goes along with the A
我计划从今天( 2020/08/11 )开始,从周二到周六东部标准时间(NY)上午04:00运行DAG。在编写代码和部署之后,我预计DAG会被触发。我刷新了几次我的气流用户界面页面,但它还没有触发。我正在使用气流版本v1.10.9-composer与python 3。
这是我的DAG代码:
"""
This DAG executes a retrieval job
"""
# Required packages to execute DAG
from __future__ import print_function
import pendul
我试图在(气流)中添加一个自定义操作符,但是它似乎找不到操作符。我在这方面花了相当长的时间,并尝试过:
我已经修改了示例中的代码,以尝试获取操作符。
达格/我的爸爸。
import datetime
from airflow import DAG
# from airflow.models import Variable
# from airflow.operators import StopInstanceOperator
from airflow.operators.my_operator import StopInstanceOperator
# [END da
我正在研究Airflow文档,以便更好地了解它的调度机制。我在下面遇到了一个例子。 文件中提到,当调度人员在2016-01-02上午6点选择DAG时,将创建一次DAG运行,execution_date为2016-01-01,下一次运行将在2016-01-03凌晨创建,执行日期为2016-01-02。 调度间隔是按小时提供的,执行日期是指DAG在结束时运行的时间段的开始,那么为什么不是在2016-01-02早上6点调度器挑选DAG的前一个小时呢? """
Code that goes along with the Airflow tutorial located at:
我正在使用Airflow 设置交叉依赖的DAGS。我有一个特定的使用案例,其中我的DAG B要求DAG A首先运行-但是,如果DAG A延迟足够长时间,DAG B仍应运行。因此,我本质上是在寻找一种方法来连接两个传感器之间的OR操作。
假设DAG B需要每天下午5点运行,那么我将在代码中这样做:
while True:
CURRENT_TIME = getCurrentTime()
if DAG A completed OR CURRENT_TIME > 5pm:
run DAG B
这在代码中要简单得多,但是看不到Airflow是如何做到的。
我在计划中为远程ssh上的run命令创建了这个dag。
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from airflow.contrib.hooks.ssh_hook import SSHHook as sscon
from airflow.contrib.operators.ssh_operator import SSHOpera
由于某些原因,宏'{{ ds }}‘不递增,执行日期递增,但它不适用于宏出于某种原因,我尝试了一个干净的新dag id,但仍然没有帮助。你知道吗?
from airflow import DAG
from datetime import datetime,timedelta
from airflow.operators.postgres_operator import PostgresOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
import sys
sys.path.inser
同事们我们需要帮助。有两个dags --父级和子级,父级有自己的调度,假设'30 *** ',子'1 -17***-5‘,子节点等待父级执行,例如40分钟,如果父级以错误结束,那么子类也会崩溃,否则将执行子类的下一个任务。问题是,即使在最简单的情况下,这也不起作用,我不知道如何同步它们。我写了这样的代码:
Dag亲本
import time
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_op
我们最近尝试采用气流作为我们的“数据工作流”引擎,虽然我已经完成了大部分工作,但我仍然处在调度器如何计算何时触发DAG的灰色区域。
看看这个简单的dag:
from airflow import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator
dag_options = {
'owner': 'Airflow',
&
我每天晚上7:42有一个dag,然而,当我手动触发dag时,它会运行,而不是自动运行。
我正在使用官方的yaml文件和运行容器,默认情况下已经将时区设置为IST,仍然不能自动触发dags
代码如下:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.providers.sftp.operators.sftp import SFTPOpera
由于某些原因,气流似乎不会触发每周时间表间隔的达格的最新运行。
目前日期:
$ date
$ Tue Aug 9 17:09:55 UTC 2016
达格:
from datetime import datetime
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
dag = DAG(
dag_id='superdag',
start_date=datetime(2016, 7, 1
我是Airflow的新手,但我已经定义了一个Dag,每天上午9点发送基本的电子邮件我的DAG如下: from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.dates import days_ago
date_log = str(datetime.to
在visual代码中运行气流DAG代码时出错。
误差
ImportError: cannot import name 'DummyOperator' from 'airflow.operators' (c:\Users\10679196\AppData\Local\Programs\Python\Python38\lib\site-packages\airflow\operators\__init__.py)
导入报表
from airflow import DAG
from airflow.operators import DummyOperator
版本
a
上下文:我成功地在sql_alchemy_conn上安装了气流,将executor更改为LocalExecutor,将EC2更改为postgresql+psycopg2://postgres@localhost:5432/airflow;max_threads为10。
我的问题是,当我创建一个每天都要运行的守护进程时,一切都很好,但是当我创建一个dag,它将在周一和周三上午10点运行时,气流不会运行它。有人知道我做错了什么吗?我应该怎么做才能解决这个问题?
,运行良好且正确:
import airflow
from airflow import DAG
from airflow.operato
我们希望在Dag中的Dag触发器期间从UI读取cli输入传递给dag。我尝试了下面的代码,但它不起作用。在这里,我将输入传递为{“kpi”:“ID123”},并希望在函数get_data_from_bq中打印此ip值。
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from airflow import models
from airflow.models import Variabl