Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

from __future__ import absolute_import 

from __future__ import division 

from __future__ import print_function 

from __future__ import unicode_literals 

 

from builtins import object 

import imp 

import inspect 

import logging 

import os 

import sys 

from itertools import chain 

merge = chain.from_iterable 

 

from airflow import configuration 

 

class AirflowPluginException(Exception):
    """Error raised when a plugin definition is invalid (e.g. has no name)."""

 

class AirflowPlugin(object):
    """Base class that plugin definitions derive from.

    Subclasses override ``name`` and whichever component lists they
    contribute to. This module collects the subclasses it finds in the
    plugins folder and merges their lists into module-level collections.
    """

    # Must be overridden with a truthy value; checked by ``validate``.
    name = None

    # Components contributed by the plugin; every list defaults to empty.
    operators = []
    hooks = []
    executors = []
    macros = []
    admin_views = []
    flask_blueprints = []
    menu_links = []

    @classmethod
    def validate(cls):
        """Check that the plugin class declares a name.

        Raises:
            AirflowPluginException: if ``cls.name`` is unset or empty.
        """
        if cls.name:
            return
        raise AirflowPluginException("Your plugin needs a name.")

 

 

# Resolve the plugins directory: use the configured ``plugins_folder``,
# falling back to ``<airflow_home>/plugins`` when that setting is empty,
# then expand a leading ``~``.
plugins_folder = os.path.expanduser(
    configuration.get('core', 'plugins_folder') or
    configuration.get('core', 'airflow_home') + '/plugins')

# Put the folder on the import path (once).
if plugins_folder not in sys.path:
    sys.path.append(plugins_folder)

 

# AirflowPlugin subclasses discovered so far, in discovery order.
plugins = []

# Scan every file beneath the plugins folder (symlinks are followed) and
# register each AirflowPlugin subclass defined in a ``.py`` module.
for root, dirs, files in os.walk(plugins_folder, followlinks=True):
    for f in files:
        try:
            filepath = os.path.join(root, f)
            if not os.path.isfile(filepath):
                continue
            mod_name, file_ext = os.path.splitext(os.path.basename(filepath))
            if file_ext != '.py':
                continue

            # Fold the containing directory into the module name the file
            # is loaded under.
            namespace = root.replace('/', '__') + '_' + mod_name
            m = imp.load_source(namespace, filepath)

            # Iterate over a snapshot of the module's namespace values.
            for obj in list(m.__dict__.values()):
                if not inspect.isclass(obj):
                    continue
                if not issubclass(obj, AirflowPlugin) or obj is AirflowPlugin:
                    continue
                # A plugin without a name raises here, which abandons the
                # rest of this file via the handler below.
                obj.validate()
                if obj not in plugins:
                    plugins.append(obj)
        except Exception as e:
            # Best effort: log the failure and carry on with the next file.
            logging.exception(e)
            logging.error('Failed to import plugin ' + filepath)

 

# Flatten the components contributed by every discovered plugin into one
# collection per component type.
#
# Each result is materialised as a list: ``chain.from_iterable`` on its own
# returns a one-shot iterator, which would look empty to any consumer that
# iterates it a second time. A list can be iterated repeatedly and also
# supports ``len()`` and truthiness checks.
operators = list(merge([p.operators for p in plugins]))
hooks = list(merge([p.hooks for p in plugins]))
executors = list(merge([p.executors for p in plugins]))
macros = list(merge([p.macros for p in plugins]))
admin_views = list(merge([p.admin_views for p in plugins]))
flask_blueprints = list(merge([p.flask_blueprints for p in plugins]))
menu_links = list(merge([p.menu_links for p in plugins]))