数据处理逻辑多,脚本相互依赖强,运维管理监测难,怎么办?!为了解决这些问题,最近比较深入研究Airflow的使用方法,重点参考了官方文档和Data Pipelines with Apache Airflow,特此笔记,跟大家分享共勉。
前面文章我们已经讲到了Airflow的搭建这里主要讲一下Airflow的其他特性。
DAG
配置表中的变量DAG_FOLDER是DAG文件存储的地址,DAG文件是定义任务流的python代码,airflow会定期去查看这些代码,自动加载到系统里面。
DAG是多个脚本处理任务组成的工作流pipeline,概念上包含以下元素
1) 各个脚本任务内容是什么
2) 什么时候开始执行工作流
3) 脚本执行的前后顺序是什么
针对1),通过operator来实现对任务的定义。Operator,翻译成“操作单元”,有很多种形式,可以是一个bash命令,也可以是一个python函数,或者是一个数据库连接任务。Airflow封装了很多operator,开发者基于需要来做二次开发。实际上各种形式的operator都是python语言写的对象。
针对2),在DAG的配置函数中有一个参数schedule_interval,约定被调度的频次,是按照每天、每周或者固定的时间来执行。这个参数,跟start_date开始时间和end_date结束时间(需要某个时间段后不需要执行该任务)配合着用,来约定什么时候跑这个DAG。logical date指的是这个DAG后续预计执行发生的时间。
下图是参数设置为@daily的执行节奏
airflow有事先定义好的参数,例如@daily,@hourly,@weekly等,一般场景下足够使用,如果需要更精细化的定义,可以使用cron-based配置方法、基于次数的方法。
Schedule本质上是一个while true循环,不断检查每个任务的状态,如果其上游任务都跑完,并且当前系统资源足够task slots,就会把该任务变成queued状态,等待executor去具体执行
针对3),使用>>或者<<来定义任务之间的依赖关系,例如start >> [fetch_weather, fetch_sales]意思是,start执行完以后,同时执行fetch_weather和fetch_sales。进一步定义[clean_weather, clean_sales] >> join_datasets,就会形成下面的DAG图。
注意:在图里面的分支,有的时候是都需要执行,有的时候可能两个分支会根据条件选择一个分支执行。这种分支判断(branch)的逻辑,可以在函数里面写,也可以通过brach operator实现。用后者的好处是,可以在DAG里面直观的看到具体执行的是哪个分支。
一般来讲,只有当上游任务“执行成功”时,才会开始执行下游任务。但是除了“执行成功all_success”这个条件以外,还有其他的trigger rule,例如one_success, one_failed(至少一个上游失败),none_failed ,none_skipped
DAG在配置的时候,可以配置同时运行的任务数concurrency,默认是16个。这个16,就是task slot,可以理解为资源,如果资源满了,具备运行条件的task就需要等待。
定义DAG的方式有两种:可以使用with语法,也可以使用修饰函数@dag。
with DAG(
dag_id='example_bash_operator',
schedule_interval='0 0 * * *',
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
dagrun_timeout=datetime.timedelta(minutes=60),
tags=['example', 'example2'],
params={"example_key": "example_value"},
) as dag:
配置DAG的参数:
'depends_on_past': False, | 前置任务成功后或者skip,才能运行 |
---|---|
'email': ['airflow@example.com'], | 警告邮件发件地址 |
'email_on_failure': False, | 失败的时候发邮件 |
'email_on_retry': False, | 任务重新尝试的时候发邮件 |
'retries': 1, | 尝试次数 |
'retry_delay': timedelta(minutes=5), | 尝试之间的间隔 |
'queue': 'bash_queue', | 指定一个 队列 运行该任务,CeleryExecutor用到 |
'pool': 'backfill', | |
'priority_weight': 10, | 任务优先级 |
'end_date': datetime(2016, 1, 1), | 任务计划的截止时间 |
'wait_for_downstream': False, | 如果前一个任务实例的下游任务没有跑完,该任务是否可以跑 |
'sla': timedelta(hours=2), | 如果在规定的时间间隔内任务没有跑完,会发警告 |
'execution_timeout': timedelta(seconds=300), | 如果执行超出所设置的时间,任务被当做失败 |
'on_failure_callback': some_function, | 当任务失败时,调用的函数 |
'on_success_callback': some_other_function, | 当任务成功时,调用的函数 |
'on_retry_callback': another_function, | 当任务重新尝试的时候,调用的函数 |
'sla_miss_callback': yet_another_function, | |
'trigger_rule': 'all_success' | 前置任务的执行状态符合什么条件时,该任务会被启动 |
tags:[‘example’] | 相当于是对DAG的一个分类,方便在前台UI根据tag来进行查询 |
DAG Run是DAG运行一次的对象(记录),记录所包含任务的状态信息。如果所有的任务状态是success或者skipped,就是success;如果任务有failed或者upstream_failed,就是falied。
其中的run_id的前缀会有如下几个
启动DAG,除了根据定时方法,也可以通过CLI命令或者Rest api的方式。在调用的时候可以通过指定dag_run.conf,作为参数让DAG根据不同的参数处理不同的数据。
在定义DAG的时候,有时会使用Edge Labels,可以理解成是虚拟的节点,目的是为了在前端UI更方便看到任务之间的依赖关系(类似注释的方法)。为了提高相同DAG操作的复用性,可以使用subDAG或者Taskgroup。
Operator
在任务流中的具体任务执行中,需要依据一些外部条件,例如之前任务的执行时间、开始时间等。这些“公有变量参数”,我们称为模板参数。airflow利用Jinja templates,实现“公有变量”调用的机制。在bashoprator中引用,例如 {{ execution_date}}就代表一个参数。在前端UI中,点击graph中的具体任务,在点击弹出菜单中rendered tempalate可以看到该参数在具体任务中代表的值。
除了公有变量,如果operator之间要互相传递参数或者中间过程数据,例如一个operator要依赖另一个operator的输出结果进行执行,有以下几个方式
# 从该实例中的xcom里面取 前面任务train_model设置的键值为model_id的值。
model_id = context["task_instance"].xcom_pull(
task_ids="train_model", key="model_id")
BranchDateTimeOperator
cond1 = BranchDateTimeOperator(
task_id='datetime_branch',
follow_task_ids_if_true=['date_in_range'],
follow_task_ids_if_false=['date_outside_range'],
target_upper=pendulum.datetime(2020, 10, 10, 15, 0, 0),
target_lower=pendulum.datetime(2020, 10, 10, 14, 0, 0),
dag=dag,)
BranchDayOfWeekOperator
根据是哪一天来选择跑哪个任务
BranchPythonOperator
根据业务逻辑条件,选择下游的一个task运行
dummy_task_1 = DummyOperator(task_id='branch_true', dag=dag)
dummy_task_2 = DummyOperator(task_id='branch_false', dag=dag)
branch = BranchDayOfWeekOperator(
task_id="make_choice",
follow_task_ids_if_true="branch_true",
follow_task_ids_if_false="branch_false",
week_day="Monday",
)
# Run dummy_task_1 if branch executes on Monday
branch >> [dummy_task_1, dummy_task_2]
4)PythonOperator
用的最广泛的Operator,在airflow1.0的时候,定义pythonOperator会有两部分,一个是operator的申明,一个是python函数。这时候函数传参是需要用到op_args 或者op_kwargs或者templates_dict
def _get_data(output_path, **context):
year, month, day, hour, *_ = context["execution_date"].timetuple() url = (
"https://dumps.wikimedia.org/other/pageviews/"
f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz" )
request.urlretrieve(url, output_path)
get_data = PythonOperator(
task_id="get_data",
python_callable=_get_data,
op_args=["/tmp/wikipageviews.gz"], dag=dag,
)
def _calculate_stats(input_path, output_path):
"""Calculates event statistics."""
Path(output_path).parent.mkdir(exist_ok=True)
events = pd.read_json(input_path)
stats = events.groupby(["date", "user"]).size().reset_index() stats.to_csv(output_path, index=False)
calculate_stats = PythonOperator(
task_id="calculate_stats",
python_callable=_calculate_stats,
op_kwargs={
"input_path": "/data/events.json",
"output_path": "/data/stats.csv",
},
dag=dag, )
在airflow2.0以后,用TaskFlow API以后,传参简单很多,就是当函数参数用即可。但是需要注意的是,这种传参本质上还是通过xcom来实现传递的,必须是可序列号的对象,所以参数必须是python最基本的数据类型,像dataframe就不能作为参数来传递。
@task(task_id="print_the_context")
def print_context(ds=None, **kwargs):
"""Print the Airflow context and ds variable from the context."""
pprint(kwargs)
print(ds)
return 'Whatever you return gets printed in the logs'
5)图之间依赖关系的operator
如果两个任务流之间,存在一些依赖关系。
使用ExternalTaskSensor,根据另一个DAG中的某一个任务的执行情况,例如当负责下载数据的DAG完成以后,这个负责计算指标的DAG才能启动。
child_task1 = ExternalTaskSensor(
task_id="child_task1",
external_dag_id=parent_dag.dag_id,
external_task_id=parent_task.task_id,
timeout=600,
allowed_states=['success'],
failed_states=['failed', 'skipped'],
mode="reschedule",
)
当 任务图的某一个任务的执行状态被清理(clear),其相应影响的另一个图的任务状态也要随着连带被清理,就要用上ExternalTaskMarker。使用TriggerDagRunOperator ,可以让DAG的某一个任务 启动另一个DAG
6)LatestOnlyOperator
LatestOnlyOperator,是为了标识该DAG是不是最新的执行时间,只有在最新的时候才有必要执行下游任务,例如部署模型的任务,只需要在最近一次的时间进行部署即可。
from airflow.operators.latest_only import LatestOnlyOperator
latest_only = LatestOnlyOperator(
task_id="latest_only",
dag=dag,
)
train_model >> latest_only >> deploy_model
7)Sensor
Sensor 是用来判断外部条件是否成熟的感应器,例如判断输入文件是否到位(可以设置一个时间窗口内,例如到某个时间点之前检查文件是否到位),但是sensor很耗费计算资源(设置mode为reschedule可以减少开销,默认是poke),DAG会设置concurrency约定同时最多有多少个任务可以运行,称为task slot。所以一种办法是使用Deferrable Operators。FileSensor,判断是否文件存在了;自定义sensor,继承BaseSensorOperator,通过实现poke函数来实现检查逻辑
8)自定义Operator
#自定义一个从PostgreSQL取数,转移数据到S3的operator
def execute(self, context):
postgres_hook = PostgresHook(postgres_conn_id=self._postgres_conn_id) s3_hook = S3Hook(aws_conn_id=self._s3_conn_id)
results = postgres_hook.get_records(self._query) s3_hook.load_string(
Fetch records from the PostgreSQL database.
string_data=str(results),
bucket_name=self._s3_bucket,
key=self._s3_key,
)
关于dag和operator的相关特性介绍到此,后续会讲述Airflow的集群搭建(从入门到精通三),Dolphinscheduler , Dataworks(阿里云)的调度工具后续也会介绍,敬请期待。。。