Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

# -*- coding: utf-8 -*- 

# 

# Licensed under the Apache License, Version 2.0 (the "License"); 

# you may not use this file except in compliance with the License. 

# You may obtain a copy of the License at 

# 

# http://www.apache.org/licenses/LICENSE-2.0 

# 

# Unless required by applicable law or agreed to in writing, software 

# distributed under the License is distributed on an "AS IS" BASIS, 

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

# See the License for the specific language governing permissions and 

# limitations under the License. 

# 

from __future__ import absolute_import 

from __future__ import division 

from __future__ import print_function 

from __future__ import unicode_literals 

 

from builtins import input 

from past.builtins import basestring 

from datetime import datetime 

import imp 

import logging 

import os 

import re 

import sys 

import warnings 

 

from airflow.exceptions import AirflowException 

 

 

def validate_key(k, max_length=250):
    """
    Validate that ``k`` is usable as a key.

    A valid key is a string of at most ``max_length`` characters made only of
    ASCII letters, digits, underscores, dashes and dots.

    :param k: the key to validate
    :param max_length: maximum allowed length of the key
    :return: True if the key is valid
    :raises TypeError: if ``k`` is not a string
    :raises AirflowException: if ``k`` is too long or contains characters
        outside the allowed set
    """
    if not isinstance(k, basestring):
        raise TypeError("The key has to be a string")
    elif len(k) > max_length:
        raise AirflowException(
            "The key has to be less than {0} characters".format(max_length))
    # ``\Z`` (not ``$``) anchors at the true end of the string: ``$`` also
    # matches just before a trailing newline, which would let "key\n" pass.
    elif not re.match(r'^[A-Za-z0-9_\-\.]+\Z', k):
        raise AirflowException(
            "The key ({k}) has to be made of alphanumeric characters, dashes, "
            "dots and underscores exclusively".format(k=k))
    else:
        return True

 

 

def alchemy_to_dict(obj):
    """
    Transforms a SQLAlchemy model instance into a dictionary.

    Every column of ``obj.__table__`` becomes a key mapped to the instance's
    value for that column. Datetime values, including instances of datetime
    subclasses, are converted to strings with ``isoformat()``.

    :param obj: a SQLAlchemy model instance (or a falsy value)
    :return: dict of column name to value, or None when ``obj`` is falsy
    """
    if not obj:
        return None
    d = {}
    for c in obj.__table__.columns:
        value = getattr(obj, c.name)
        # isinstance (not an exact type check) so datetime subclasses are
        # serialized too instead of being passed through unchanged.
        if isinstance(value, datetime):
            value = value.isoformat()
        d[c.name] = value
    return d

 

 

def ask_yesno(question):
    """
    Print ``question`` and keep reading answers from standard input until the
    user answers yes or no.

    Answers are case-insensitive and surrounding whitespace is ignored;
    'yes'/'y' and 'no'/'n' are accepted. Any other answer prints a reminder
    and reads again.

    :param question: text printed once before the first answer is read
    :return: True for a yes answer, False for a no answer
    """
    yes = {'yes', 'y'}
    no = {'no', 'n'}

    print(question)
    while True:
        choice = input().strip().lower()
        if choice in yes:
            return True
        if choice in no:
            return False
        print("Please respond by yes or no.")

 

def is_in(obj, l):
    """
    Return True if ``obj`` itself appears in ``l``, compared by identity.

    The ``in`` operator falls back to ``==`` comparisons; this helper only
    uses ``is``, so element equality methods are never invoked.
    """
    return any(candidate is obj for candidate in l)

 

 

def is_container(obj):
    """
    Return True when ``obj`` is iterable (defines ``__iter__``) and is not a
    string (``basestring``); False otherwise.
    """
    if not hasattr(obj, '__iter__'):
        return False
    return not isinstance(obj, basestring)

 

 

def as_tuple(obj):
    """
    Return ``obj`` as a tuple.

    Containers (as decided by ``is_container``) are converted with
    ``tuple(obj)``; any other value is wrapped in a one-element tuple.
    """
    return tuple(obj) if is_container(obj) else (obj,)

 

 

def chain(*tasks):
    """
    Link the given tasks into a linear dependency chain.

    ``chain(task_1, task_2, task_3, task_4)`` does the same as::

        task_1.set_downstream(task_2)
        task_2.set_downstream(task_3)
        task_3.set_downstream(task_4)

    Passing zero or one task does nothing.
    """
    # zip stops at the shorter sequence, so this yields consecutive pairs.
    for upstream, downstream in zip(tasks, tasks[1:]):
        upstream.set_downstream(downstream)

 

 

def pprinttable(rows):
    """
    Return a pretty ASCII table rendered from a sequence of tuples.

    If the rows are namedtuples, their field names are used as headers;
    otherwise the headers are ``col0``, ``col1``, ... . A column is
    right-aligned when every value in it is an integer (booleans excluded);
    all other columns are left-aligned. Each cell is rendered with
    ``"{}".format(value)``, so mixed-type columns (e.g. ints and None) work.

    :param rows: sequence of equal-length tuples (or namedtuples)
    :return: the table as a string, each line terminated by a newline, or
        None when ``rows`` is empty
    :raises ValueError: if the rows do not all have the same length
    """
    if not rows:
        return None

    first = rows[0]
    if hasattr(first, '_fields'):  # namedtuple
        headers = ["{}".format(h) for h in first._fields]
    else:
        headers = ["col{}".format(i) for i in range(len(first))]

    # Stringify every cell once; widths and output both derive from these.
    cells = [["{}".format(value) for value in row] for row in rows]
    if any(len(row_cells) != len(headers) for row_cells in cells):
        raise ValueError("All rows must have the same number of columns")

    widths = [len(header) for header in headers]
    for row_cells in cells:
        widths = [max(width, len(cell)) for width, cell in zip(widths, row_cells)]

    def _is_int(value):
        # bool subclasses int but renders as True/False, so treat it as text.
        return isinstance(value, int) and not isinstance(value, bool)

    # zip(*rows) transposes the rows into columns. Alignment is decided from
    # the whole column, not just the first row, so mixed columns can't crash.
    right_align = [all(_is_int(v) for v in column) for column in zip(*rows)]

    def _render(values, align_right):
        return " | ".join(
            value.rjust(width) if right else value.ljust(width)
            for value, width, right in zip(values, widths, align_right))

    separator = "-+-".join('-' * width for width in widths)
    lines = [separator, _render(headers, [False] * len(headers)), separator]
    lines.extend(_render(row_cells, right_align) for row_cells in cells)
    lines.append(separator)
    return "\n".join(lines) + "\n"

 

 

