# Generated via https://github.com/mozilla/bigquery-etl/blob/master/bigquery_etl/query_scheduling/generate_airflow_dags.py

from airflow import DAG
from airflow.operators.sensors import ExternalTaskSensor
import datetime
from utils.gcp import bigquery_etl_query, gke_command

default_args = {
    "owner": "test@example.org",
    "start_date": datetime.datetime(2020, 5, 25, 0, 0),
    "end_date": None,
    "email": [],
    "depends_on_past": False,
    "retry_delay": datetime.timedelta(seconds=1800),
    "email_on_failure": True,
    "email_on_retry": True,
    "retries": 2,
}

with DAG(
    "bqetl_test_dag", default_args=default_args, schedule_interval="@daily"
) as dag:

    {{ temporary_dataset }}__query__v1 = bigquery_etl_query(
        task_id="{{ temporary_dataset }}__query__v1",
        destination_table="query_v1",
        dataset_id="{{ temporary_dataset }}",
        project_id="bigquery-etl-integration-test",
        owner="test@example.org",
        email=["test@example.org"],
        date_partition_parameter="submission_date",
        depends_on_past=False,
        dag=dag,
    )

    {{ temporary_dataset }}__table1__v1 = bigquery_etl_query(
        task_id="{{ temporary_dataset }}__table1__v1",
        destination_table="table1_v1",
        dataset_id="{{ temporary_dataset }}",
        project_id="bigquery-etl-integration-test",
        owner="test@example.org",
        email=["test@example.org"],
        date_partition_parameter="submission_date",
        depends_on_past=False,
        dag=dag,
    )

    {{ temporary_dataset }}__table2__v1 = bigquery_etl_query(
        task_id="{{ temporary_dataset }}__table2__v1",
        destination_table="table2_v1",
        dataset_id="{{ temporary_dataset }}",
        project_id="bigquery-etl-integration-test",
        owner="test@example.org",
        email=["test@example.org"],
        date_partition_parameter="submission_date",
        depends_on_past=False,
        dag=dag,
    )

    wait_for_{{ temporary_dataset }}__external_table__v1 = ExternalTaskSensor(
        task_id="wait_for_{{ temporary_dataset }}__external_table__v1",
        external_dag_id="bqetl_external_test_dag",
        external_task_id="{{ temporary_dataset }}__external_table__v1",
        check_existence=True,
        mode="reschedule",
        pool="DATA_ENG_EXTERNALTASKSENSOR",
    )

    {{ temporary_dataset }}__query__v1.set_upstream(
        wait_for_{{ temporary_dataset }}__external_table__v1
    )

    {{ temporary_dataset }}__query__v1.set_upstream({{ temporary_dataset }}__table1__v1)

    {{ temporary_dataset }}__query__v1.set_upstream({{ temporary_dataset }}__table2__v1)
