Coverage for airflow/www/utils.py : 79%
Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
|
not AUTHENTICATE or ( not current_user.is_anonymous() and current_user.is_authenticated() ) )
not AUTHENTICATE or (not current_user.is_anonymous() and current_user.is_superuser()) )
not AUTHENTICATE or (not current_user.is_anonymous() and current_user.data_profiling()) )
sql = """\ SELECT TOP {limit} * FROM ( {sql} ) qry """.format(**locals()) sql = """\ SELECT * FROM ( {sql} ) qry WHERE ROWNUM <= {limit} """.format(**locals()) else: SELECT * FROM ( {sql} ) qry LIMIT {limit} """.format(**locals())
''' Decorator to log user actions ''' def wrapper(*args, **kwargs):
user = current_user.username else:
event=f.__name__, task_instance=None, owner=user, extra=str(request.args.items()), task_id=request.args.get('task_id'), dag_id=request.args.get('dag_id'))
request.args.get('execution_date'))
''' Decorator to notify owner of actions taken on their DAGs by others ''' def wrapper(*args, **kwargs): """ if request.args.get('confirmed') == "true": dag_id = request.args.get('dag_id') task_id = request.args.get('task_id') dagbag = models.DagBag( os.path.expanduser(configuration.get('core', 'DAGS_FOLDER')))
dag = dagbag.get_dag(dag_id) task = dag.get_task(task_id)
if current_user and hasattr(current_user, 'username'): user = current_user.username else: user = 'anonymous'
if task.owner != user: subject = ( 'Actions taken on DAG {0} by {1}'.format( dag_id, user)) items = request.args.items() content = Template(''' action: <i>{{ f.__name__ }}</i><br> <br> <b>Parameters</b>:<br> <table> {% for k, v in items %} {% if k != 'origin' %} <tr> <td>{{ k }}</td> <td>{{ v }}</td> </tr> {% endif %} {% endfor %} </table> ''').render(**locals()) if task.email: utils.send_email(task.email, subject, content) """
""" returns a json response from a json serializable python object """ response=json.dumps( obj, indent=4, cls=utils.AirflowJsonEncoder), status=200, mimetype="application/json")
''' Decorator to make a view compressed ''' def view_func(*args, **kwargs): def zipper(response):
response.direct_passthrough = False
if (response.status_code < 200 or response.status_code >= 300 or 'Content-Encoding' in response.headers): return response gzip_buffer = IO() gzip_file = gzip.GzipFile(mode='wb', fileobj=gzip_buffer) gzip_file.write(response.data) gzip_file.close()
response.data = gzip_buffer.getvalue() response.headers['Content-Encoding'] = 'gzip' response.headers['Vary'] = 'Accept-Encoding' response.headers['Content-Length'] = len(response.data)
return response
''' Used by cache to get a unique key per URL ''' path = request.path args = str(hash(frozenset(request.args.items()))) return (path + args).encode('ascii', 'ignore')
""" Renders an ACE code editor. """ <div id="{el_id}" style="height:100px;">{contents}</div> <textarea id="{el_id}_ace" name="{form_name}" style="display:none;visibility:hidden;"> </textarea> '''.format( el_id=kwargs.get('id', field.id), contents=escape(text_type(field._value())), form_name=field.id, ) |