Coverage for airflow.plugins_manager : 65%
Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
|
def validate(cls): if not cls.name: raise AirflowPluginException("Your plugin needs a name.")
plugins_folder = configuration.get('core', 'airflow_home') + '/plugins'
# Crawl through the plugins folder to find AirflowPlugin derivatives for f in files: try: filepath = os.path.join(root, f) if not os.path.isfile(filepath): continue mod_name, file_ext = os.path.splitext( os.path.split(filepath)[-1]) if file_ext != '.py': continue namespace = root.replace('/', '__') + '_' + mod_name m = imp.load_source(namespace, filepath) for obj in list(m.__dict__.values()): if ( inspect.isclass(obj) and issubclass(obj, AirflowPlugin) and obj is not AirflowPlugin): obj.validate() if obj not in plugins: plugins.append(obj)
except Exception as e: logging.exception(e) logging.error('Failed to import plugin ' + filepath)
|