Coverage for airflow/utils/helpers.py : 22%
Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
|
# -*- coding: utf-8 -*- # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. #
def validate_key(k, max_length=250):
    """
    Validate that ``k`` can be used as a key.

    :param k: the candidate key
    :param max_length: maximum allowed length of the key (inclusive)
    :return: True when the key is valid
    :raises TypeError: if ``k`` is not a string
    :raises AirflowException: if ``k`` is longer than ``max_length`` or
        contains characters other than ASCII letters, digits, dashes, dots
        and underscores
    """
    if not isinstance(k, basestring):
        raise TypeError("The key has to be a string")
    elif len(k) > max_length:
        # A key of exactly max_length characters is accepted, so the message
        # must say "less than or equal to".
        raise AirflowException(
            "The key has to be less than or equal to {0} characters".format(
                max_length))
    # \Z instead of $: with re.match, "$" also matches just before a trailing
    # newline, which would let a key like "abc\n" through.
    elif not re.match(r'^[A-Za-z0-9_\-\.]+\Z', k):
        raise AirflowException(
            "The key ({k}) has to be made of alphanumeric characters, dashes, "
            "dots and underscores exclusively".format(k=k))
    else:
        return True
def alchemy_to_dict(obj):
    """
    Transforms a SQLAlchemy model instance into a dictionary.

    Every column of ``obj.__table__`` becomes a key mapped to the instance's
    value for that column. ``datetime`` values (including subclasses) are
    serialized with ``isoformat()``.

    :param obj: a model instance exposing ``__table__.columns``, or a falsy
        value
    :return: dict of column name to value, or None when ``obj`` is falsy
    """
    if not obj:
        return None
    result = {}
    for column in obj.__table__.columns:
        value = getattr(obj, column.name)
        # isinstance (not an exact type comparison) so datetime subclasses
        # are serialized as well.
        if isinstance(value, datetime):
            value = value.isoformat()
        result[column.name] = value
    return result
def ask_yesno(question):
    """
    Print ``question`` and read answers from standard input until the user
    replies yes or no.

    Answers are compared case-insensitively after stripping surrounding
    whitespace; 'yes'/'y' and 'no'/'n' are accepted. Any other answer prints
    a reminder and reads again.

    :param question: the prompt to print
    :return: True for a yes answer, False for a no answer
    """
    yes = {'yes', 'y'}
    no = {'no', 'n'}

    print(question)
    while True:
        # Strip so stray whitespace (e.g. "y ") does not force a re-prompt.
        choice = input().strip().lower()
        if choice in yes:
            return True
        if choice in no:
            return False
        print("Please respond by yes or no.")
def is_in(obj, l):
    """
    Return True if ``obj`` itself (by identity) is one of the items in ``l``.

    This differs from the ``in`` operator, which also matches items that
    merely compare equal to ``obj``; here only the very same object counts.
    """
    return any(candidate is obj for candidate in l)
def is_container(obj):
    """
    Return True if ``obj`` is iterable (has ``__iter__``) and is not a
    string (``basestring``).
    """
    if not hasattr(obj, '__iter__'):
        return False
    return not isinstance(obj, basestring)
def as_tuple(obj):
    """
    Coerce ``obj`` into a tuple.

    Containers (as decided by ``is_container``) are converted element-wise;
    any other value is wrapped in a one-element tuple.
    """
    if not is_container(obj):
        return (obj,)
    return tuple(obj)
def chain(*tasks):
    """
    Build a linear dependency chain from the given tasks, in order.

    chain(task_1, task_2, task_3, task_4)

    is equivalent to

    task_1.set_downstream(task_2)
    task_2.set_downstream(task_3)
    task_3.set_downstream(task_4)

    Zero or one task is a no-op.
    """
    successors = tasks[1:]
    # zip stops at the shorter sequence, so each task is paired with the one
    # after it and the last task gets no downstream.
    for upstream, downstream in zip(tasks, successors):
        upstream.set_downstream(downstream)
def pprinttable(rows):
    """
    Return a pretty ASCII table rendered from a sequence of tuples.

    If the rows are namedtuples, their field names are used as headers;
    otherwise the headers are ``col0``, ``col1``, ... A column whose value in
    the first row is an ``int`` (but not a ``bool``) is right-aligned; all
    other columns, and all headers, are left-aligned.

    :param rows: a sequence of equally sized tuples
    :return: the table as a newline-terminated string, or None if ``rows``
        is empty
    :raises ValueError: if the rows do not all have the same length
    """
    if not rows:
        return None
    first = rows[0]
    if hasattr(first, '_fields'):  # namedtuple
        headers = first._fields
    else:
        headers = ["col{}".format(i) for i in range(len(first))]
    ncols = len(first)

    # Render every cell once, so the computed widths and the printed text
    # always agree (the old %d format crashed on non-int cells in an "int"
    # column and truncated floats).
    cells = []
    for row in rows:
        if len(row) != ncols:
            raise ValueError(
                "All rows must have {} columns, got {!r}".format(ncols, row))
        cells.append(["{}".format(value) for value in row])

    widths = [len(header) for header in headers]
    for rendered in cells:
        for i, text in enumerate(rendered):
            widths[i] = max(widths[i], len(text))

    # bool is a subclass of int, but should not be treated as a number here.
    right_align = [isinstance(value, int) and not isinstance(value, bool)
                   for value in first]

    def _format_line(texts, align):
        """Pad each text to its column width and join with ' | '."""
        return " | ".join(
            text.rjust(width) if right else text.ljust(width)
            for text, width, right in zip(texts, widths, align))

    separator = "-+-".join('-' * width for width in widths)
    lines = [separator, _format_line(headers, [False] * ncols), separator]
    lines.extend(_format_line(rendered, right_align) for rendered in cells)
    lines.append(separator)
    return "\n".join(lines) + "\n"
