# Generated via https://github.com/mozilla/bigquery-etl/blob/master/bigquery_etl/query_scheduling/generate_airflow_dags.py

from airflow import DAG
from airflow.operators.sensors import ExternalTaskSensor
import datetime
from utils.gcp import bigquery_etl_query, gke_command

default_args = {
    "owner": "test@example.org",
    "start_date": datetime.datetime(2020, 1, 1, 0, 0),
    "end_date": None,
    "email": [],
    "depends_on_past": False,
    "retry_delay": datetime.timedelta(seconds=1800),
    "email_on_failure": True,
    "email_on_retry": True,
    "retries": 2,
}

with DAG(
    "bqetl_test_dag", default_args=default_args, schedule_interval="@daily"
) as dag:

    test__no_metadata_query__v1 = bigquery_etl_query(
        task_id="test__no_metadata_query__v1",
        destination_table="no_metadata_query_v1",
        dataset_id="test",
        project_id="moz-fx-data-test-project",
        owner="test@example.com",
        email=["test@example.com"],
        date_partition_parameter="submission_date",
        depends_on_past=True,
        dag=dag,
    )

    test__non_incremental_query__v1 = bigquery_etl_query(
        task_id="test__non_incremental_query__v1",
        destination_table="non_incremental_query_v1",
        dataset_id="test",
        project_id="moz-fx-data-test-project",
        owner="test@example.com",
        email=["test@example.com"],
        date_partition_parameter="submission_date",
        depends_on_past=True,
        dag=dag,
    )

    wait_for_external_task1 = ExternalTaskSensor(
        task_id="wait_for_external_task1",
        external_dag_id="external",
        external_task_id="task1",
        check_existence=True,
        mode="reschedule",
        pool="DATA_ENG_EXTERNALTASKSENSOR",
        dag=dag,
    )

    test__no_metadata_query__v1.set_upstream(wait_for_external_task1)

    test__non_incremental_query__v1.set_upstream(wait_for_external_task1)
