Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

# -*- coding: utf-8 -*- 

# 

# Licensed under the Apache License, Version 2.0 (the "License"); 

# you may not use this file except in compliance with the License. 

# You may obtain a copy of the License at 

# 

# http://www.apache.org/licenses/LICENSE-2.0 

# 

# Unless required by applicable law or agreed to in writing, software 

# distributed under the License is distributed on an "AS IS" BASIS, 

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

# See the License for the specific language governing permissions and 

# limitations under the License. 

# 

from __future__ import absolute_import 

from __future__ import division 

from __future__ import print_function 

from __future__ import unicode_literals 

 

import logging 

import os 

import sys 

 

from sqlalchemy import create_engine 

from sqlalchemy.orm import scoped_session, sessionmaker 

 

from airflow import configuration as conf 

 

 

class DummyStatsLogger(object):
    """
    No-op metrics sink used when statsd reporting is disabled.

    It mirrors the ``statsd.StatsClient`` methods that are called through
    ``Stats`` (``incr``, ``decr``, ``gauge``, ``timing``), so call sites work
    the same whether metrics are enabled or not. Every method accepts its
    arguments and discards them, returning ``None``.
    """

    @classmethod
    def incr(cls, stat, count=1, rate=1):
        """Discard a counter increment of ``count`` for ``stat``."""
        pass

    @classmethod
    def decr(cls, stat, count=1, rate=1):
        """Discard a counter decrement of ``count`` for ``stat``."""
        pass

    @classmethod
    def gauge(cls, stat, value, rate=1, delta=False):
        """Discard a gauge update (absolute, or relative when ``delta``)."""
        pass

    @classmethod
    def timing(cls, stat, dt, rate=1):
        """
        Discard a timing sample ``dt`` for ``stat``.

        ``rate`` is accepted (and ignored) so callers can pass a sample rate
        exactly as they can to ``statsd.StatsClient.timing``; without it such
        calls would raise TypeError only when statsd is disabled.
        """
        pass

 

# ``Stats`` is the metrics sink used for reporting. It starts out as the no-op
# DummyStatsLogger and is replaced by a real statsd client only when the
# ``[scheduler] statsd_on`` option is true.
Stats = DummyStatsLogger

if conf.getboolean('scheduler', 'statsd_on'):
    # Imported here, inside the branch, so statsd is only needed when enabled.
    from statsd import StatsClient

    statsd = StatsClient(
        host=conf.get('scheduler', 'statsd_host'),
        port=conf.getint('scheduler', 'statsd_port'),
        prefix=conf.get('scheduler', 'statsd_prefix'),
    )
    Stats = statsd

 

 

 

# ASCII-art banner. Backslashes are doubled so the literal contains no
# invalid escape sequences (``\_`` warns on Python 3.6+ and is a
# SyntaxWarning on 3.12+); the resulting string is unchanged.
HEADER = """\
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \\_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \\____/____/|__/
"""

 

BASE_LOG_URL = '/admin/airflow/log'
AIRFLOW_HOME = os.path.expanduser(conf.get('core', 'AIRFLOW_HOME'))
SQL_ALCHEMY_CONN = conf.get('core', 'SQL_ALCHEMY_CONN')
LOGGING_LEVEL = logging.INFO
DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))

# Keyword arguments forwarded to SQLAlchemy's create_engine().
engine_args = {}
# Pool sizing/recycling arguments are not supported for SQLite, so they are
# only set for other backends. The URL scheme (``sqlite://...`` or
# ``sqlite+<driver>://...``) is checked rather than the whole string: a
# substring test would also drop pooling for e.g. a PostgreSQL or MySQL URL
# whose host, database name or password happens to contain "sqlite".
if not SQL_ALCHEMY_CONN.lower().startswith('sqlite'):
    engine_args['pool_size'] = conf.getint('core', 'SQL_ALCHEMY_POOL_SIZE')
    engine_args['pool_recycle'] = conf.getint(
        'core', 'SQL_ALCHEMY_POOL_RECYCLE')

engine = create_engine(SQL_ALCHEMY_CONN, **engine_args)
# Session registry bound to the engine above; sessions are created with
# autocommit and autoflush disabled, so callers commit explicitly.
Session = scoped_session(
    sessionmaker(autocommit=False, autoflush=False, bind=engine))

# can't move this to conf due to ConfigParser interpolation
LOG_FORMAT = (
    '[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s')
SIMPLE_LOG_FORMAT = '%(asctime)s %(levelname)s - %(message)s'

 

 

def policy(task_instance):
    """
    Alter a task instance right before it is executed.

    This hook lets administrators rewire task parameters at run time. The
    default implementation leaves the task instance untouched.

    The ``TaskInstance`` object has a ``task`` attribute pointing to its
    related task object, which in turn references the DAG object, so the
    attributes of all of these can be used to define a policy.

    To define a policy, add an ``airflow_local_settings`` module to your
    PYTHONPATH that defines this ``policy`` function. It receives a
    ``TaskInstance`` object and can alter it where needed.

    A few examples of how this can be useful:

    * Enforce a specific queue (say, the ``spark`` queue) for tasks using
      the ``SparkOperator`` so that these task instances are routed to the
      right workers.
    * Force all task instances whose ``execution_date`` is more than a week
      old to run in a ``backfill`` pool.
    * ...
    """

 

 

def configure_logging():
    """
    Reset the root logger and send log records to standard output.

    Handlers already attached to the root logger are discarded, then
    ``logging.basicConfig`` installs a stream handler on ``sys.stdout`` using
    ``LOG_FORMAT`` at level ``LOGGING_LEVEL``.
    """
    root_logger = logging.root
    # basicConfig() does nothing if the root logger already has handlers,
    # so they are dropped first to make the configuration below take effect.
    root_logger.handlers = []
    logging.basicConfig(
        format=LOG_FORMAT,
        stream=sys.stdout,
        level=LOGGING_LEVEL,
    )

 

# Optional site-specific overrides. ``import *`` runs before
# configure_logging() below, so airflow_local_settings can rebind names
# defined in this module (e.g. ``policy``, ``LOG_FORMAT``, ``LOGGING_LEVEL``
# or ``configure_logging`` itself).
_local_settings_loaded = False
try:
    from airflow_local_settings import *  # noqa: F401,F403
    _local_settings_loaded = True
except ImportError as err:
    # The module is optional, so its absence is expected. An ImportError
    # raised from *inside* it (Python 3 records the missing module's name on
    # the exception) is a real problem and is reported instead of ignored.
    if getattr(err, 'name', 'airflow_local_settings') != \
            'airflow_local_settings':
        logging.exception("Failed to import airflow_local_settings.")
except Exception:
    # A broken override module should not abort startup, but it must not be
    # hidden either. Catching Exception (not everything) lets
    # KeyboardInterrupt and SystemExit propagate.
    logging.exception("Failed to load airflow_local_settings.")

configure_logging()

# Logged only after configure_logging(): before that call the root logger is
# not yet set to LOGGING_LEVEL, so an INFO record emitted earlier is
# typically dropped.
if _local_settings_loaded:
    logging.info("Loaded airflow_local_settings.")