Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

# -*- coding: utf-8 -*- 

# 

# Licensed under the Apache License, Version 2.0 (the "License"); 

# you may not use this file except in compliance with the License. 

# You may obtain a copy of the License at 

# 

# http://www.apache.org/licenses/LICENSE-2.0 

# 

# Unless required by applicable law or agreed to in writing, software 

# distributed under the License is distributed on an "AS IS" BASIS, 

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

# See the License for the specific language governing permissions and 

# limitations under the License. 

 

from builtins import range 

 

from airflow import configuration 

from airflow.utils.state import State 

from airflow.utils.logging import LoggingMixin 

 

# Default upper bound on concurrently running task instances, taken from
# the ``PARALLELISM`` option of the ``core`` section of the Airflow config.
PARALLELISM = configuration.getint(
    'core',
    'PARALLELISM',
)

 

 

class BaseExecutor(LoggingMixin):
    """
    Base class for executors.

    It tracks commands waiting to be sent (``queued_tasks``), commands that
    have been handed to ``execute_async`` (``running``) and reported final
    states (``event_buffer``). Subclasses implement ``execute_async``,
    ``end`` and ``terminate``, and usually ``sync``.
    """

    def __init__(self, parallelism=PARALLELISM):
        """
        Class to derive in order to interface with executor-type systems
        like Celery, Mesos, Yarn and the likes.

        :param parallelism: how many jobs should run at one time. Set to
            ``0`` for infinity
        :type parallelism: int
        """
        self.parallelism = parallelism
        # key -> (command, priority, queue, task_instance) awaiting dispatch
        self.queued_tasks = {}
        # key -> command passed to execute_async and not yet reported back
        self.running = {}
        # key -> state recorded by change_state; drained by get_event_buffer
        self.event_buffer = {}

    def start(self):  # pragma: no cover
        """
        Executors may need to get things started. For example LocalExecutor
        starts N workers.
        """
        pass

    def queue_command(self, task_instance, command, priority=1, queue=None):
        """
        Queue ``command`` for ``task_instance`` unless an entry with the same
        key is already queued or running (in which case it is ignored).

        :param task_instance: task instance; its ``key`` identifies the entry
        :param command: command to hand to ``execute_async`` later
        :param priority: sort weight; ``heartbeat`` dispatches higher values
            first
        :param queue: queue name forwarded to ``execute_async``
        """
        key = task_instance.key
        if key not in self.queued_tasks and key not in self.running:
            self.logger.info("Adding to queue: {}".format(command))
            self.queued_tasks[key] = (command, priority, queue, task_instance)

    def queue_task_instance(
            self,
            task_instance,
            mark_success=False,
            pickle_id=None,
            force=False,
            ignore_dependencies=False,
            ignore_depends_on_past=False,
            pool=None):
        """
        Build the command for ``task_instance`` and queue it.

        The command comes from ``task_instance.command(local=True, ...)``
        with the flags below forwarded unchanged. It is queued with the
        task's ``priority_weight_total`` and ``queue``.

        :param pool: pool to run in; defaults to ``task_instance.pool``
        """
        pool = pool or task_instance.pool
        command = task_instance.command(
            local=True,
            mark_success=mark_success,
            force=force,
            ignore_dependencies=ignore_dependencies,
            ignore_depends_on_past=ignore_depends_on_past,
            pool=pool,
            pickle_id=pickle_id)
        self.queue_command(
            task_instance,
            command,
            priority=task_instance.task.priority_weight_total,
            queue=task_instance.task.queue)

    def sync(self):
        """
        Sync will get called periodically by the heartbeat method.
        Executors should override this to gather task statuses.
        """
        pass

    def heartbeat(self):
        """
        Dispatch queued commands, highest priority first, into the open
        slots, then call ``sync``.

        With ``parallelism`` set to ``0`` every queued command is dispatched.
        Commands whose task instance is already RUNNING in the database are
        dropped from the queue without being dispatched.
        """
        # Triggering new jobs
        if not self.parallelism:
            open_slots = len(self.queued_tasks)
        else:
            open_slots = self.parallelism - len(self.running)

        self.logger.debug("{} running task instances".format(len(self.running)))
        self.logger.debug("{} in queue".format(len(self.queued_tasks)))
        self.logger.debug("{} open slots".format(open_slots))

        # Snapshot of the queue, highest priority first. sorted() is stable,
        # so equal priorities keep their dict iteration order.
        sorted_queue = sorted(
            self.queued_tasks.items(),
            key=lambda item: item[1][1],
            reverse=True)
        # open_slots is negative when more tasks run than parallelism allows;
        # clamp to 0 so the slice below never takes from the end instead.
        num_to_send = max(0, min(open_slots, len(sorted_queue)))
        # Iterating a slice avoids the O(n) cost of list.pop(0) per task.
        for key, (command, _, queue, ti) in sorted_queue[:num_to_send]:
            # TODO(jlowin) without a way to know what Job ran which tasks,
            # there is a danger that another Job started running a task
            # that was also queued to this executor. This is the last chance
            # to check if that happened. The most probable way is that a
            # Scheduler tried to run a task that was originally queued by a
            # Backfill. This fix reduces the probability of a collision but
            # does NOT eliminate it.
            self.queued_tasks.pop(key)
            ti.refresh_from_db()
            if ti.state != State.RUNNING:
                self.running[key] = command
                self.execute_async(key, command=command, queue=queue)
            else:
                self.logger.debug(
                    'Task is already running, not sending to '
                    'executor: {}'.format(key))

        # Calling child class sync method
        self.logger.debug("Calling the {} sync method".format(self.__class__))
        self.sync()

    def change_state(self, key, state):
        """
        Stop tracking ``key`` as running and record ``state`` for it in the
        event buffer.

        A key that is not in ``running`` (e.g. a duplicate report) is
        tolerated rather than raising ``KeyError``.
        """
        self.running.pop(key, None)
        self.event_buffer[key] = state

    def fail(self, key):
        """Record ``key`` as ``State.FAILED``."""
        self.change_state(key, State.FAILED)

    def success(self, key):
        """Record ``key`` as ``State.SUCCESS``."""
        self.change_state(key, State.SUCCESS)

    def get_event_buffer(self):
        """
        Return and flush the event buffer.

        :return: dict mapping keys to the states recorded by ``change_state``
            since the previous call
        """
        d = self.event_buffer
        self.event_buffer = {}
        return d

    def execute_async(self, key, command, queue=None):  # pragma: no cover
        """
        This method will execute the command asynchronously.
        """
        raise NotImplementedError()

    def end(self):  # pragma: no cover
        """
        This method is called when the caller is done submitting jobs and
        wants to wait synchronously for all previously submitted jobs to be
        done.
        """
        raise NotImplementedError()

    def terminate(self):
        """
        This method is called when the daemon receives a SIGTERM.
        """
        raise NotImplementedError()