CeleryExecutor is suitable for production use: it delegates task execution to Celery and scales out well across multiple workers. Here RabbitMQ is used as Celery's message broker.
Install Airflow on machine A and machine B:
pip2 install airflow[celery]
pip2 install airflow[rabbitmq]
Note: the latest Celery release at the time (4.0.2) may be incompatible with the RabbitMQ management plugin. When the management UI or the command-line tools display multiple columns, an error like the following is reported:
{error,{exit,{ucs,{bad_utf8_character_code}},
[{xmerl_ucs,from_utf8,1,[{file,"xmerl_ucs.erl"},{line,185}]},
{mochijson2,json_encode_string,2,[]},
{mochijson2,'-json_encode_proplist/2-fun-0-',3,[]},
{lists,foldl,3,[{file,"lists.erl"},{line,1197}]},
{mochijson2,json_encode_proplist,2,[]},
{mochijson2,'-json_encode_array/2-fun-0-',3,[]},
{lists,foldl,3,[{file,"lists.erl"},{line,1197}]},
{mochijson2,json_encode_array,2,[]}]}}
Installing the latest celery 3.x.x release works around this:
pip2 install -U celery==3.1.24
Then edit airflow.cfg on both machines: switch the executor to CeleryExecutor and configure the Celery broker and result backend.
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
#executor = SequentialExecutor
executor = CeleryExecutor
# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
# a sqlalchemy database. Refer to the Celery documentation for more
# information.
#broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
broker_url = amqp://username:password@host:5672/project
# Another key Celery setting
celery_result_backend = db+mysql://user:password@host:port/airflow
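Before starting any Airflow process, it can be worth checking that the broker URL is actually reachable. A minimal sketch using pika (an assumption; any AMQP client such as kombu works just as well), with the same placeholder credentials as broker_url above:

# Sanity-check the AMQP broker URL before pointing Airflow at it.
# Assumes pika is installed (pip2 install pika); credentials are placeholders.
import pika

url = "amqp://username:password@host:5672/project"
connection = pika.BlockingConnection(pika.URLParameters(url))
print("Broker reachable:", connection.is_open)
connection.close()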
Start the worker and the scheduler as daemons:
airflow worker -D
airflow scheduler -D
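To confirm that the workers on both machines have registered with the broker, you can ping them from a Python shell. A sketch assuming celery 3.x and the broker_url configured above:

# Ping the Celery workers through the broker; both machines should answer.
# The broker URL is a placeholder -- use the same value as broker_url in airflow.cfg.
from celery import Celery

app = Celery(broker="amqp://username:password@host:5672/project")
print(app.control.inspect().ping())  # e.g. {'celery@machineA': {'ok': 'pong'}, ...}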
Change the schedule_interval in the Airflow example example_bash_operator to @once:
dag = DAG(
    dag_id='example_bash_operator', default_args=args,
    # schedule_interval='0 0 * * *',
    schedule_interval="@once",
    dagrun_timeout=timedelta(minutes=60))
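For reference, a minimal self-contained version of the modified DAG might look like the sketch below. It assumes the Airflow 1.x API used in this post; the bash commands and start_date are illustrative, and the real example file defines more tasks than shown here.

# Simplified sketch of example_bash_operator.py with schedule_interval set to @once.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

args = {
    'owner': 'airflow',
    'start_date': datetime(2017, 4, 1),  # illustrative
}

dag = DAG(
    dag_id='example_bash_operator', default_args=args,
    schedule_interval="@once",
    dagrun_timeout=timedelta(minutes=60))

run_this_last = BashOperator(task_id='run_this_last', bash_command='echo done', dag=dag)

for i in range(3):
    runme = BashOperator(task_id='runme_' + str(i), bash_command='echo ' + str(i), dag=dag)
    runme.set_downstream(run_this_last)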
Save the file as example_bash_operator.py and upload it to the dags_folder directory on both machine A and machine B:
$ tree
.
`-- airflow
|-- airflow-scheduler.err
|-- airflow-scheduler.log
|-- airflow-scheduler.out
|-- airflow.cfg
|-- dags
| |-- example_bash_operator.py
Trigger the DAG:
airflow trigger_dag example_bash_operator
List the tasks of the DAG:
$ airflow list_tasks example_bash_operator
also_run_this
run_after_loop
run_this_last
runme_0
runme_1
runme_2
The task execution logs on machine A look like this:
$ tree
.
|-- also_run_this
| `-- 2017-04-13T20:42:35
|-- run_this_last
| `-- 2017-04-13T20:42:35
`-- runme_1
`-- 2017-04-13T20:42:35
The task execution logs on machine B look like this:
$ tree
.
|-- run_after_loop
| `-- 2017-04-13T20:42:35
|-- runme_0
| `-- 2017-04-13T20:42:35
`-- runme_2
`-- 2017-04-13T20:42:35
As the log files above show, the six tasks of this DAG were distributed across the two machines, with three tasks executed on each.
By default Airflow writes task logs to local files; it can also store them remotely, configured as follows:
# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
# must supply a remote location URL (starting with either 's3://...' or
# 'gs://...') and an Airflow connection id that provides access to the storage
# location.
remote_base_log_folder =
remote_log_conn_id =
# Use server-side encryption for logs stored in S3
encrypt_s3_logs = False
# DEPRECATED option for remote log storage, use remote_base_log_folder instead!
s3_log_folder =
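For example, to ship task logs to S3 the two values might be filled in as below; the bucket name and connection id are illustrative, and the connection must already be defined in Airflow:
remote_base_log_folder = s3://my-airflow-logs/dags
remote_log_conn_id = my_s3_conn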
Logs can also be collected into Elasticsearch via Logstash.