I am new to Airflow. I need to write a DAG in which I have to pass a SQL file. The SQL file consists of many queries and uses BigQuery tables. It should be scheduled to run once a day at around 3 AM PST. Which operators do I need to use for this DAG? In the DML there is also a variable named event_current_date that is set to fetch the previous day's data, but it should be parameterized so that the procedure runs for the previous day.
DECLARE current_event_date STRING DEFAULT CONCAT("'",CAST(DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY) AS STRING),"'");
It would be great if someone could give me more insight into how to write the DAG for this, and how to parameterize the event_current_date variable. Does the parameterization need to happen in the DAG file or in the SQL file?
My SQL file contains the following queries:
DECLARE idx, col_cnt, row_cnt, idx_row INT64;
DECLARE col_name, col_flag STRING;
DECLARE cmp_cond,lookup_query, lookup_query_row STRING;
DECLARE col_list ARRAY <STRING>;
DECLARE is_required BOOLEAN;
DECLARE event_names_len, valid_values_len INT64;
DECLARE logic_based_fields STRING; -- this variable is used to hard-code the rules that are not in the lookup table
DECLARE current_event_date STRING DEFAULT CONCAT("'",CAST(DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY) AS STRING),"'");
-- Re-create temp table to get invalid flags fields from base and lookup tables
CREATE OR REPLACE TABLE `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp` AS
SELECT
base.column_name,
base.column_flag,
base.required_field_flag,
base.event_names,
base.valid_values,
base.field_name,
base.__row_number,
DENSE_RANK() OVER(PARTITION BY base.column_name ORDER BY base.__row_number) AS field_rank
FROM
(
SELECT
bc.column_name,
vlk.field_name,
bc.column_flag,
vlk.required_field_flag,
vlk.event_names,
vlk.valid_values,
ROW_NUMBER() OVER() AS __row_number
FROM
(SELECT
column_name as column_flag,
SUBSTR (column_name, 1, INSTR(column_name, 'is_invalid')-2) column_name
FROM
`st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.INFORMATION_SCHEMA.COLUMNS`
WHERE
table_name = 'st_vix_ott_dev_dq_monitoring_base_test'
AND column_name LIKE "%is_invalid%"
) bc
INNER JOIN `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_valid_values_lookup_test` vlk
ON bc.column_name = vlk.field_name
ORDER BY 1
) base
ORDER BY base.__row_number;
--SELECT * FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`;
-- Set control variables
SET col_cnt = (SELECT COUNT (*)
FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`);
SET idx = 1;
SET lookup_query = '';
--build case statements dynamically based on valid values on the lookup table
WHILE idx <= col_cnt DO
SET (col_flag,col_name) = (
SELECT AS STRUCT column_flag,column_name
FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`
WHERE __row_number = idx
);
SET row_cnt = (SELECT count(*)
FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`
WHERE IFNULL(field_name,'') = col_name );
IF row_cnt = 1 THEN
--Check if event_names & valid_values arrays are not empty
SET (event_names_len, valid_values_len, is_required) = (
SELECT AS STRUCT ARRAY_LENGTH(event_names),ARRAY_LENGTH(valid_values), required_field_flag
FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`
WHERE field_name = col_name
);
--Check event_name + field_name is required + field name has invalid values
IF (is_required AND event_names_len > 0 AND valid_values_len > 0) THEN
SET cmp_cond = (
SELECT CONCAT (",CASE WHEN REGEXP_CONTAINS(LOWER(event_name), '^(",LOWER(ARRAY_TO_STRING(event_names,"|")),
")') AND (IFNULL(",col_name,",'') = '' OR LOWER(CAST(", LOWER(col_name), " AS STRING)) NOT IN ('", LOWER(ARRAY_TO_STRING (valid_values, "', '")),
"')) THEN true ELSE false END AS ",col_flag)
FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`
WHERE field_name = col_name
);
SET lookup_query = CONCAT(lookup_query, cmp_cond);
--Check event_name + field_name is required
ELSEIF (is_required AND event_names_len > 0 AND valid_values_len = 0) THEN
SET cmp_cond = (
SELECT CONCAT (",CASE WHEN REGEXP_CONTAINS(LOWER(event_name), '^(",LOWER(ARRAY_TO_STRING(event_names,"|")),")') AND IFNULL("
,col_name,",'') = '' THEN true ELSE false END AS ",col_flag)
FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`
WHERE field_name = col_name
);
SET lookup_query = CONCAT(lookup_query, cmp_cond);
--Check field_name is required
ELSEIF (is_required AND event_names_len = 0 AND valid_values_len = 0) THEN
SET cmp_cond = (
SELECT CONCAT (",CASE WHEN IFNULL(", col_name, ",'') = '' THEN true ELSE false END AS ",col_flag)
FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`
WHERE field_name = col_name
);
SET lookup_query = CONCAT(lookup_query, cmp_cond);
END IF;
-- field_name with multiple rows
ELSEIF row_cnt > 1 THEN
SET idx_row = 1;
SET lookup_query_row = '';
WHILE idx_row <= row_cnt DO
--Check if event_names & valid_values arrays are not empty
SET (event_names_len, valid_values_len, is_required) = (
SELECT AS STRUCT ARRAY_LENGTH(event_names),ARRAY_LENGTH(valid_values), required_field_flag
FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`
WHERE column_name = col_name and field_rank = idx_row
);
--Check event_name + field_name is required + field name has invalid values
IF (is_required AND event_names_len > 0 AND valid_values_len > 0) THEN
SET cmp_cond = (
SELECT CONCAT ("WHEN REGEXP_CONTAINS(LOWER(event_name), '^(",LOWER(ARRAY_TO_STRING(event_names,"|")),
")') AND (IFNULL(",col_name,",'') = '' OR LOWER(CAST(", LOWER(col_name), " AS STRING)) NOT IN ('", LOWER(ARRAY_TO_STRING (valid_values, "', '")),
"')) THEN true ")
FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`
WHERE column_name = col_name and field_rank = idx_row
);
SET lookup_query_row = CONCAT(lookup_query_row, cmp_cond);
END IF;
SET idx_row = idx_row + 1;
END WHILE;
SET lookup_query = CONCAT(lookup_query,",CASE ", lookup_query_row, "ELSE false END AS ", col_flag);
SET idx = idx + row_cnt - 1; -- increment to go to the next field
ELSE
SET cmp_cond = CONCAT(",NULL AS ", col_flag);
SET lookup_query = CONCAT(lookup_query, cmp_cond);
END IF;
SET idx = idx + 1; --counter main while loop
END WHILE;
-- This is a workaround due to BQ's dynamic SQL limitations with nested CASE statements
-- These fields aren't in the valid values lookup table
SET logic_based_fields = (SELECT """
,CASE
WHEN LOWER(event_name) LIKE '%video%' AND IFNULL(video_id_channel_id_sports_event_id,'') = '' THEN true
ELSE false END AS video_id_channel_id_sports_event_id_is_invalid_flag
,CASE
WHEN LOWER(event_name) LIKE '%video%'
AND ((IFNULL(navigation_section,'') ='' AND is_epg IS NOT NULL)
OR (is_epg IS NULL AND IFNULL(navigation_section,'') <>'')
OR (is_epg = TRUE AND IFNULL(epg_category,'') = '')) THEN true
ELSE false END AS client_path_sensitive_properties_is_invalid_flag
,CASE
WHEN LOWER(event_name) = 'video content playing'
AND (video_heartbeat_value IS NULL OR video_heartbeat_value > 60 OR video_heartbeat_value <= 0) THEN TRUE
ELSE FALSE END AS video_heartbeat_value_is_invalid_flag
,CASE WHEN LOWER(event_name) LIKE '%video%' THEN 1 ELSE 0 END AS video_event_flag
""");
-- Dynamic SQL to create a temp table that will be used to insert into the base table and the invalid values table
EXECUTE IMMEDIATE format("""
CREATE OR REPLACE TABLE `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_temp`
AS
SELECT
event_date
,anonymous_id
,sl.context_segment_source AS platform_name
,os_version
,event_id
,event_name
,event_type
,stream_type
,session_id
,stream_id
,ip
,navigation_section
,is_epg
,epg_category
,screen_id
,screen_title
,screen_type
,video_content_vertical
,video_genres_first
,video_id_channel_id_sports_event_id
,video_id
,channel_id
,sports_event_id
,video_is_kids
,video_player_mode
,video_title
,video_type
,video_heartbeat_value
,CASE WHEN event_name = 'Video Content Started' THEN true ELSE false END AS event_is_video_start_flag
%s
%s
FROM (
SELECT
context_protocols_source_id,
DATE(original_timestamp) AS event_date,
id AS event_id,
original_event_name AS event_name,
original_event_type AS event_type,
context_ip AS ip,
anonymous_id,
user_id,
COALESCE(session_id,
context_screen_properties_session_id) AS session_id,
screen_id,
screen_title,
screen_type,
stream_id,
stream_type,
video_id,
video_type,
video_title,
video_genres_first,
video_content_vertical,
video_is_kids,
video_player_mode,
video_heartbeat_value,
channel_id,
sports_event_id,
COALESCE(COALESCE(channel_id,video_id),sports_event_id) AS video_id_channel_id_sports_event_id,
is_epg,
epg_category_id,
epg_category,
navigation_section,
context_os_version AS os_version,
ROW_NUMBER() OVER (PARTITION BY id ORDER BY loaded_at DESC) AS __row_number
FROM
`st-vix-ott-dev.vix_collapsed_events_dev.master_event`
WHERE
DATE(_PARTITIONTIME) = %s
AND DATE(original_timestamp) = %s
) AS mev
LEFT JOIN
`st-vix-ott-dev.st_vix_ott_dev_us_data_master_dataset.st_vix_ott_dev_data_segment_lookup_table` sl
ON
mev.context_protocols_source_id = sl.context_protocols_source_id
WHERE mev.__row_number = 1
""",
logic_based_fields,
lookup_query,
current_event_date,
current_event_date
);
--Insert into the base and invalid values tables
IF (SELECT COUNT(*) FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_temp`) > 0 THEN
--Delete current event date data to handle multiple runs in the same day
EXECUTE IMMEDIATE format("""
DELETE
FROM
`st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_test`
WHERE event_date = %s;
""",
current_event_date
);
EXECUTE IMMEDIATE format("""
DELETE
FROM
`st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_invalid_values_test`
WHERE event_date = %s;
""",
current_event_date
);
--Insert into base table
INSERT INTO `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_test`
(
event_date,
anonymous_id,
platform_name,
os_version,
event_id,
event_name,
event_type,
stream_type,
session_id,
stream_id,
ip,
navigation_section,
is_epg,
epg_category,
screen_id,
screen_title,
screen_type,
video_content_vertical,
video_genres_first,
video_id_channel_id_sports_event_id,
video_id,
channel_id,
sports_event_id,
video_is_kids,
video_player_mode,
video_type,
anonymous_id_is_invalid_flag,
client_path_sensitive_properties_is_invalid_flag,
event_is_video_start_flag,
ip_is_invalid_flag,
screen_id_is_invalid_flag,
screen_title_is_invalid_flag,
screen_type_is_invalid_flag,
session_id_is_invalid_flag,
stream_id_is_invalid_flag,
stream_type_is_invalid_flag,
video_heartbeat_value,
video_content_vertical_is_invalid_flag,
video_genres_first_is_invalid_flag,
video_heartbeat_value_is_invalid_flag,
video_id_channel_id_sports_event_id_is_invalid_flag,
video_is_kids_is_invalid_flag,
video_player_mode_is_invalid_flag,
video_type_is_invalid_flag,
video_event_flag,
created_datetime
)
SELECT
event_date,
anonymous_id,
platform_name,
os_version,
event_id,
event_name,
event_type,
stream_type,
session_id,
stream_id,
ip,
navigation_section,
is_epg,
epg_category,
screen_id,
screen_title,
screen_type,
video_content_vertical,
video_genres_first,
video_id_channel_id_sports_event_id,
video_id,
channel_id,
sports_event_id,
video_is_kids,
video_player_mode,
video_type,
anonymous_id_is_invalid_flag,
client_path_sensitive_properties_is_invalid_flag,
event_is_video_start_flag,
ip_is_invalid_flag,
screen_id_is_invalid_flag,
screen_title_is_invalid_flag,
screen_type_is_invalid_flag,
session_id_is_invalid_flag,
stream_id_is_invalid_flag,
stream_type_is_invalid_flag,
video_heartbeat_value,
video_content_vertical_is_invalid_flag,
video_genres_first_is_invalid_flag,
video_heartbeat_value_is_invalid_flag,
video_id_channel_id_sports_event_id_is_invalid_flag,
video_is_kids_is_invalid_flag,
video_player_mode_is_invalid_flag,
video_type_is_invalid_flag,
video_event_flag,
CURRENT_DATETIME()
FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_temp`;
--Insert into the invalid values table
INSERT INTO `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_invalid_values_test`
(
event_date,
anonymous_id,
platform_name,
os_version,
event_name,
event_type,
event_invalid_values,
created_datetime
)
WITH cte_invalid
AS
(
SELECT
event_date,
anonymous_id,
platform_name,
os_version,
event_id,
event_name,
event_type,
CASE WHEN video_id_channel_id_sports_event_id_is_invalid_flag THEN video_id_channel_id_sports_event_id ELSE 'valid' END AS video_id_channel_id_sports_event_id,
CASE WHEN video_heartbeat_value_is_invalid_flag THEN cast(video_heartbeat_value as string) ELSE 'valid' END AS video_heartbeat_value,
CASE WHEN ip_is_invalid_flag THEN ip ELSE 'valid' END AS ip,
CASE WHEN screen_id_is_invalid_flag THEN screen_id ELSE 'valid' END AS screen_id,
CASE WHEN screen_title_is_invalid_flag THEN screen_title ELSE 'valid' END AS screen_title,
CASE WHEN screen_type_is_invalid_flag THEN screen_type ELSE 'valid' END AS screen_type,
CASE WHEN session_id_is_invalid_flag THEN session_id ELSE 'valid' END AS session_id,
CASE WHEN stream_id_is_invalid_flag THEN stream_id ELSE 'valid' END AS stream_id,
CASE WHEN stream_type_is_invalid_flag THEN stream_type ELSE 'valid' END AS stream_type,
CASE WHEN video_content_vertical_is_invalid_flag THEN video_content_vertical ELSE 'valid' END AS video_content_vertical,
CASE WHEN video_genres_first_is_invalid_flag THEN video_genres_first ELSE 'valid' END AS video_genres_first,
CASE WHEN video_is_kids_is_invalid_flag THEN video_is_kids ELSE 'valid' END AS video_is_kids,
CASE WHEN video_player_mode_is_invalid_flag THEN video_player_mode ELSE 'valid' END AS video_player_mode,
CASE WHEN video_type_is_invalid_flag THEN video_type ELSE 'valid' END AS video_type,
CASE WHEN client_path_sensitive_properties_is_invalid_flag THEN navigation_section ELSE 'valid' END AS navigation_section,
CASE WHEN client_path_sensitive_properties_is_invalid_flag THEN CAST(is_epg AS STRING) ELSE 'valid' END AS is_epg,
CASE WHEN client_path_sensitive_properties_is_invalid_flag THEN epg_category ELSE 'valid' END AS epg_category
FROM
`st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_temp`
WHERE
video_id_channel_id_sports_event_id_is_invalid_flag
OR client_path_sensitive_properties_is_invalid_flag
OR video_heartbeat_value_is_invalid_flag
OR anonymous_id_is_invalid_flag
OR ip_is_invalid_flag
OR screen_id_is_invalid_flag
OR screen_title_is_invalid_flag
OR screen_type_is_invalid_flag
OR session_id_is_invalid_flag
OR stream_id_is_invalid_flag
OR stream_type_is_invalid_flag
OR video_content_vertical_is_invalid_flag
OR video_genres_first_is_invalid_flag
OR video_is_kids_is_invalid_flag
OR video_player_mode_is_invalid_flag
OR video_type_is_invalid_flag
),
cte_invalid_agg
AS
(SELECT
event_date,
anonymous_id,
platform_name,
os_version,
event_name,
event_type,
event_id,
ARRAY_AGG(STRUCT(field_name,invalid_field_value)) AS invalid_field_value,
FROM
(
SELECT
*
FROM
cte_invalid
) sl
UNPIVOT INCLUDE NULLS
(
invalid_field_value FOR field_name IN (video_id_channel_id_sports_event_id,
video_heartbeat_value,
ip,
screen_id,
screen_title,
screen_type,
session_id,
stream_id,
stream_type,
video_content_vertical,
video_genres_first,
video_is_kids,
video_player_mode,
video_type,
navigation_section,
is_epg,
epg_category
)
)
WHERE IFNULL(invalid_field_value,'') <> 'valid'
GROUP BY
event_date,
anonymous_id,
platform_name,
os_version,
event_name,
event_type,
event_id
)
SELECT
event_date,
anonymous_id,
platform_name,
os_version,
event_name,
event_type,
ARRAY_AGG(STRUCT(event_id, invalid_field_value)) AS event_invalid_values,
CURRENT_DATETIME()
FROM cte_invalid_agg
GROUP BY
event_date,
anonymous_id,
platform_name,
os_version,
event_name,
event_type;
--Drop temp tables
DROP TABLE `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_temp`;
DROP TABLE `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`;
END IF;
I wrote a DAG; it was triggered and inserted today's values, but it did not drop the tables as written at the end of the script. Can someone check this? Also, how can I schedule it so that it triggers once a day at 3 AM PST? Here is my DAG code:
import datetime
import os
import logging
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from composer_plugins import get_query_content
# Environment variables
event_collection_project_id = os.environ["EVENT_COLLECTION_PROJECT_ID"]
sql_scripts_folder = os.environ["SQL_SCRIPTS_FOLDER"]
QA_CHECK_QUERY = "DMLs/data_qa_checks/DQ_check_base_table_new.sql"
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)
insert_data_from_sql_file = get_query_content(
    sql_scripts_folder, QA_CHECK_QUERY
)
logging.info(f"query: {insert_data_from_sql_file}")
default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least what's specified in retry_delay
    "retries": 1,
    "retry_delay": datetime.timedelta(seconds=10),
    "project_id": event_collection_project_id,
}
with DAG(
    dag_id="data_qa_checks",
    schedule_interval=None,
    default_args=default_dag_args,
) as dag:
    # run the query from the sql file; it performs the operations defined in the script
    DQ_dml = BigQueryInsertJobOperator(
        task_id="DQ_dml",
        job_id="{{ ts_nodash }}-DQ_dml",
        configuration={
            "query": {
                "query": insert_data_from_sql_file,
                "useLegacySql": False,
            },
        },
        dag=dag,
    )
Posted on 2022-06-29 19:09:56
Since Airflow uses the UTC timezone, I converted 3 AM PST to UTC, which is 11 AM UTC. I wrote a DAG and scheduled it at 11 AM:
import datetime
import os
import logging
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from composer_plugins import get_query_content
# Environment variables
event_collection_project_id = os.environ["EVENT_COLLECTION_PROJECT_ID"]
sql_scripts_folder = os.environ["SQL_SCRIPTS_FOLDER"]
QA_CHECK_QUERY = "DMLs/data_qa_checks/DQ_check_base_table_new_updated.sql"
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)
insert_data_from_sql_file = get_query_content(
    sql_scripts_folder, QA_CHECK_QUERY
)
logging.info(f"query: {insert_data_from_sql_file}")
default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least what's specified in retry_delay
    "retries": 1,
    "retry_delay": datetime.timedelta(seconds=10),
    "project_id": event_collection_project_id,
}
with DAG(
    dag_id="data_qa_checks",
    schedule_interval="0 11 * * *",
    default_args=default_dag_args,
) as dag:
    # run the query from the sql file; it performs the operations defined in the script
    DQ_dml = BigQueryInsertJobOperator(
        task_id="DQ_dml",
        job_id="{{ ts_nodash }}-DQ_dml",
        configuration={
            "query": {
                "query": insert_data_from_sql_file,
                "useLegacySql": False,
            },
        },
        dag=dag,
    )
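A note on the schedule: hard-coding "0 11 * * *" UTC drifts by an hour when PST switches to PDT. Airflow 2 also supports timezone-aware DAGs, so a sketch like the following (the start date is a placeholder) keeps the run at 3 AM Los Angeles time year-round:

import pendulum
from airflow import DAG

with DAG(
    dag_id="data_qa_checks",
    # cron schedules are evaluated in the start_date's timezone, so this
    # stays at 3 AM local time across the PST/PDT daylight-saving switch
    schedule_interval="0 3 * * *",
    start_date=pendulum.datetime(2022, 6, 1, tz="America/Los_Angeles"),
    catchup=False,
    default_args=default_dag_args,
) as dag:
    ...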
Posted on 2022-06-12 07:09:42
If your Airflow setup runs on an EC2/k8s machine, the source (the sql file) may live on a local path (EC2) or an s3 path (k8s).
You can use params to pass variables (for example the sql file path or a date) at runtime; a short sketch follows the links below.
https://stackoverflow.com/a/53686174/2986344
For calling BigQuery, you can leverage hooks (with modifications): https://registry.astronomer.io/dags/xcom-gcs-ds
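A hedged sketch of the params approach (the project, table, and param names here are placeholders, not from the question): the operator's configuration field is templated, so a value passed via params can be referenced with Jinja inside the SQL.

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

run_query = BigQueryInsertJobOperator(
    task_id="run_query",
    configuration={
        "query": {
            # {{ params.run_date }} is rendered by Jinja before the job is submitted
            "query": "SELECT * FROM `my_project.my_dataset.my_table` "
                     "WHERE event_date = '{{ params.run_date }}'",
            "useLegacySql": False,
        }
    },
    params={"run_date": "2022-01-01"},
)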
Posted on 2022-06-12 15:45:13
You can use the Jinja templating system. Put your .sql file, e.g. dml_merge.sql, in {AIRFLOW_HOME}/dags/templates:
DECLARE current_event_date STRING DEFAULT CONCAT("'",CAST(DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY) AS STRING),"'");
How you handle the variable current_event_date depends on how you pass that variable into the DAG. Without the full query it is not clear what needs to be parameterized, so as an example I will assume you want to pass current_event_date into the query as a variable from the DAG.
Are you triggering the DAG with a conf and passing current_event_date into it?
If so, modify the SQL in dml_merge.sql to read the value from dag_run (using Airflow's templating syntax {{ }}):
DECLARE current_event_date STRING DEFAULT "{{ dag_run.conf.get('current_event_date') }}";
When you trigger this DAG with the conf {"current_event_date": "2022-01-01"}, the value will be substituted into your query.
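From the command line this looks like: airflow dags trigger --conf '{"current_event_date": "2022-01-01"}' data_qa_checks (the dag_id is taken from the question); the same JSON can be entered in the UI via "Trigger DAG w/ config".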
You can also use this to access Variables/XComs etc. Any of the templates referenced in the docs can be used in the templated sql file, as long as you use the {{ }} syntax.
Do you need custom logic to compute current_event_date?
Define a Python function in the DAG:
def custom_macro():
    current_event_date = "some date"  # function to get the `current_event_date`
    return current_event_date
Add this custom macro to the DAG by passing user_defined_macros:
@dag(start_date=pendulum.today(tz="Europe/London"), user_defined_macros={"current_event_date": custom_macro})
Then change current_event_date in dml_merge.sql by wrapping it in {{ }} (since it is now a macro you have defined). Because it is a macro, you need to call it (it is a function) to distinguish it from a plain variable:
DECLARE current_event_date STRING DEFAULT "{{ current_event_date() }}";
When you run the DAG, Airflow will see that current_event_date in your .sql file is a user-defined macro and will call the function custom_macro, which returns a value. You can use this method if you need more complex logic than simply passing the date in via the conf or accessing it as a Variable/XCom.
Macros are very powerful in Airflow. The Airflow tutorial page on Jinja templating is a good starting point if you want to learn more about them.
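To make this concrete, a minimal sketch of such a macro, assuming current_event_date should be yesterday's date in Los Angeles time (the snippet above leaves the body as "some date"):

import pendulum

def custom_macro():
    # assumption: return yesterday in America/Los_Angeles as 'YYYY-MM-DD',
    # mirroring the DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY) logic in the question
    return pendulum.today(tz="America/Los_Angeles").subtract(days=1).to_date_string()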
Use the BigQueryInsertJobOperator to run dml_merge.sql. If you are using Composer on GCP, this operator is already available in your environment.
If your Airflow instance is not on Composer, you will need to install apache-airflow-providers-google into the environment. The documentation page for this provider is here.
Once that is done, you can import the operator:
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
The documentation for this operator is here.
Finally, you can create the task that runs dml_merge.sql:
run_dml_merge_query = BigQueryInsertJobOperator(
    task_id="run_dml_merge_query",
    configuration={
        "query": {
            "query": "{% include 'templates/dml_merge.sql' %}",
            "useLegacySql": False,
        }
    },
)
Again, we are using the Jinja templating system, this time with the {% include %} syntax. This inserts the contents of dml_merge.sql into the query dict used for the operator's configuration.
This pattern only works for operator fields that are template fields. You can see in the docs here that the operator's configuration argument is a template field.
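One assumption worth spelling out: for {% include 'templates/dml_merge.sql' %} to resolve, Jinja has to be able to find the templates folder. By default it searches relative to the folder containing the DAG file; extra directories can be added via the DAG's template_searchpath argument, roughly like this (the path is a placeholder):

import pendulum
from airflow import DAG

with DAG(
    dag_id="dml_merge_dag",
    schedule_interval=None,
    start_date=pendulum.today(tz="Europe/London"),
    # extra directories Jinja will search when resolving {% include %} paths
    template_searchpath=["/opt/airflow/dags"],
) as dag:
    ...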
When the DAG is triggered and Airflow reaches this task, it will replace {% include 'templates/dml_merge.sql' %} in "query": "{% include 'templates/dml_merge.sql' %}" with the contents of the dml_merge.sql file.
You need to make sure that whatever service account you use has the correct permissions for BigQuery (roles/bigquery.jobUser). The BigQuery IAM page is here.
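For example, the role can be granted with gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SA_EMAIL" --role="roles/bigquery.jobUser", where the project ID and service-account email are placeholders.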
https://stackoverflow.com/questions/72589428