import datetime
import os

from airflow import models

from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator

# from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook

# [START composer_simple_define_dag]

# Arguments handed to the DAG as ``default_args``.
#
# NOTE(review): the double-brace placeholders inside the string values look
# like template variables filled in by an external rendering step before this
# file is deployed -- confirm against whatever generates the DAG files.
default_dag_args = {
    # Ownership and dependency on earlier runs.
    "owner": "airflow",
    "depends_on_past": False,
    # The start_date describes when the DAG is valid / can be run. It is a
    # fixed point in time rather than a dynamic value because it is evaluated
    # every time the DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2019, 3, 1),
    # Notifications: mail on failure, stay quiet on retries.
    "email": ["jonas.mack@cognite.com"],
    "email_on_failure": True,
    "email_on_retry": False,
    # One concurrent run per task and no automatic retries. If retries are
    # ever enabled, a "retry_delay" timedelta can be added here as well.
    "task_concurrency": 1,
    "retries": 0,
    # Defaults passed to the Dataflow operator(s) through default_args.
    "dataflow_default_options": {
        "project": "{{ gcp_project_name }}",
        "region": "europe-west1",
        "tempLocation": "gs://{{ job_configs_bucket }}/temp",
        "additionalExperiments": [
            "shuffle_mode=service",
            "enable_stackdriver_agent_metrics",
        ],
    },
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    dag_id="{{ dag_name }}",
    schedule_interval="{{schedule }}",
    default_args=default_dag_args,
    # Do not backfill runs for schedule intervals that were missed.
    catchup=False,
    # Allow a single active DAG run and a single running task at a time.
    max_active_runs=1,
    concurrency=1,
) as dag:
    # [END composer_simple_define_dag]
    # [START composer_simple_operators]
    # The only task of this DAG: launch the Dataflow template job. The
    # operator is created inside the DAG context manager, so it is attached
    # to ``dag`` without passing ``dag=`` explicitly.
    runExtractCdpAssetsFull = DataflowTemplateOperator(
        task_id="forge-extractor-workflow-replicator",
        # Location of the Dataflow template to launch.
        template=(
            "gs://{{ job_definitions_bucket }}/dataflow/template/"
            "replicate-{{ job }}"
        ),
        # Runtime parameters forwarded to the template.
        parameters={
            "cdfInputSecret": "{{ gcp_project_name }}.{{ tenant }}-{{ src_env }}",
            "cdfOutputSecret": "{{ gcp_project_name }}.{{ tenant }}-{{ dst_env }}",
            "jobConfigFile": (
                "gs://{{ job_configs_bucket }}/extractor_workflow/"
                "{{ tenant }}/{{ src_env }}-{{ dst_env }}/"
                "{{ extractor_id }}/{{ job }}-config.toml"
            ),
        },
        # Interval passed to the operator for polling the job status;
        # presumably seconds -- confirm against the operator documentation.
        poll_sleep=30,
    )

    # [START composer_simple_relationships]
    # Define the order in which the tasks complete by using the >> and <<
    # operators. This DAG has a single task, so no ordering is declared.

    # [END composer_simple_relationships]
# [END composer_simple]
