Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

from builtins import range 

from builtins import object 

import logging 

 

from airflow.utils import State 

from airflow import configuration 

 

# Integer value of the ('core', 'PARALLELISM') configuration entry; used
# below as the default ``parallelism`` argument of ``BaseExecutor``.
PARALLELISM = configuration.getint('core', 'PARALLELISM') 

 

 

class BaseExecutor(object):
    """
    Class to derive in order to interface with executor-type systems
    like Celery, Mesos, Yarn and the likes.

    Commands are queued first, then handed to ``execute_async`` by
    ``heartbeat`` as slots open up, highest priority first. States reported
    through ``change_state`` accumulate in an event buffer until
    ``get_event_buffer`` is called.
    """

    def __init__(self, parallelism=PARALLELISM):
        """
        :param parallelism: how many jobs should run at one time. Set to
            ``0`` for infinity
        :type parallelism: int
        """
        self.parallelism = parallelism
        # key -> (command, priority, queue), waiting for an open slot
        self.queued_tasks = {}
        # key -> command, handed to execute_async and not yet finished
        self.running = {}
        # key -> state, held until get_event_buffer() is called
        self.event_buffer = {}

    def start(self):  # pragma: no cover
        """
        Executors may need to get things started. For example LocalExecutor
        starts N workers.
        """
        pass

    def queue_command(self, key, command, priority=1, queue=None):
        """
        Queue ``command`` under ``key`` for a later ``heartbeat`` to start.

        The call is a no-op when ``key`` is already queued or running.

        :param key: identifier of the work item
        :param command: the command to execute
        :param priority: higher values are started first
        :param queue: passed through to ``execute_async``
        """
        if key not in self.queued_tasks and key not in self.running:
            # Lazy logging args instead of ``+`` concatenation, so a
            # non-string command cannot raise a TypeError here.
            logging.info("Adding to queue: %s", command)
            self.queued_tasks[key] = (command, priority, queue)

    def queue_task_instance(
            self, task_instance, mark_success=False, pickle_id=None,
            force=False, ignore_dependencies=False, task_start_date=None,
            pool=None):
        """
        Build the command for ``task_instance`` and queue it.

        The command comes from ``task_instance.command(...)`` and is queued
        under ``task_instance.key`` with the task's
        ``priority_weight_total`` as priority and the task's ``queue``.

        :param pool: pool to run in; falls back to ``task_instance.pool``
            when not given
        """
        pool = pool or task_instance.pool
        command = task_instance.command(
            local=True,
            mark_success=mark_success,
            force=force,
            ignore_dependencies=ignore_dependencies,
            task_start_date=task_start_date,
            pool=pool,
            pickle_id=pickle_id)
        self.queue_command(
            task_instance.key,
            command,
            priority=task_instance.task.priority_weight_total,
            queue=task_instance.task.queue)

    def sync(self):
        """
        Sync will get called periodically by the heartbeat method.
        Executors should override this to gather statuses.
        """
        pass

    def heartbeat(self):
        """
        Call ``sync``, then start as many queued commands as there are
        open slots, highest priority first.

        With a falsy ``parallelism`` (``0``) every queued command is
        started; otherwise the number of open slots is ``parallelism``
        minus the number of running commands.
        """
        # Calling child class sync method
        logging.debug("Calling the %s sync method", self.__class__)
        self.sync()

        # Triggering new jobs
        if not self.parallelism:
            open_slots = len(self.queued_tasks)
        else:
            open_slots = self.parallelism - len(self.running)

        logging.debug("%s running task instances", len(self.running))
        logging.debug("%s in queue", len(self.queued_tasks))
        logging.debug("%s open slots", open_slots)

        # sorted() is stable, so entries of equal priority keep their
        # relative order from the queue dict.
        sorted_queue = sorted(
            self.queued_tasks.items(),
            key=lambda item: item[1][1],
            reverse=True)
        # Clamp at zero: more running commands than ``parallelism`` gives a
        # negative slot count, which must start nothing (a negative slice
        # bound would otherwise select almost the whole queue).
        to_start = sorted_queue[:max(open_slots, 0)]
        for key, (command, _priority, queue) in to_start:
            self.running[key] = command
            self.queued_tasks.pop(key)
            self.execute_async(key, command=command, queue=queue)

    def change_state(self, key, state):
        """
        Record ``state`` for ``key`` in the event buffer and drop ``key``
        from the running set.

        A key that is not currently running is tolerated, so a duplicate
        or late state report does not raise ``KeyError``; the state is
        still recorded.
        """
        self.running.pop(key, None)
        self.event_buffer[key] = state

    def fail(self, key):
        """
        Mark ``key`` as ``State.FAILED``.
        """
        self.change_state(key, State.FAILED)

    def success(self, key):
        """
        Mark ``key`` as ``State.SUCCESS``.
        """
        self.change_state(key, State.SUCCESS)

    def get_event_buffer(self):
        """
        Return the event buffer and flush it.

        :return: dict mapping each key to the state recorded for it since
            the previous call
        """
        d = self.event_buffer
        self.event_buffer = {}
        return d

    def execute_async(self, key, command, queue=None):  # pragma: no cover
        """
        This method will execute the command asynchronously.
        """
        raise NotImplementedError()

    def end(self):  # pragma: no cover
        """
        This method is called when the caller is done submitting jobs and
        wants to wait synchronously for the previously submitted jobs to
        be all done.
        """
        raise NotImplementedError()