class AirflowImporter(object):
    """
    Importer that dynamically loads a class and module from its parent. This
    allows Airflow to support ``from airflow.operators import BashOperator``
    even though BashOperator is actually in airflow.operators.bash_operator.

    The importer also takes over for the parent_module by wrapping it. This
    is required to support attribute-based usage:

        from airflow import operators
        operators.BashOperator(...)
    """

    def __init__(self, parent_module, module_attributes):
        """
        :param parent_module: The parent module object to wrap, for example
            the ``airflow.operators`` package. Its entry in ``sys.modules``
            is replaced by this importer.
        :type parent_module: module
        :param module_attributes: Mapping of submodule name to the list of
            importable attribute (class) names defined in that submodule.
        :type module_attributes: dict
        """
        self._parent_module = parent_module
        self._attribute_modules = self._build_attribute_modules(module_attributes)
        self._loaded_modules = {}

        # Wrap the module so we can take over __getattr__.
        sys.modules[parent_module.__name__] = self

    @staticmethod
    def _build_attribute_modules(module_attributes):
        """
        Flips and flattens the module_attributes dictionary from:

            module => [Attribute, ...]

        To:

            Attribute => module

        This is useful so that we can find the module to use, given an
        attribute. If an attribute is listed under several modules, the last
        one iterated wins.
        """
        attribute_modules = {}
        for module, attributes in module_attributes.items():
            for attribute in attributes:
                attribute_modules[attribute] = module
        return attribute_modules

    def _load_attribute(self, attribute):
        """
        Load the class attribute if it hasn't been loaded yet, and return it.

        :raises ImportError: if no submodule is registered for ``attribute``
        """
        module = self._attribute_modules.get(attribute, False)

        if not module:
            # This shouldn't happen: __getattr__ only calls this method for
            # attributes present in self._attribute_modules.
            raise ImportError(attribute)
        elif module not in self._loaded_modules:
            # Note that it's very important to only load a given module once.
            # If it is loaded more than once, the memory reference to the
            # class objects changes, and Python thinks that an object of type
            # Foo that was declared before Foo's module was reloaded is no
            # longer the same type as Foo after it's reloaded.
            path = os.path.realpath(self._parent_module.__file__)
            folder = os.path.dirname(path)
            f, filename, description = imp.find_module(module, [folder])
            try:
                self._loaded_modules[module] = imp.load_module(
                    module, f, filename, description)
            finally:
                # imp.load_module does not close the file returned by
                # imp.find_module; the caller is responsible for it.
                if f is not None:
                    f.close()

            # This functionality is deprecated, and AirflowImporter should be
            # removed in 2.0.
            warnings.warn(
                "Importing {i} directly from {m} has been "
                "deprecated. Please import from "
                "'{m}.[operator_module]' instead. Support for direct "
                "imports will be dropped entirely in Airflow 2.0.".format(
                    i=attribute, m=self._parent_module.__name__),
                DeprecationWarning)

        loaded_module = self._loaded_modules[module]

        return getattr(loaded_module, attribute)

    def __getattr__(self, attribute):
        """
        Get an attribute from the wrapped module. If the attribute doesn't
        exist, try and import it as a class from a submodule.

        This is a Python trick that allows the class to pretend it's a
        module, so that attribute-based usage works:

            from airflow import operators
            operators.BashOperator(...)

        It also allows from-imports of the parent package to work:

            from airflow.operators import BashOperator

        :raises AttributeError: if the attribute is neither on the parent
            module nor registered for a submodule
        """
        if attribute in ('_parent_module', '_attribute_modules',
                         '_loaded_modules'):
            # Only reached when __init__ has not set these yet (e.g. an
            # instance created via __new__); looking them up below would
            # otherwise recurse into __getattr__ forever.
            raise AttributeError(attribute)
        if hasattr(self._parent_module, attribute):
            # Always default to the parent module if the attribute exists.
            return getattr(self._parent_module, attribute)
        elif attribute in self._attribute_modules:
            # Try and import the attribute if it's got a module defined, and
            # cache it on the instance so later lookups skip __getattr__.
            loaded_attribute = self._load_attribute(attribute)
            setattr(self, attribute, loaded_attribute)
            return loaded_attribute

        raise AttributeError(
            "module {!r} has no attribute {!r}".format(
                self._parent_module.__name__, attribute))