
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py

from airflow import DAG
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from operators.gcp_container_operator import GKEPodOperator
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, bigquery_dq_check, bigquery_bigeye_check

docs = """
### bqetl_test_dag

Built from bigquery-etl repo, [`dags/bqetl_test_dag.py`](https://github.com/mozilla/bigquery-etl/blob/generated-sql/dags/bqetl_test_dag.py)

#### Owner

test@example.org

#### Tags

* repo/bigquery-etl
"""


default_args = {
    "owner": "test@example.org",
    "start_date": datetime.datetime(2020, 1, 1, 0, 0),
    "end_date": None,
    "email": [],
    "depends_on_past": False,
    "retry_delay": datetime.timedelta(seconds=1800),
    "email_on_failure": True,
    "email_on_retry": True,
    "retries": 2,
}

tags = ["repo/bigquery-etl"]

with DAG(
    "bqetl_test_dag",
    default_args=default_args,
    schedule_interval="@daily",
    doc_md=docs,
    tags=tags,
) as dag:

    wait_for_task1 = ExternalTaskSensor(
        task_id="wait_for_task1",
        external_dag_id="external",
        external_task_id="task1",
        check_existence=True,
        mode="reschedule",
        poke_interval=datetime.timedelta(minutes=5),
        allowed_states=ALLOWED_STATES,
        failed_states=FAILED_STATES,
        pool="DATA_ENG_EXTERNALTASKSENSOR",
    )

    test__no_metadata_query__v1 = bigquery_etl_query(
        task_id="test__no_metadata_query__v1",
        destination_table='no_metadata_query_v1${{ macros.ds_format(macros.ds_add(ds, -2), "%Y-%m-%d", "%Y%m%d") }}',
        dataset_id="test",
        project_id="moz-fx-data-test-project",
        owner="test@example.com",
        email=["test@example.com"],
        date_partition_parameter=None,
        depends_on_past=True,
        parameters=["date:DATE:{{macros.ds_add(ds, -2)}}"],
    )

    test__non_incremental_query__v1 = bigquery_etl_query(
        task_id="test__non_incremental_query__v1",
        destination_table='non_incremental_query_v1${{ macros.ds_format(macros.ds_add(ds, -2), "%Y-%m-%d", "%Y%m%d") }}',
        dataset_id="test",
        project_id="moz-fx-data-test-project",
        owner="test@example.com",
        email=["test@example.com"],
        date_partition_parameter=None,
        depends_on_past=True,
        parameters=["date:DATE:{{macros.ds_add(ds, -2)}}"],
    )

    test__no_metadata_query__v1.set_upstream(wait_for_task1)

    test__non_incremental_query__v1.set_upstream(wait_for_task1)
