Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

from __future__ import absolute_import 

from __future__ import division 

from __future__ import print_function 

from __future__ import unicode_literals 

 

import logging 

import os 

import sys 

 

from sqlalchemy.orm import scoped_session, sessionmaker 

from sqlalchemy import create_engine 

 

from airflow import configuration 

 

# Multi-line ASCII-art banner kept as a plain string constant.
# NOTE(review): the backslashes inside the art are not recognised escape
# sequences, so Python keeps them as literal characters; newer interpreters
# warn about such sequences -- consider a raw string or doubled backslashes
# once the exact intended text has been confirmed.
HEADER = """\ 

  ____________       _____________ 

____    |__( )_________  __/__  /________      __ 

____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / / 

___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ / 

_/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/ 

""" 

 

# Fixed URL path (not read from configuration).
# NOTE(review): presumably the web UI route that serves task logs -- confirm
# against the webserver views.
BASE_LOG_URL = '/admin/airflow/log' 

# ``AIRFLOW_HOME`` option of the ``core`` config section, with a leading
# ``~`` expanded to the current user's home directory.
AIRFLOW_HOME = os.path.expanduser(configuration.get('core', 'AIRFLOW_HOME')) 

# ``SQL_ALCHEMY_CONN`` option of the ``core`` config section; used below as
# the database URL handed to ``create_engine``.
SQL_ALCHEMY_CONN = configuration.get('core', 'SQL_ALCHEMY_CONN') 

# Logging level is pinned to INFO here rather than read from configuration.
LOGGING_LEVEL = logging.INFO 

# ``DAGS_FOLDER`` option of the ``core`` config section, with a leading
# ``~`` expanded to the current user's home directory.
DAGS_FOLDER = os.path.expanduser(configuration.get('core', 'DAGS_FOLDER')) 

 

# Extra keyword arguments for ``create_engine``; left empty for SQLite.
engine_args = {}

# Decide on the dialect prefix of the URL instead of searching the whole
# string: a non-SQLite URL whose credentials, host or database name merely
# contains "sqlite" must still get its connection-pool settings.
if not SQL_ALCHEMY_CONN.startswith('sqlite'):
    # Engine args not supported by sqlite
    engine_args['pool_size'] = 50  # connections kept open in the pool
    engine_args['pool_recycle'] = 3600  # recycle connections after 1 hour

# Module-wide engine built from the configured database URL.
engine = create_engine(SQL_ALCHEMY_CONN, **engine_args)

# Scoped session registry bound to ``engine``; sessions neither autocommit
# nor autoflush.
Session = scoped_session(
    sessionmaker(autocommit=False, autoflush=False, bind=engine))

 

# These templates live in code rather than in the configuration file because
# ConfigParser interpolation would interfere with the ``%(...)s`` fields.

# Short form: timestamp, level and message only.
SIMPLE_LOG_FORMAT = '%(asctime)s %(levelname)s - %(message)s'

# Long form: adds the emitting file name and line number.
LOG_FORMAT = (
    '[%(asctime)s] '
    '{%(filename)s:%(lineno)d} '
    '%(levelname)s - %(message)s'
)

 

 

def policy(task_instance):
    """
    Hook that lets an administrator adjust a task instance just before it
    is executed, e.g. to rewire some of its task parameters.

    The ``TaskInstance`` passed in exposes its task through the ``task``
    attribute, and that task in turn references its DAG, so attributes of
    all three objects are available when deciding what to change.

    To install a policy, put an ``airflow_local_settings`` module on your
    PYTHONPATH and define a ``policy`` function in it. That function is
    given the ``TaskInstance`` and may modify it as required.

    Some ways this can be put to use:

    * Pin tasks that use the ``SparkOperator`` to one queue (for instance
        the ``spark`` queue) so that their task instances are picked up
        by the right workers
    * Send every task instance whose ``execution_date`` is more than a
        week old to a ``backfill`` pool.
    * ...

    This default implementation does nothing.
    """
    return None

 

 

# Optional site-specific overrides: every public name defined by an
# ``airflow_local_settings`` module found on the import path replaces the
# default of the same name defined in this module.
try:
    from airflow_local_settings import *  # noqa: F401,F403
except ImportError:
    # The module is optional; without it the defaults in this file apply.
    pass
except Exception:
    # The module exists but failed while executing. Keep starting up with
    # the defaults (best effort), but report the failure instead of hiding
    # it. SystemExit and KeyboardInterrupt are deliberately not caught.
    logging.exception("Failed to load airflow_local_settings.")
else:
    logging.info("Loaded airflow_local_settings.")