import datetime
import os

from airflow import models

from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator

# from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook

# [START composer_simple_define_dag]

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime(2019, 3, 1),
    'email': ['jonas.mack@cognite.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'task_concurrency': 1,
    'retries': 0,
    # 'retry_delay': datetime.timedelta(minutes=10),
    'dataflow_default_options': {
        'project': '{{ gcp_project_name }}',
        'region': 'europe-west1',
        'tempLocation': 'gs://{{ job_configs_bucket }}/temp',
        'additionalExperiments': [
            'shuffle_mode=service',
            'enable_stackdriver_agent_metrics'
        ]
    }

    # 'end_date': datetime(2016, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        '{{ dag_name }}', # Needs to be unique in case schedule_interval ='@once'
        catchup=False,
        default_args=default_dag_args,
        schedule_interval='{{ schedule }}',
        max_active_runs=1,
        concurrency=1) as dag:
    # [END composer_simple_define_dag]
    # [START composer_simple_operators]
    runExtractCdpAssetsFull = DataflowTemplateOperator(
        task_id='forge-custom-data-replicator',
        template='gs://{{ job_definitions_bucket }}/dataflow/template/replicate-{{ data_type }}',
        poll_sleep=30,
        parameters={
            'cdfInputSecret': '{{ gcp_project_name }}.{{ src_tenant }}',
            'cdfOutputSecret': '{{ gcp_project_name }}.{{ dst_tenant }}',
            'jobConfigFile': 'gs://{{ job_configs_bucket }}/custom_replicator/{{ path }}/{{ job_config_name }}',
            'cdfInputHost': '{{ src_host }}',
            'cdfOutputHost': '{{ dst_host }}'
        }
    )

    # [START composer_simple_relationships]
    # Define the order in which the tasks complete by using the >> and <<

    # [END composer_simple_relationships]
# [END composer_simple]
