Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

# -*- coding: utf-8 -*- 

# 

# Licensed under the Apache License, Version 2.0 (the "License"); 

# you may not use this file except in compliance with the License. 

# You may obtain a copy of the License at 

# 

# http://www.apache.org/licenses/LICENSE-2.0 

# 

# Unless required by applicable law or agreed to in writing, software 

# distributed under the License is distributed on an "AS IS" BASIS, 

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

# See the License for the specific language governing permissions and 

# limitations under the License. 

# 

from __future__ import absolute_import 

from __future__ import division 

from __future__ import print_function 

from __future__ import unicode_literals 

 

from builtins import object 

import imp 

import inspect 

import logging 

import os 

import re 

import sys 

from itertools import chain 

merge = chain.from_iterable 

 

from airflow import configuration 

from airflow.utils.timeout import timeout 

 

 

class AirflowPluginException(Exception):
    """Error raised for an invalid plugin definition (e.g. a missing name)."""

 

 

class AirflowPlugin(object):
    """Base class for plugin declarations.

    A plugin is a subclass that sets ``name`` and fills in the component
    lists it wants to contribute. The base class itself declares no name and
    empty lists for every component kind.
    """

    # Plugin identifier; ``validate`` rejects a falsy value.
    name = None

    # Component lists, one per kind, all empty on the base class.
    operators = []
    hooks = []
    executors = []
    macros = []
    admin_views = []
    flask_blueprints = []
    menu_links = []

    @classmethod
    def validate(cls):
        """Check that the plugin class declares a name.

        Raises:
            AirflowPluginException: if ``cls.name`` is falsy (unset or empty).
        """
        if cls.name:
            return
        raise AirflowPluginException("Your plugin needs a name.")

 

 

# Resolve the plugins directory: take the configured 'plugins_folder', or
# fall back to '<airflow_home>/plugins' when that setting is empty, then
# expand a leading '~'.
plugins_folder = os.path.expanduser(
    configuration.get('core', 'plugins_folder') or
    (configuration.get('core', 'airflow_home') + '/plugins'))

# Put the plugins directory on sys.path, without adding it twice.
if plugins_folder not in sys.path:
    sys.path.append(plugins_folder)

# Starts empty; holds the plugin classes found in the plugins folder.
plugins = []

# Characters ('/', '|', '.') that get replaced when a directory path is
# turned into a module namespace.
norm_pattern = re.compile(r'[/|.]')

# Limit handed to ``timeout(...)`` around each plugin import.
plugin_import_timeout = configuration.getint('core', 'plugin_import_timeout')

 

# Crawl through the plugins folder to find AirflowPlugin derivatives.
# Every '.py' file under the folder (symlinked directories included) is
# imported under a namespace derived from its path, and each AirflowPlugin
# subclass it defines is validated and recorded once in ``plugins``.
for root, dirs, files in os.walk(plugins_folder, followlinks=True):
    logging.debug('Loading plugin located in {}'.format(root))
    for f in files:
        # Bound before the ``try`` so the error handler can always name the
        # file that failed.
        filepath = os.path.join(root, f)
        try:
            if not os.path.isfile(filepath):
                continue
            mod_name, file_ext = os.path.splitext(
                os.path.split(filepath)[-1])
            if file_ext != '.py':
                continue

            # Logged at debug level rather than printed, so importing this
            # module does not write to stdout.
            logging.debug('Importing plugin module ' + filepath)

            # normalize root path as namespace
            namespace = '_'.join([re.sub(norm_pattern, '__', root), mod_name])

            # Bound the import with the configured timeout.
            with timeout(plugin_import_timeout):
                m = imp.load_source(namespace, filepath)

            # Snapshot the module namespace before scanning it for plugin
            # classes; the base class itself is not a plugin.
            for obj in list(m.__dict__.values()):
                if (
                        inspect.isclass(obj) and
                        issubclass(obj, AirflowPlugin) and
                        obj is not AirflowPlugin):
                    obj.validate()
                    if obj not in plugins:
                        plugins.append(obj)

        except Exception as e:
            # Best effort: a broken plugin file is logged and skipped so the
            # remaining files are still processed.
            logging.exception(e)
            logging.error('Failed to import plugin ' + filepath)

 

 

def make_module(name, objects):
    """Build an in-memory module exposing ``objects`` as attributes.

    Args:
        name: Dotted module name; it is lower-cased before use.
        objects: Iterable of objects that each have a ``__name__``
            attribute (classes, functions, ...). Each one becomes a module
            attribute named after its ``__name__``.

    Returns:
        A new module object with ``_name`` set to the last dotted component
        of the lower-cased name and ``_objects`` set to ``objects`` as
        passed in. The module is not inserted into ``sys.modules`` here.
    """
    # Local import: ``imp.new_module`` is deprecated and removed in
    # Python 3.12; ``types.ModuleType`` is the equivalent constructor.
    import types

    name = name.lower()
    # str() keeps the name a native string, which Python 2's module type
    # requires when the caller passes a unicode literal.
    module = types.ModuleType(str(name))
    module._name = name.split('.')[-1]
    module._objects = objects
    # dict.update needs an iterable of (key, value) pairs; passing a single
    # (name, obj) tuple makes it try to unpack the name string itself.
    module.__dict__.update((o.__name__, o) for o in objects)
    return module

 

# One list per component kind; each receives one generated module per plugin.
operators, hooks, executors, macros, admin_views = [], [], [], [], []
flask_blueprints, menu_links = [], []

# For every discovered plugin, wrap each of its component lists in a module
# named '<package>.<plugin name>'. The '.' separator matters: make_module
# takes the last dotted component as the module's short name.
for p in plugins:
    operators.append(make_module('airflow.operators.' + p.name, p.operators))
    hooks.append(make_module('airflow.hooks.' + p.name, p.hooks))
    executors.append(make_module('airflow.executors.' + p.name, p.executors))
    macros.append(make_module('airflow.macros.' + p.name, p.macros))
    admin_views.append(
        make_module('airflow.www.admin_views.' + p.name, p.admin_views))
    flask_blueprints.append(
        make_module(
            'airflow.www.flask_blueprints.' + p.name, p.flask_blueprints))
    menu_links.append(
        make_module('airflow.www.menu_links.' + p.name, p.menu_links))