Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

from __future__ import print_function 

from builtins import range 

from airflow.operators import PythonOperator 

from airflow.models import DAG 

from datetime import datetime, timedelta 

 

import time 

from pprint import pprint 

 

# Start date for the DAG: midnight at the start of the day that lies seven
# days before the moment this module is loaded.
seven_days_ago = datetime.combine(
    datetime.today() - timedelta(days=7),
    datetime.min.time(),
)

# Arguments handed to the DAG as its default_args.
args = {'owner': 'airflow', 'start_date': seven_days_ago}

 

# The DAG object that every task defined in this file is attached to;
# `args` is supplied as its default_args.
dag = DAG(
    dag_id='example_python_operator',
    default_args=args,
)

 

 

def my_sleeping_function(random_base):
    '''Sleep for ``random_base`` seconds.

    Used as the ``python_callable`` of the ``sleep_for_<i>`` tasks in this
    file, which pass ``random_base`` in through ``op_kwargs``.
    '''
    time.sleep(random_base)

 

 

def print_context(ds, **kwargs):
    '''Dump the received keyword arguments and ``ds`` to standard output.

    ``kwargs`` is pretty-printed first, then ``ds`` is printed on its own
    line. ``ds`` is presumably the execution-date string Airflow supplies
    when the task is created with ``provide_context=True`` -- confirm
    against the Airflow version in use.

    Returns a fixed message string.
    '''
    pprint(kwargs)
    print(ds)
    message = 'Whatever you return gets printed in the logs'
    return message

 

# Task that calls print_context; provide_context=True is set so the
# callable receives keyword arguments from Airflow.
run_this = PythonOperator(
    dag=dag,
    task_id='print_the_context',
    python_callable=print_context,
    provide_context=True,
)

 

# Generate 10 sleeping tasks, sleeping from 0 to 9 seconds respectively.
# Each one is registered with run_this as its upstream task.
for i in range(10):
    task = PythonOperator(
        dag=dag,
        task_id='sleep_for_' + str(i),
        python_callable=my_sleeping_function,
        op_kwargs={'random_base': i},
    )
    task.set_upstream(run_this)