Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

# -*- coding: utf-8 -*- 

# 

# Licensed under the Apache License, Version 2.0 (the "License"); 

# you may not use this file except in compliance with the License. 

# You may obtain a copy of the License at 

# 

# http://www.apache.org/licenses/LICENSE-2.0 

# 

# Unless required by applicable law or agreed to in writing, software 

# distributed under the License is distributed on an "AS IS" BASIS, 

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

# See the License for the specific language governing permissions and 

# limitations under the License. 

# 

import os 

 

# inspect.signature is only available in Python 3. funcsigs.signature is 

# a backport. 

try: 

import inspect 

signature = inspect.signature 

except AttributeError: 

import funcsigs 

signature = funcsigs.signature 

 

from copy import copy 

from functools import wraps 

 

from airflow.exceptions import AirflowException 

 

 

def apply_defaults(func): 

""" 

Function decorator that Looks for an argument named "default_args", and 

fills the unspecified arguments from it. 

 

Since python2.* isn't clear about which arguments are missing when 

calling a function, and that this can be quite confusing with multi-level 

inheritance and argument defaults, this decorator also alerts with 

specific information about the missing arguments. 

""" 

@wraps(func) 

def wrapper(*args, **kwargs): 

if len(args) > 1: 

raise AirflowException( 

"Use keyword arguments when initializing operators") 

dag_args = {} 

dag_params = {} 

import airflow.models 

if kwargs.get('dag', None) or airflow.models._CONTEXT_MANAGER_DAG: 

dag = kwargs.get('dag', None) or airflow.models._CONTEXT_MANAGER_DAG 

dag_args = copy(dag.default_args) or {} 

dag_params = copy(dag.params) or {} 

 

params = {} 

if 'params' in kwargs: 

params = kwargs['params'] 

dag_params.update(params) 

 

default_args = {} 

if 'default_args' in kwargs: 

default_args = kwargs['default_args'] 

if 'params' in default_args: 

dag_params.update(default_args['params']) 

del default_args['params'] 

 

dag_args.update(default_args) 

default_args = dag_args 

 

sig = signature(func) 

non_optional_args = [ 

name for (name, param) in sig.parameters.items() 

if param.default == param.empty and 

param.name != 'self' and 

param.kind not in (param.VAR_POSITIONAL, param.VAR_KEYWORD)] 

for arg in sig.parameters: 

if arg in default_args and arg not in kwargs: 

kwargs[arg] = default_args[arg] 

missing_args = list(set(non_optional_args) - set(kwargs)) 

if missing_args: 

msg = "Argument {0} is required".format(missing_args) 

raise AirflowException(msg) 

 

kwargs['params'] = dag_params 

 

result = func(*args, **kwargs) 

return result 

return wrapper 

 

if 'BUILDING_AIRFLOW_DOCS' in os.environ: 

# Monkey patch hook to get good function headers while building docs 

apply_defaults = lambda x: x