
How do I write a DAG to pass a SQL file?

Stack Overflow user
Asked 2022-06-12 04:09:02
3 answers, 999 views, 0 followers, 1 vote

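I am new to Airflow. I need to write a DAG to which I pass a SQL file. The SQL file contains many queries and uses BigQuery tables. It should be scheduled to run once a day at around 3 AM PST. Which operators do I need for this DAG? The DML also has a variable named current_event_date, which is set to fetch the previous day's data; it should be parameterized so that the process runs for the previous day.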

DECLARE current_event_date STRING DEFAULT CONCAT("'",CAST(DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY) AS STRING),"'");

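It would be great if someone could give me more insight into how to write such a DAG and how to parameterize the current_event_date variable. Should the parameterization be done in the DAG file or in the SQL file?

My SQL file contains the following queries: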

DECLARE idx, col_cnt, row_cnt, idx_row INT64;
DECLARE col_name, col_flag STRING;
DECLARE cmp_cond,lookup_query, lookup_query_row STRING;
DECLARE col_list ARRAY <STRING>;
DECLARE is_required BOOLEAN;
DECLARE event_names_len, valid_values_len INT64;
DECLARE logic_based_fields STRING; -- this variable hard-codes the rules that are not in the lookup table
DECLARE current_event_date STRING DEFAULT CONCAT("'",CAST(DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY) AS STRING),"'");

-- Re-create temp table to get invalid flags fields from base and lookup tables
CREATE OR REPLACE TABLE `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp` AS
   SELECT
         base.column_name,
         base.column_flag,
         base.required_field_flag,
         base.event_names,
         base.valid_values,
         base.field_name,
         base.__row_number,
         DENSE_RANK() OVER(PARTITION BY base.column_name ORDER BY base.__row_number) AS field_rank
      FROM
      (
         SELECT 
            bc.column_name,
            vlk.field_name,
            bc.column_flag,
            vlk.required_field_flag,
            vlk.event_names,
            vlk.valid_values,
            ROW_NUMBER() OVER() AS __row_number
         FROM 
            (SELECT
               column_name as column_flag,
               SUBSTR (column_name, 1, INSTR(column_name, 'is_invalid')-2) column_name
            FROM
               `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.INFORMATION_SCHEMA.COLUMNS`
            WHERE
            table_name = 'st_vix_ott_dev_dq_monitoring_base_test'
            AND column_name LIKE "%is_invalid%"
            ) bc
            INNER JOIN `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_valid_values_lookup_test` vlk
            ON bc.column_name = vlk.field_name
            ORDER BY 1
         ) base
      ORDER BY base.__row_number;

--SELECT * FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`;

-- Set control variables
SET col_cnt = (SELECT COUNT (*) 
               FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`);
SET idx = 1;
SET lookup_query = '';

--build case statements dynamically based on valid values on the lookup table
WHILE idx <= col_cnt DO
   SET (col_flag,col_name) = (
     SELECT AS STRUCT column_flag,column_name 
     FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`
     WHERE __row_number = idx
     );

   SET row_cnt = (SELECT count(*) 
                  FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`
                  WHERE IFNULL(field_name,'') = col_name );

   IF row_cnt = 1 THEN
         --Check if event_names & valid_values arrays are not empty
      SET (event_names_len, valid_values_len, is_required) = (
            SELECT AS STRUCT ARRAY_LENGTH(event_names),ARRAY_LENGTH(valid_values), required_field_flag 
            FROM  `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`
            WHERE field_name = col_name
         );

      --Check event_name + field_name is required + field name has invalid values
      IF (is_required AND event_names_len > 0 AND valid_values_len > 0) THEN
            SET cmp_cond = (
               SELECT CONCAT (",CASE WHEN REGEXP_CONTAINS(LOWER(event_name), '^(",LOWER(ARRAY_TO_STRING(event_names,"|")),
               ")') AND (IFNULL(",col_name,",'') = '' OR LOWER(CAST(", LOWER(col_name), " AS STRING)) NOT IN ('", LOWER(ARRAY_TO_STRING (valid_values, "', '")), 
               "')) THEN true ELSE false END AS ",col_flag)
               FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`
               WHERE field_name = col_name
            );
            SET lookup_query = CONCAT(lookup_query, cmp_cond);

         --Check event_name + field_name is required
         ELSEIF (is_required AND event_names_len > 0 AND valid_values_len = 0) THEN
            SET cmp_cond = (
               SELECT CONCAT (",CASE WHEN REGEXP_CONTAINS(LOWER(event_name), '^(",LOWER(ARRAY_TO_STRING(event_names,"|")),")') AND IFNULL("
               ,col_name,",'') = '' THEN true ELSE false END AS ",col_flag)
               FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`
               WHERE field_name = col_name
            );
            SET lookup_query = CONCAT(lookup_query, cmp_cond);

         --Check field_name is required      
         ELSEIF (is_required AND event_names_len = 0 AND valid_values_len = 0) THEN
            SET cmp_cond = (
                  SELECT CONCAT (",CASE WHEN IFNULL(", col_name, ",'') = '' THEN true ELSE false END AS ",col_flag)
                  FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`
                  WHERE field_name = col_name
            );
            SET lookup_query = CONCAT(lookup_query, cmp_cond);
      END IF;
   
   -- field_name with multiple rows
   ELSEIF row_cnt > 1 THEN
      SET idx_row = 1;
      SET lookup_query_row = '';
         
      WHILE idx_row <= row_cnt DO
         --Check if event_names & valid_values arrays are not empty
         SET (event_names_len, valid_values_len, is_required) = (
               SELECT AS STRUCT ARRAY_LENGTH(event_names),ARRAY_LENGTH(valid_values), required_field_flag 
               FROM  `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`
               WHERE column_name = col_name and field_rank = idx_row
            );

         --Check event_name + field_name is required + field name has invalid values
         IF (is_required AND event_names_len > 0 AND valid_values_len > 0) THEN
            SET cmp_cond = (
               SELECT CONCAT ("WHEN REGEXP_CONTAINS(LOWER(event_name), '^(",LOWER(ARRAY_TO_STRING(event_names,"|")),
               ")') AND (IFNULL(",col_name,",'') = '' OR LOWER(CAST(", LOWER(col_name), " AS STRING)) NOT IN ('", LOWER(ARRAY_TO_STRING (valid_values, "', '")), 
               "')) THEN true ")
               FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`
               WHERE column_name = col_name and field_rank = idx_row
            );
            SET lookup_query_row = CONCAT(lookup_query_row, cmp_cond);
         END IF;
         SET idx_row = idx_row + 1;
      END WHILE;
      SET lookup_query = CONCAT(lookup_query,",CASE ", lookup_query_row, "ELSE false END AS ", col_flag);
      SET idx = idx + row_cnt - 1; -- increment to go to the next field
   
   ELSE
      SET cmp_cond = CONCAT(",NULL AS ", col_flag);
      SET lookup_query = CONCAT(lookup_query, cmp_cond); 
   END IF;
   SET idx = idx + 1; --counter main while loop  
END WHILE;

-- This is a workaround due to BQ's dynamic SQL limitations with nested CASE statements
-- These fields aren't in the valid values lookup table
SET logic_based_fields = (SELECT """
                                 ,CASE 
                                       WHEN LOWER(event_name) LIKE '%video%' AND IFNULL(video_id_channel_id_sports_event_id,'') = '' THEN true 
                                    ELSE false END AS video_id_channel_id_sports_event_id_is_invalid_flag
                                 ,CASE 
                                       WHEN LOWER(event_name) LIKE '%video%' 
                                          AND ((IFNULL(navigation_section,'') ='' AND is_epg IS NOT NULL) 
                                             OR (is_epg IS NULL AND IFNULL(navigation_section,'') <>'') 
                                             OR (is_epg = TRUE AND IFNULL(epg_category,'') = '')) THEN true 
                                    ELSE false END AS client_path_sensitive_properties_is_invalid_flag
                                 ,CASE
                                    WHEN LOWER(event_name) = 'video content playing' 
                                       AND (video_heartbeat_value IS NULL OR video_heartbeat_value > 60 OR video_heartbeat_value <= 0) THEN TRUE
                                    ELSE FALSE END AS video_heartbeat_value_is_invalid_flag
                                 ,CASE WHEN LOWER(event_name) LIKE '%video%' THEN 1 ELSE 0 END AS video_event_flag
                           """);


-- Dynamic SQL to create the temp table that will be used to insert into the base table and the invalid values table
EXECUTE IMMEDIATE format("""
   CREATE OR REPLACE TABLE `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_temp` 
   AS
      SELECT
         event_date
         ,anonymous_id
         ,sl.context_segment_source AS platform_name
         ,os_version
         ,event_id
         ,event_name
         ,event_type
         ,stream_type
         ,session_id
         ,stream_id
         ,ip
         ,navigation_section
         ,is_epg
         ,epg_category
         ,screen_id
         ,screen_title
         ,screen_type
         ,video_content_vertical
         ,video_genres_first
         ,video_id_channel_id_sports_event_id
         ,video_id
         ,channel_id
         ,sports_event_id
         ,video_is_kids
         ,video_player_mode
         ,video_title
         ,video_type
         ,video_heartbeat_value
         ,CASE WHEN event_name = 'Video Content Started' THEN true ELSE false END AS event_is_video_start_flag
         %s
         %s
      FROM (
      SELECT
            context_protocols_source_id,
            DATE(original_timestamp) AS event_date,
            id AS event_id,
            original_event_name AS event_name,
            original_event_type AS event_type,
            context_ip AS ip,
            anonymous_id,
            user_id,
            COALESCE(session_id,
            context_screen_properties_session_id) AS session_id,
            screen_id,
            screen_title,
            screen_type,
            stream_id,
            stream_type,
            video_id,
            video_type,
            video_title,
            video_genres_first,
            video_content_vertical,
            video_is_kids,
            video_player_mode,
            video_heartbeat_value,
            channel_id,
            sports_event_id,
            COALESCE(COALESCE(channel_id,video_id),sports_event_id) AS video_id_channel_id_sports_event_id,
            is_epg,
            epg_category_id,
            epg_category,
            navigation_section,
            context_os_version AS os_version,
            ROW_NUMBER() OVER (PARTITION BY id ORDER BY loaded_at DESC) AS __row_number
      FROM
         `st-vix-ott-dev.vix_collapsed_events_dev.master_event`
      WHERE
         DATE(_PARTITIONTIME) = %s 
         AND DATE(original_timestamp) = %s
         ) AS mev
      LEFT JOIN
      `st-vix-ott-dev.st_vix_ott_dev_us_data_master_dataset.st_vix_ott_dev_data_segment_lookup_table` sl 
      ON
      mev.context_protocols_source_id = sl.context_protocols_source_id
      WHERE mev.__row_number = 1
""",
logic_based_fields,
lookup_query,
current_event_date,
current_event_date
);

--Insert into the base and invalid values tables
IF (SELECT COUNT(*) FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_temp`) > 0 THEN

   --Delete current event date data to handle multiple runs in the same day
   EXECUTE IMMEDIATE format(""" 
               DELETE
               FROM
               `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_test`
               WHERE event_date = %s;
   """,
   current_event_date
   );

   EXECUTE IMMEDIATE format("""                
               DELETE
               FROM
               `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_invalid_values_test`
               WHERE event_date = %s;
   """,
   current_event_date
   );

   --Insert into base table
   INSERT INTO `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_test`
   (
      event_date,
      anonymous_id,
      platform_name,
      os_version,
      event_id,
      event_name,
      event_type,
      stream_type,
      session_id,
      stream_id,
      ip,
      navigation_section,
      is_epg,
      epg_category,
      screen_id,
      screen_title,
      screen_type,
      video_content_vertical,
      video_genres_first,
      video_id_channel_id_sports_event_id,
      video_id,
      channel_id,
      sports_event_id,
      video_is_kids,
      video_player_mode,
      video_type,
      anonymous_id_is_invalid_flag,
      client_path_sensitive_properties_is_invalid_flag,
      event_is_video_start_flag,
      ip_is_invalid_flag,
      screen_id_is_invalid_flag,
      screen_title_is_invalid_flag,
      screen_type_is_invalid_flag,
      session_id_is_invalid_flag,
      stream_id_is_invalid_flag,
      stream_type_is_invalid_flag,
      video_heartbeat_value,
      video_content_vertical_is_invalid_flag,
      video_genres_first_is_invalid_flag,
      video_heartbeat_value_is_invalid_flag,
      video_id_channel_id_sports_event_id_is_invalid_flag,
      video_is_kids_is_invalid_flag,
      video_player_mode_is_invalid_flag,
      video_type_is_invalid_flag,
      video_event_flag,
      created_datetime
   )
   SELECT 
      event_date,
      anonymous_id,
      platform_name,
      os_version,
      event_id,
      event_name,
      event_type,
      stream_type,
      session_id,
      stream_id,
      ip,
      navigation_section,
      is_epg,
      epg_category,
      screen_id,
      screen_title,
      screen_type,
      video_content_vertical,
      video_genres_first,
      video_id_channel_id_sports_event_id,
      video_id,
      channel_id,
      sports_event_id,
      video_is_kids,
      video_player_mode,
      video_type,
      anonymous_id_is_invalid_flag,
      client_path_sensitive_properties_is_invalid_flag,
      event_is_video_start_flag,
      ip_is_invalid_flag,
      screen_id_is_invalid_flag,
      screen_title_is_invalid_flag,
      screen_type_is_invalid_flag,
      session_id_is_invalid_flag,
      stream_id_is_invalid_flag,
      stream_type_is_invalid_flag,
      video_heartbeat_value,
      video_content_vertical_is_invalid_flag,
      video_genres_first_is_invalid_flag,
      video_heartbeat_value_is_invalid_flag,
      video_id_channel_id_sports_event_id_is_invalid_flag,
      video_is_kids_is_invalid_flag,
      video_player_mode_is_invalid_flag,
      video_type_is_invalid_flag,
      video_event_flag,
      CURRENT_DATETIME()
   FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_temp`;

   --Insert into the invalid values table
   INSERT INTO `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_invalid_values_test`
      (
      event_date,
      anonymous_id,
      platform_name,
      os_version,
      event_name,
      event_type,
      event_invalid_values,
      created_datetime  
      )
      WITH cte_invalid
      AS
      (
      SELECT 
            event_date, 
            anonymous_id,       
            platform_name,
            os_version,     
            event_id,       
            event_name,     
            event_type,         
            CASE WHEN video_id_channel_id_sports_event_id_is_invalid_flag THEN video_id_channel_id_sports_event_id ELSE 'valid' END  AS video_id_channel_id_sports_event_id,
            CASE WHEN video_heartbeat_value_is_invalid_flag THEN cast(video_heartbeat_value as string) ELSE 'valid' END AS video_heartbeat_value,
            CASE WHEN ip_is_invalid_flag THEN ip ELSE 'valid' END  AS ip,
            CASE WHEN screen_id_is_invalid_flag THEN screen_id ELSE 'valid' END  AS screen_id,
            CASE WHEN screen_title_is_invalid_flag THEN screen_title ELSE 'valid' END  AS screen_title,
            CASE WHEN screen_type_is_invalid_flag THEN screen_type ELSE 'valid' END  AS screen_type,
            CASE WHEN session_id_is_invalid_flag THEN session_id ELSE 'valid' END  AS session_id,
            CASE WHEN stream_id_is_invalid_flag THEN stream_id ELSE 'valid' END  AS stream_id,
            CASE WHEN stream_type_is_invalid_flag THEN stream_type ELSE 'valid' END  AS stream_type,
            CASE WHEN video_content_vertical_is_invalid_flag THEN video_content_vertical ELSE 'valid' END  AS video_content_vertical,
            CASE WHEN video_genres_first_is_invalid_flag THEN video_genres_first ELSE 'valid' END  AS video_genres_first,
            CASE WHEN video_is_kids_is_invalid_flag THEN video_is_kids ELSE 'valid' END  AS video_is_kids,
            CASE WHEN video_player_mode_is_invalid_flag THEN video_player_mode ELSE 'valid' END  AS video_player_mode,
            CASE WHEN video_type_is_invalid_flag THEN video_type ELSE 'valid' END  AS video_type,
            CASE WHEN client_path_sensitive_properties_is_invalid_flag THEN navigation_section ELSE 'valid' END  AS navigation_section,
            CASE WHEN client_path_sensitive_properties_is_invalid_flag THEN CAST(is_epg AS STRING) ELSE 'valid' END  AS is_epg,
            CASE WHEN client_path_sensitive_properties_is_invalid_flag THEN epg_category ELSE 'valid' END  AS epg_category
      FROM
         `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_temp`
      WHERE 
            video_id_channel_id_sports_event_id_is_invalid_flag 
            OR client_path_sensitive_properties_is_invalid_flag 
            OR video_heartbeat_value_is_invalid_flag 
            OR anonymous_id_is_invalid_flag 
            OR ip_is_invalid_flag 
            OR screen_id_is_invalid_flag 
            OR screen_title_is_invalid_flag 
            OR screen_type_is_invalid_flag 
            OR session_id_is_invalid_flag 
            OR stream_id_is_invalid_flag 
            OR stream_type_is_invalid_flag 
            OR video_content_vertical_is_invalid_flag 
            OR video_genres_first_is_invalid_flag 
            OR video_is_kids_is_invalid_flag 
            OR video_player_mode_is_invalid_flag 
            OR video_type_is_invalid_flag

      ),
      cte_invalid_agg
      AS
      (SELECT 
            event_date, 
            anonymous_id,       
            platform_name,
            os_version,             
            event_name,     
            event_type,
            event_id,
         ARRAY_AGG(STRUCT(field_name,invalid_field_value)) AS invalid_field_value,
      FROM
      (
         SELECT
            *
         FROM 
         cte_invalid
      ) sl
      UNPIVOT INCLUDE NULLS
      (
         invalid_field_value FOR field_name IN (video_id_channel_id_sports_event_id,
                                                video_heartbeat_value,
                                                ip,
                                                screen_id,
                                                screen_title,
                                                screen_type,
                                                session_id,
                                                stream_id,
                                                stream_type,
                                                video_content_vertical,
                                                video_genres_first,
                                                video_is_kids,
                                                video_player_mode,
                                                video_type,
                                                navigation_section,
                                                is_epg,
                                                epg_category
                                                )
      
      )
      WHERE IFNULL(invalid_field_value,'') <> 'valid'
      GROUP BY       
            event_date, 
            anonymous_id,       
            platform_name,
            os_version,             
            event_name,     
            event_type,
            event_id
      )

      SELECT 
            event_date, 
            anonymous_id,       
            platform_name,
            os_version,             
            event_name,     
            event_type,
            ARRAY_AGG(STRUCT(event_id, invalid_field_value)) AS event_invalid_values,
            CURRENT_DATETIME()
      FROM cte_invalid_agg
      GROUP BY
            event_date, 
            anonymous_id,       
            platform_name,
            os_version,             
            event_name,     
            event_type;

   --Drop temp tables
   DROP TABLE `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_temp`;
   DROP TABLE `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`;
      
END IF;

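I wrote a DAG; it was triggered and inserted today's values, but it did not drop the tables as written at the end of the script. Can someone check it? Also, how do I schedule it so that it triggers once a day at 3 AM PST? Here is my DAG code: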

import datetime
import os
import logging
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from composer_plugins import get_query_content

# Environment variables
event_collection_project_id = os.environ["EVENT_COLLECTION_PROJECT_ID"]
sql_scripts_folder = os.environ["SQL_SCRIPTS_FOLDER"]
QA_CHECK_QUERY= "DMLs/data_qa_checks/DQ_check_base_table_new.sql"


yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

insert_data_from_sql_file = get_query_content(
    sql_scripts_folder,QA_CHECK_QUERY
)

logging.info(f"query: {insert_data_from_sql_file}")

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least what's specified in retry_delay
    "retries": 1,
    "retry_delay": datetime.timedelta(seconds=10),
    "project_id": event_collection_project_id,
}

with DAG(
        dag_id="data_qa_checks",
        schedule_interval=None,
        default_args=default_dag_args,
) as dag:

    # call the query that will insert the data from sql file and it will do the operations which are mentioned in the query
    DQ_dml = BigQueryInsertJobOperator(
        task_id="DQ_dml",
        job_id="{{ ts_nodash }}-DQ_dml",
        configuration={
            "query": {
                "query": insert_data_from_sql_file,
                "useLegacySql": "False",
            },
        },
        dag=dag,
    )      

3 Answers

Stack Overflow user

Accepted answer

Answered 2022-06-29 19:09:56

Because Airflow schedules in UTC by default, I converted 3 AM PST to UTC, which is 11 AM UTC. I then wrote the DAG and scheduled it for 11:00 UTC.

import datetime
import os
import logging
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from composer_plugins import get_query_content


# Environment variables
event_collection_project_id = os.environ["EVENT_COLLECTION_PROJECT_ID"]
sql_scripts_folder = os.environ["SQL_SCRIPTS_FOLDER"]


QA_CHECK_QUERY= "DMLs/data_qa_checks/DQ_check_base_table_new_updated.sql"


yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

insert_data_from_sql_file = get_query_content(
    sql_scripts_folder,QA_CHECK_QUERY
)

logging.info(f"query: {insert_data_from_sql_file}")

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least what's specified in retry_delay
    "retries": 1,
    "retry_delay": datetime.timedelta(seconds=10),
    "project_id": event_collection_project_id,
}

with DAG(
        dag_id="data_qa_checks",
        # Airflow schedules in UTC by default: 11:00 UTC is 3 AM PST (UTC-8)
        schedule_interval="0 11 * * *",
        default_args=default_dag_args,
) as dag:

    # Run the SQL script loaded from the file as a single BigQuery query job
    DQ_dml = BigQueryInsertJobOperator(
        task_id="DQ_dml",
        job_id="{{ ts_nodash }}-DQ_dml",
        configuration={
            "query": {
                "query": insert_data_from_sql_file,
                # Use a real boolean rather than the string "False"
                "useLegacySql": False,
            },
        },
    )
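
One caveat: a fixed UTC cron only lines up with 3 AM while the US Pacific zone is on standard time. Here is a minimal sketch that checks the conversion with the standard library. It assumes "PST" in the question means US Pacific wall-clock time; the `America/Los_Angeles` zone name and the two sample dates are my own choices, not from the question.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo  # standard library since Python 3.9

# Assumption: "PST" in the question means US Pacific wall-clock time.
PACIFIC = ZoneInfo("America/Los_Angeles")


def pacific_3am_as_utc(year: int, month: int, day: int) -> datetime:
    """Return 03:00 Pacific local time on the given date, converted to UTC."""
    local = datetime(year, month, day, 3, 0, tzinfo=PACIFIC)
    return local.astimezone(timezone.utc)


winter = pacific_3am_as_utc(2022, 1, 15)  # standard time (PST, UTC-8)
summer = pacific_3am_as_utc(2022, 6, 29)  # daylight time (PDT, UTC-7)
print(winter.strftime("%H:%M"), summer.strftime("%H:%M"))
```

So "0 11 * * *" fires at 3 AM PST, but at 4 AM local time while daylight saving is in effect. If the run should follow local time instead, Airflow also accepts a timezone-aware start_date (for example one built with pendulum), in which case the cron expression is interpreted in that timezone.
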
0 votes

Stack Overflow user

Answered 2022-06-12 07:09:42

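If your Airflow is set up on EC2 or Kubernetes machines, the source (the SQL file) can live on a local path (on EC2) or on an S3 path (on Kubernetes).

You can use params to pass variables (for example the SQL file path or a date) at runtime.

https://stackoverflow.com/a/53686174/2986344

For calling BigQuery, you can make use of a hook (with some modification): https://registry.astronomer.io/dags/xcom-gcs-ds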

0 votes

Stack Overflow user

Answered 2022-06-12 15:45:13

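You can use the Jinja templating system.

  1. Create a directory inside the DAGs folder to store the queries the DAG uses; for example, I created {AIRFLOW_HOME}/dags/templates
  2. Create a .sql file in this directory, for example dml_merge.sql
  3. Paste the query contents into this file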
DECLARE current_event_date STRING DEFAULT CONCAT("'",CAST(DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY) AS STRING),"'");

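How you handle the current_event_date variable depends on how you pass it into the DAG.

Without the full query it is not clear what needs to be parameterized, so as an example I will assume you want to pass current_event_date from the DAG into the query as a variable.

Are you triggering the DAG with a conf and passing current_event_date into it?

  1. Change current_event_date in dml_merge.sql to read from dag_run (using Airflow's templating syntax {{ }}):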
DECLARE current_event_date STRING DEFAULT "'{{ dag_run.conf.get('current_event_date') }}'";

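When you trigger the DAG with the conf {"current_event_date": "2022-01-01"}, that value is substituted into your query.

You can also use this to access Variables, XComs, and so on. Any template variable listed in the Airflow docs can be used in the templated SQL file, as long as you use this syntax.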
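
Since the conf value ends up inside SQL text, it can help to validate it before it reaches the template. Here is a minimal sketch, assuming a hypothetical helper `resolve_event_date` (not part of Airflow) that takes the conf dict and the run date. It falls back to the previous day when no value is passed and rejects anything that is not a valid ISO date.

```python
from datetime import date, timedelta
from typing import Optional


def resolve_event_date(conf: Optional[dict], run_date: date) -> str:
    """Pick the event date from the trigger conf, else use the day before run_date.

    Hypothetical helper for illustration; the key name matches the conf
    example above.
    """
    raw = (conf or {}).get("current_event_date")
    if raw is None:
        return (run_date - timedelta(days=1)).isoformat()
    # date.fromisoformat raises ValueError for text that is not a valid ISO
    # date, and isoformat() normalizes the result to YYYY-MM-DD.
    return date.fromisoformat(raw).isoformat()


print(resolve_event_date({"current_event_date": "2022-01-01"}, date(2022, 6, 12)))
print(resolve_event_date(None, date(2022, 6, 12)))
```

The returned string is what the template would then substitute into the DECLARE line.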

Do you need custom logic to compute current_event_date?

  1. Create a custom macro

Define a Python function in the DAG file:

def custom_macro():
    current_event_date = "some date" # function to get the `current_event_date`
    return current_event_date

Add this custom macro to the DAG by passing user_defined_macros:

@dag(start_date=pendulum.today(tz="Europe/London"), user_defined_macros={"current_event_date": custom_macro})

Change dml_merge.sql so that current_event_date is wrapped in {{ }} (it is now a macro you defined). Because it is a macro, and therefore a function, you need to call it to distinguish it from a variable:

DECLARE current_event_date STRING DEFAULT "'{{ current_event_date() }}'";

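When you run the DAG, Airflow sees that current_event_date in your .sql file is a user-defined macro and calls the function custom_macro, which returns a value. Use this approach if you need more complex logic than passing a date in the conf or reading it from a Variable/XCom.

Macros are very powerful in Airflow. The Airflow tutorial page on Jinja templating is a good starting point if you want to learn more.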
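
As one possible body for custom_macro, here is a minimal sketch that computes the previous day in US Pacific time, so the result does not depend on the server running in UTC. The function name `previous_event_date`, the `now` parameter (added only to make it testable) and the `America/Los_Angeles` zone are my assumptions, not something from the question.

```python
from datetime import datetime, timedelta
from typing import Optional
from zoneinfo import ZoneInfo  # standard library since Python 3.9

PACIFIC = ZoneInfo("America/Los_Angeles")  # assumed business timezone


def previous_event_date(now: Optional[datetime] = None) -> str:
    """Return yesterday's date (YYYY-MM-DD) as seen from US Pacific time.

    `now` must be timezone-aware; it defaults to the current time.
    """
    if now is None:
        now = datetime.now(PACIFIC)
    local_today = now.astimezone(PACIFIC).date()
    return (local_today - timedelta(days=1)).isoformat()


# 10:00 UTC on 12 June is 03:00 in Los Angeles, so "yesterday" is 11 June.
print(previous_event_date(datetime(2022, 6, 12, 10, 0, tzinfo=ZoneInfo("UTC"))))
```

Registered as user_defined_macros={"current_event_date": previous_event_date}, it would be called from the SQL file as {{ current_event_date() }}.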

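  1. Run dml_merge.sql with BigQueryInsertJobOperator

If you are using Composer on GCP, this operator is already available in the environment.

If your Airflow instance is not on Composer, you need to install apache-airflow-providers-google into the environment; see that provider's documentation page.

Once that is done, you can import the operator: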

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

See the operator's documentation for its parameters.

Finally, you can create the task that runs dml_merge.sql.

run_dml_merge_query = BigQueryInsertJobOperator(
    task_id="run_dml_merge_query",
    configuration={
        "query": {
            "query": "{% include 'templates/dml_merge.sql' %}",
            "useLegacySql": False,
        }
    },
)

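Again, we are using the Jinja templating system, this time with the {% include %} syntax, which inserts the contents of dml_merge.sql into the query dict used for the operator's configuration.

This pattern only works for operator fields that are listed as template fields. The operator's docs show that the configuration parameter is a template field.

When the DAG is triggered and Airflow reaches this task:

  1. It reads dml_merge.sql.
  2. It replaces {% include 'templates/dml_merge.sql' %} in "query": "{% include 'templates/dml_merge.sql' %}" with the contents of the file.
  3. It runs any macros and performs any variable substitutions within that string.
  4. It runs the query as a job on BigQuery.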
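
The order of these steps can be mimicked with a few lines of plain Python. This is a toy stand-in that uses regular expressions, not Jinja or Airflow's actual rendering code; the in-memory TEMPLATES dict, the render function and the sample date are all hypothetical.

```python
import re

# Hypothetical in-memory "templates folder"; Airflow would read real files.
TEMPLATES = {
    "templates/dml_merge.sql": (
        "DECLARE current_event_date STRING DEFAULT \"'{{ current_event_date }}'\";"
    ),
}

INCLUDE = re.compile(r"\{%\s*include\s+'([^']+)'\s*%\}")
VARIABLE = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(text: str, context: dict) -> str:
    """Expand include tags first, then substitute {{ name }} placeholders."""
    # Steps 1-2: replace each include tag with the contents of the named file.
    text = INCLUDE.sub(lambda m: TEMPLATES[m.group(1)], text)
    # Step 3: substitute variables in the expanded text.
    return VARIABLE.sub(lambda m: str(context[m.group(1)]), text)


# Step 4 would submit this configuration to BigQuery; here it is only printed.
configuration = {
    "query": {
        "query": render(
            "{% include 'templates/dml_merge.sql' %}",
            {"current_event_date": "2022-01-01"},
        ),
        "useLegacySql": False,
    }
}
print(configuration["query"]["query"])
```

The toy version only handles bare variable names; real Jinja also evaluates expressions and macro calls such as {{ current_event_date() }}.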

You need to make sure that whichever service account you use has the correct BigQuery permissions (roles/bigquery.jobUser). See the BigQuery IAM page for details.

0 votes
Original page content provided by Stack Overflow. Translation support provided by the Tencent Cloud Xiaowei IT-domain engine.
Original link:

https://stackoverflow.com/questions/72589428
