# Generated via https://github.com/mozilla/bigquery-etl/blob/master/bigquery_etl/query_scheduling/generate_airflow_dags.py

from airflow import DAG
from airflow.operators.sensors import ExternalTaskSensor
import datetime
from operators.gcp_container_operator import GKEPodOperator
from utils.gcp import gke_command

# Shared task settings; handed to the DAG constructor as ``default_args``.
default_args = dict(
    owner="test@example.org",
    start_date=datetime.datetime(2020, 1, 1),
    end_date=None,  # no end date is set
    email=["test@example.org"],
    depends_on_past=False,
    retry_delay=datetime.timedelta(hours=1),
    email_on_failure=True,
    email_on_retry=True,
    retries=2,
)

# Daily DAG: wait for the upstream query task, export its result via the
# publish_public_data_json script, then run the GCS metadata script.
with DAG(
    dag_id="bqetl_public_data_json",
    schedule_interval="@daily",
    default_args=default_args,
) as dag:
    # Container image shared by every pod-based task in this DAG.
    docker_image = "mozilla/bigquery-etl:latest"

    # Runs script/publish_public_data_json for the non-incremental test query.
    # NOTE(review): the {{ds}} / {{ds_nodash}} placeholders are presumably
    # rendered by Airflow templating before the pod starts -- confirm that
    # GKEPodOperator treats ``arguments`` as a templated field.
    export_public_data_json_test__non_incremental_query__v1 = GKEPodOperator(
        dag=dag,
        image=docker_image,
        task_id="export_public_data_json_test__non_incremental_query__v1",
        name="export_public_data_json_test__non_incremental_query__v1",
        arguments=[
            "script/publish_public_data_json",
            "--query_file=sql/moz-fx-data-test-project/test/non_incremental_query_v1/query.sql",
            "--destination_table=non_incremental_query${{ds_nodash}}",
            "--dataset_id=test",
            "--project_id=moz-fx-data-test-project",
            "--parameter=submission_date:DATE:{{ds}}",
        ],
    )

    # Sensor on task "test__non_incremental_query__v1" of DAG "bqetl_core".
    # No ``dag=`` is passed here; the sensor is created inside the ``with DAG``
    # context.
    wait_for_test__non_incremental_query__v1 = ExternalTaskSensor(
        task_id="wait_for_test__non_incremental_query__v1",
        external_dag_id="bqetl_core",
        external_task_id="test__non_incremental_query__v1",
        pool="DATA_ENG_EXTERNALTASKSENSOR",
        mode="reschedule",
        check_existence=True,
    )

    # The export must not start before the sensor has succeeded.
    (
        wait_for_test__non_incremental_query__v1
        >> export_public_data_json_test__non_incremental_query__v1
    )

    # Runs script/publish_public_data_gcs_metadata in the same image.
    public_data_gcs_metadata = gke_command(
        task_id="public_data_gcs_metadata",
        command=["script/publish_public_data_gcs_metadata"],
        docker_image=docker_image,
        dag=dag,
    )

    # The metadata task runs only after the JSON export has completed.
    export_public_data_json_test__non_incremental_query__v1 >> public_data_gcs_metadata
