Coverage for airflow.utils : 77%
Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
|
""" Static class with task instance states constants and color method to avoid hardcoding. """
QUEUED: 'gray', RUNNING: 'lime', SUCCESS: 'green', SHUTDOWN: 'blue', FAILED: 'red', UP_FOR_RETRY: 'gold', UPSTREAM_FAILED: 'orange', SKIPPED: 'pink', }
def color(cls, state): else: return 'white'
def runnable(cls): None, cls.FAILED, cls.UP_FOR_RETRY, cls.UPSTREAM_FAILED, cls.SKIPPED]
'@hourly': '0 * * * *', '@daily': '0 0 * * *', '@weekly': '0 0 * * 0', '@monthly': '0 0 1 * *', '@yearly': '0 0 1 1 *', }
""" Function decorator that provides a session if it isn't provided. If you want to reuse a session or run the function as part of a database transaction, you pass it to the function, if not this wrapper will create one and close it for you. """ def wrapper(*args, **kwargs):
def ping_connection(dbapi_connection, connection_record, connection_proxy): ''' Disconnect Handling - Pessimistic, taken from: http://docs.sqlalchemy.org/en/rel_0_9/core/pooling.html ''' except: raise exc.DisconnectionError()
session.add(conn) session.commit()
models.Connection( conn_id='airflow_db', conn_type='mysql', host='localhost', login='root', password='', schema='airflow')) models.Connection( conn_id='beeline_default', conn_type='beeline', host='localhost', schema='airflow')) models.Connection( conn_id='local_mysql', conn_type='mysql', host='localhost', login='airflow', password='airflow', schema='airflow')) models.Connection( conn_id='presto_default', conn_type='presto', host='localhost', schema='hive', port=3400)) models.Connection( conn_id='hive_cli_default', conn_type='hive_cli', schema='default',)) models.Connection( conn_id='hiveserver2_default', conn_type='hiveserver2', host='localhost', schema='default', port=10000)) models.Connection( conn_id='metastore_default', conn_type='hive_metastore', host='localhost', port=10001)) models.Connection( conn_id='mysql_default', conn_type='mysql', login='root', host='localhost')) models.Connection( conn_id='postgres_default', conn_type='postgres', login='postgres', schema='airflow', host='localhost')) models.Connection( conn_id='sqlite_default', conn_type='sqlite', host='/tmp/sqlite_default.db')) models.Connection( conn_id='http_default', conn_type='http', host='https://www.google.com/')) models.Connection( conn_id='mssql_default', conn_type='mssql', host='localhost', port=1433)) models.Connection( conn_id='vertica_default', conn_type='vertica', host='localhost', port=5433)) models.Connection( conn_id='webhdfs_default', conn_type='hdfs', host='localhost', port=50070))
# Known event types session.add(KET(know_event_type='Holiday')) session.add(KET(know_event_type='Outage')) KET.know_event_type == 'Natural Disaster').first(): session.add(KET(know_event_type='Natural Disaster')) KET.know_event_type == 'Marketing Campaign').first(): session.add(KET(know_event_type='Marketing Campaign'))
chart = Chart( label=chart_label, conn_id='airflow_db', chart_type='bar', x_is_date=False, sql=( "SELECT state, COUNT(1) as number " "FROM task_instance " "WHERE dag_id LIKE 'example%' " "GROUP BY state"), ) session.add(chart)
configuration.get('core', 'SQL_ALCHEMY_CONN'))
''' Clear out the database ''' from airflow import models
logging.info("Dropping tables that exist") models.Base.metadata.drop_all(settings.engine) mc = MigrationContext.configure(settings.engine) if mc._version.exists(settings.engine): mc._version.drop(settings.engine) initdb()
raise TypeError("The key has to be a string") raise AirflowException( "The key has to be less than {0} characters".format(max_length)) raise AirflowException( "The key ({k}) has to be made of alphanumeric characters, dashes, " "dots and underscores exclusively".format(**locals())) else:
start_date, end_date=None, num=None, delta=None): """ Get a set of dates as a list based on a start, end and delta, delta can be something that can be added to ``datetime.datetime`` or a cron expression as a ``str``
:param start_date: anchor date to start the series from :type start_date: datetime.datetime :param end_date: right boundary for the date range :type end_date: datetime.datetime :param num: alternatively to end_date, you can specify the number of entries you want in the range. This number can be negative; the output will always be sorted regardless :type num: int
>>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), delta=timedelta(1)) [datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 1, 2, 0, 0), datetime.datetime(2016, 1, 3, 0, 0)] >>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), delta='0 0 * * *') [datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 1, 2, 0, 0), datetime.datetime(2016, 1, 3, 0, 0)] >>> date_range(datetime(2016, 1, 1), datetime(2016, 3, 3), delta="0 0 0 * *") [datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 2, 1, 0, 0), datetime.datetime(2016, 3, 1, 0, 0)] """ raise Exception("Wait. start_date needs to be before end_date") raise Exception("Wait. Either specify end_date OR num") end_date = datetime.now()
else: else: else: start_date = cron.get_next(datetime) else: else: if num > 0: start_date += delta else: start_date -= delta
""" json serializer that deals with dates usage: json.dumps(object, default=utils.json_ser) """
""" Transforms a SQLAlchemy model instance into a dictionary """ return None
f = open(filepath) content = f.read() f.close() return content
""" Function decorator that Looks for an argument named "default_args", and fills the unspecified arguments from it.
Since python2.* isn't clear about which arguments are missing when calling a function, and that this can be quite confusing with multi-level inheritance and argument defaults, this decorator also alerts with specific information about the missing arguments. """ def wrapper(*args, **kwargs): raise AirflowException( "Use keyword arguments when initializing operators")
default_args = kwargs['default_args'] if 'params' in default_args: dag_params.update(default_args['params']) del default_args['params']
msg = "Argument {0} is required".format(missing_args) raise AirflowException(msg)
yes = set(['yes', 'y']) no = set(['no', 'n'])
done = False print(question) while not done: choice = input().lower() if choice in yes: return True elif choice in no: return False else: print("Please respond by yes or no.")
""" Send an email with html content
>>> send_email('test@example.com', 'foo', '<b>Foo</b> bar', ['/dev/null'], dryrun=True) """
to = to.split(',') to = to.split(';') else:
f.read(), Content_Disposition='attachment; filename="%s"' % basename, Name=basename ))
s = smtplib.SMTP(SMTP_HOST, SMTP_PORT) if SMTP_STARTTLS: s.starttls() if SMTP_USER and SMTP_PASSWORD: s.login(SMTP_USER, SMTP_PASSWORD) logging.info("Sent an alert email to " + str(e_to)) s.sendmail(e_from, e_to, mime_msg.as_string()) s.quit()
''' Attempts to import a set of modules and specified attributes in the form of a dictionary. The attributes are copied into the parent module's namespace. The function returns a list of attribute names that can be assigned to __all__.
This is used in the context of ``operators`` and ``hooks`` and silence the import errors for when libraries are missing. It makes for a clean package abstracting the underlying modules and only brings functional operators to those namespaces. ''' mod=mod, err=err))
""" Checks whether an object is one of the item in the list. This is different from ``in`` because ``in`` uses __cmp__ when present. Here we change based on the object itself """
finally: except OSError as e: # ENOENT - no such file or directory if e.errno != errno.ENOENT: raise e
""" To be used in a ``with`` block and timeout its content. """
""" Test if an object is a container (iterable) but not a string """ return hasattr(obj, '__iter__') and not isinstance(obj, basestring)
""" If obj is a container, returns obj as a tuple. Otherwise, returns a tuple containing obj. """ if is_container(obj): return tuple(obj) else: return tuple([obj])
""" Returns the datetime of the form start_date + i * delta which is closest to dt for any non-negative integer i.
Note that delta may be a datetime.timedelta or a dateutil.relativedelta
>>> round_time(datetime(2015, 1, 1, 6), timedelta(days=1)) datetime.datetime(2015, 1, 1, 0, 0) >>> round_time(datetime(2015, 1, 2), relativedelta(months=1)) datetime.datetime(2015, 1, 1, 0, 0) >>> round_time(datetime(2015, 9, 16, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0)) datetime.datetime(2015, 9, 16, 0, 0) >>> round_time(datetime(2015, 9, 15, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0)) datetime.datetime(2015, 9, 15, 0, 0) >>> round_time(datetime(2015, 9, 14, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0)) datetime.datetime(2015, 9, 14, 0, 0) >>> round_time(datetime(2015, 9, 13, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0)) datetime.datetime(2015, 9, 14, 0, 0) """
# It's cron based, so it's easy cron = croniter(delta, start_date) prev = cron.get_prev(datetime) if prev == start_date: return start_date else: return prev
# Ignore the microseconds of dt
# We are looking for a datetime in the form start_date + i * delta # which is as close as possible to dt. Since delta could be a relative # delta we don't know its exact length in seconds so we cannot rely on # division to find i. Instead we employ a binary search algorithm, first # finding an upper and lower limit and then dissecting the interval until # we have found the closest match.
# We first search an upper limit for i for which start_date + upper * delta # exceeds dt. # To speed up finding an upper limit we grow this exponentially by a # factor of 2
# Since upper is the first value for which start_date + upper * delta # exceeds dt, upper // 2 is below dt and therefore forms a lower limit # for the i we are looking for
# We now continue to intersect the interval between # start_date + lower * delta and start_date + upper * delta # until we find the closest value # Invariant: start + lower * delta < dt <= start + upper * delta # If start_date + (lower + 1)*delta exceeds dt, then either lower or # lower+1 has to be the solution we are searching for # Check if start_date + (lower + 1)*delta or # start_date + lower*delta is closer to dt and return the solution (start_date + (lower + 1) * delta) - dt <= dt - (start_date + lower * delta)): else:
# We intersect the interval and either replace the lower or upper # limit with the candidate else:
# in the special case when start_date > dt the search for upper will # immediately stop for upper == 1 which results in lower = upper // 2 = 0 # and this function returns start_date.
""" Given a number of tasks, builds a dependency chain.
chain(task_1, task_2, task_3, task_4)
is equivalent to
task_1.set_downstream(task_2) task_2.set_downstream(task_3) task_3.set_downstream(task_4) """
# convert dates and numpy objects in a json serializable format if isinstance(obj, datetime): return obj.strftime('%Y-%m-%dT%H:%M:%SZ') elif isinstance(obj, date): return obj.strftime('%Y-%m-%d') elif type(obj) in [np.int_, np.intc, np.intp, np.int8, np.int16, np.int32, np.int64, np.uint8, np.uint16, np.uint32, np.uint64]: return int(obj) elif type(obj) in [np.bool_]: return bool(obj) elif type(obj) in [np.float_, np.float16, np.float32, np.float64, np.complex_, np.complex64, np.complex128]: return float(obj)
# Let the base class default method raise the TypeError return json.JSONEncoder.default(self, obj) |