# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py

from airflow import DAG
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from operators.gcp_container_operator import GKEPodOperator
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check

docs = """
### bqetl_test_dag

Built from bigquery-etl repo, [`dags/bqetl_test_dag.py`](https://github.com/mozilla/bigquery-etl/blob/generated-sql/dags/bqetl_test_dag.py)

#### Owner

test@example.org

#### Tags

* repo/bigquery-etl
"""


default_args = {
    "owner": "test@example.org",
    "start_date": datetime.datetime(2020, 5, 25, 0, 0),
    "end_date": None,
    "email": [],
    "depends_on_past": False,
    "retry_delay": datetime.timedelta(seconds=1800),
    "email_on_failure": True,
    "email_on_retry": True,
    "retries": 2,
}

tags = ["repo/bigquery-etl"]

with DAG(
    "bqetl_test_dag",
    default_args=default_args,
    schedule_interval="@daily",
    doc_md=docs,
    tags=tags,
) as dag:

    checks__fail_test__table1__v1 = bigquery_dq_check(
        task_id="checks__fail_test__table1__v1",
        source_table="table1_v1",
        dataset_id="test",
        project_id="test-project",
        is_dq_check_fail=True,
        owner="test@example.org",
        email=["test@example.org"],
        depends_on_past=False,
        parameters=["submission_date:DATE:{{ds}}"],
        retries=0,
    )

    test__table1__v1 = bigquery_etl_query(
        task_id="test__table1__v1",
        destination_table="table1_v1",
        dataset_id="test",
        project_id="test-project",
        owner="test@example.org",
        email=["test@example.org"],
        date_partition_parameter="submission_date",
        depends_on_past=False,
    )

    test__table2__v1 = bigquery_etl_query(
        task_id="test__table2__v1",
        destination_table="table2_v1",
        dataset_id="test",
        project_id="test-project",
        owner="test@example.org",
        email=["test@example.org"],
        date_partition_parameter="submission_date",
        depends_on_past=False,
    )

    checks__fail_test__table1__v1.set_upstream(test__table1__v1)

    checks__fail_test__table1__v1.set_upstream(test__table2__v1)