class AirflowImporter(object):
    """
    Importer that dynamically loads a class and module from its parent. This
    allows Airflow to support ``from airflow.operators import BashOperator``
    even though BashOperator is actually defined in
    ``airflow.operators.bash_operator``.

    The importer also takes over for the parent_module by wrapping it: it
    replaces the parent's entry in ``sys.modules`` with itself. This is
    required to support attribute-based usage:

        from airflow import operators
        operators.BashOperator(...)

    This mechanism is deprecated; the first load of each submodule through it
    emits a ``DeprecationWarning``.
    """

    def __init__(self, parent_module, module_attributes):
        """
        :param parent_module: The parent module object to wrap, for example
            the ``airflow.operators`` package. Its ``__name__`` is used as the
            ``sys.modules`` key and its ``__file__`` locates the submodules.
        :type parent_module: module
        :param module_attributes: Mapping of submodule (file) name to the
            names of the importable attributes it defines, e.g.
            ``{'bash_operator': ['BashOperator']}``.
        :type module_attributes: dict
        """
        self._parent_module = parent_module
        self._attribute_modules = self._build_attribute_modules(module_attributes)
        # Submodule name -> loaded module object; guarantees a single load.
        self._loaded_modules = {}

        # Wrap the module so we can take over __getattr__.
        sys.modules[parent_module.__name__] = self

    @staticmethod
    def _build_attribute_modules(module_attributes):
        """
        Flips and flattens the module_attributes dictionary from:

            module => [Attribute, ...]

        To:

            Attribute => module

        This is useful so that we can find the module to use, given an
        attribute. If an attribute is listed under several modules, the one
        iterated last wins.
        """
        return {
            attribute: module
            for module, attributes in module_attributes.items()
            for attribute in attributes
        }

    def _load_attribute(self, attribute):
        """
        Load the submodule that defines ``attribute`` (at most once) and
        return the attribute from it.

        :raises ImportError: if no submodule is registered for ``attribute``
            or the submodule cannot be found/loaded.
        """
        module = self._attribute_modules.get(attribute, False)

        if not module:
            # This shouldn't happen: __getattr__ only calls us for attributes
            # present in self._attribute_modules.
            raise ImportError(attribute)

        if module not in self._loaded_modules:
            # Note that it's very important to only load a given modules once.
            # If they are loaded more than once, the memory reference to the
            # class objects changes, and Python thinks that an object of type
            # Foo that was declared before Foo's module was reloaded is no
            # longer the same type as Foo after it's reloaded.
            path = os.path.realpath(self._parent_module.__file__)
            folder = os.path.dirname(path)
            f, filename, description = imp.find_module(module, [folder])
            try:
                self._loaded_modules[module] = imp.load_module(
                    module, f, filename, description)
            finally:
                # imp.find_module may return an open file object (None for
                # packages); imp.load_module leaves closing it to the caller.
                if f is not None:
                    f.close()

            # This functionality is deprecated, and AirflowImporter should be
            # removed in 2.0.
            warnings.warn(
                "Importing {i} directly from {m} has been "
                "deprecated. Please import from "
                "'{m}.[operator_module]' instead. Support for direct "
                "imports will be dropped entirely in Airflow 2.0.".format(
                    i=attribute, m=self._parent_module.__name__),
                DeprecationWarning)

        return getattr(self._loaded_modules[module], attribute)

    def __getattr__(self, attribute):
        """
        Get an attribute from the wrapped module. If the attribute doesn't
        exist, try and import it as a class from a submodule.

        This is a Python trick that allows the class to pretend it's a module,
        so that attribute-based usage works:

            from airflow import operators
            operators.BashOperator(...)

        It also allows direct from-imports from the parent to work:

            from airflow.operators import BashOperator

        :raises AttributeError: if neither the parent module nor any
            registered submodule provides ``attribute``.
        """
        if attribute in ('_parent_module', '_attribute_modules', '_loaded_modules'):
            # These are set in __init__, so we only get here for an instance
            # that was never initialised (e.g. created via __new__). Fail fast
            # instead of recursing forever through self._parent_module.
            raise AttributeError(attribute)

        if hasattr(self._parent_module, attribute):
            # Always default to the parent module if the attribute exists.
            return getattr(self._parent_module, attribute)
        elif attribute in self._attribute_modules:
            # Try and import the attribute if it's got a module defined, then
            # cache it on the instance so __getattr__ is skipped next time.
            loaded_attribute = self._load_attribute(attribute)
            setattr(self, attribute, loaded_attribute)
            return loaded_attribute

        raise AttributeError(
            "{} has no attribute {!r}".format(
                self._parent_module.__name__, attribute))