Coverage for airflow/www/views.py : 71%
Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
|
Form, SelectField, TextAreaField, PasswordField, StringField)
# filter_by_owner if authentication is enabled and filter_by_owner is true FILTER_BY_OWNER = not current_app.config['LOGIN_DISABLED']
url = url_for( 'airflow.graph', dag_id=m.dag_id) return Markup( '<a href="{url}">{m.dag_id}</a>'.format(**locals()))
url = url_for( 'airflow.log', dag_id=m.dag_id, task_id=m.task_id, execution_date=m.execution_date.isoformat()) return Markup( '<a href="{url}">' ' <span class="glyphicon glyphicon-book" aria-hidden="true">' '</span></a>').format(**locals())
url = url_for( 'airflow.task', dag_id=m.dag_id, task_id=m.task_id, execution_date=m.execution_date.isoformat()) url_root = url_for( 'airflow.graph', dag_id=m.dag_id, root=m.task_id, execution_date=m.execution_date.isoformat()) return Markup( """ <span style="white-space: nowrap;"> <a href="{url}">{m.task_id}</a> <a href="{url_root}" title="Filter on this task and upstream"> <span class="glyphicon glyphicon-filter" style="margin-left: 0px;" aria-hidden="true"></span> </a> </span> """.format(**locals()))
'<span class="label" style="background-color:{color};">' '{state}</span>'.format(**locals()))
return state_token(m.state)
if m.end_date and m.duration: return timedelta(seconds=m.duration)
attr = getattr(m, p) dttm = attr.isoformat() if attr else '' if datetime.now().isoformat()[:4] == dttm[:4]: dttm = dttm[5:] return Markup("<nobr>{}</nobr>".format(dttm))
return Markup("<nobr>{}</nobr>".format(getattr(m, p)))
try: default_params = eval(m.default_params) except: default_params = {} url = url_for( 'airflow.chart', chart_id=m.id, iteration_no=m.iteration_no, **default_params) return Markup("<a href='{url}'>{m.label}</a>".format(**locals()))
url = '/admin/taskinstance/?flt1_pool_equals=' + m.pool return Markup("<a href='{url}'>{m.pool}</a>".format(**locals()))
s, lexer(), HtmlFormatter(linenos=True), )
elif isinstance(obj, (tuple, list)): for i, s in enumerate(obj): out += "<div>List item #{}</div>".format(i) out += "<div>" + pygment_html_render(s, lexer) + "</div>" elif isinstance(obj, dict): for k, v in obj.items(): out += '<div>Dict item "{}"</div>'.format(k) out += "<div>" + pygment_html_render(v, lexer) + "</div>"
return '<div class="rich_doc">' + markdown.markdown(s) + "</div>"
'bash_command': lambda x: render(x, lexers.BashLexer), 'hql': lambda x: render(x, lexers.SqlLexer), 'sql': lambda x: render(x, lexers.SqlLexer), 'doc': lambda x: render(x, lexers.TextLexer), 'doc_json': lambda x: render(x, lexers.JsonLexer), 'doc_rst': lambda x: render(x, lexers.RstLexer), 'doc_yaml': lambda x: render(x, lexers.YamlLexer), 'doc_md': wrapped_markdown, 'python_callable': lambda x: render( inspect.getsource(x), lexers.PythonLexer), }
''' Decorator for views requiring data profiling access ''' def decorated_function(*args, **kwargs): current_app.config['LOGIN_DISABLED'] or (not current_user.is_anonymous() and current_user.data_profiling()) ): else: flash("This page requires data profiling privileges", "error") return redirect(url_for('admin.index'))
url = ( '/admin/taskinstance/' + '?flt1_pool_equals=' + m.pool + '&flt2_state_equals=running') return Markup("<a href='{0}'>{1}</a>".format(url, m.used_slots()))
url = ( '/admin/taskinstance/' + '?flt1_pool_equals=' + m.pool + '&flt2_state_equals=queued&sort=10&desc=1') return Markup("<a href='{0}'>{1}</a>".format(url, m.queued_slots()))
def index(self): return self.render('airflow/dags.html')
# @cache.cached(timeout=3600, key_prefix=wwwutils.make_cache_key) def chart_data(self): models.Connection).filter_by(conn_id=chart.conn_id).first()
# Processing templated fields raise AirflowException('Not a dict') except: args = {} payload['error'] += ( "Default params is not valid, string has to evaluate as " "a Python dictionary. ")
sql, lexers.SqlLexer(), # Lexer call HtmlFormatter(noclasses=True)) )
except Exception as e: payload['error'] += "SQL execution failed. Details: " + str(e)
return Response( response=df.to_csv(index=False), status=200, mimetype="application/text")
payload['warning'] = ( "Data has been truncated to {0}" " rows. Expect incomplete results.").format(CHART_LIMIT)
payload['error'] += "Empty result set. " not payload['error'] and chart.sql_layout == 'series' and chart.chart_type != "datatable" and len(df.columns) < 3): elif ( not payload['error'] and chart.sql_layout == 'columns'and len(df.columns) < 2): payload['error'] += "SQL needs to return at least 2 columns. " elif not payload['error']: import numpy as np chart_type = chart.chart_type
data = None if chart_type == "datatable": chart.show_datatable = True if chart.show_datatable: data = df.to_dict(orient="split") data['columns'] = [{'title': c} for c in data['columns']]
# Trying to convert time to something Highcharts likes x_col = 1 if chart.sql_layout == 'series' else 0 if chart.x_is_date: try: # From string to datetime df[df.columns[x_col]] = pd.to_datetime( df[df.columns[x_col]]) except Exception as e: raise AirflowException(str(e)) df[df.columns[x_col]] = df[df.columns[x_col]].apply( lambda x: int(x.strftime("%s")) * 1000)
series = [] colorAxis = None if chart_type == 'datatable': payload['data'] = data payload['state'] = 'SUCCESS' return wwwutils.json_response(payload)
elif chart_type == 'para': df.rename(columns={ df.columns[0]: 'name', df.columns[1]: 'group', }, inplace=True) return Response( response=df.to_csv(index=False), status=200, mimetype="application/text")
elif chart_type == 'heatmap': color_perc_lbound = float( request.args.get('color_perc_lbound', 0)) color_perc_rbound = float( request.args.get('color_perc_rbound', 1)) color_scheme = request.args.get('color_scheme', 'blue_red')
if color_scheme == 'blue_red': stops = [ [color_perc_lbound, '#00D1C1'], [ color_perc_lbound + ((color_perc_rbound - color_perc_lbound)/2), '#FFFFCC' ], [color_perc_rbound, '#FF5A5F'] ] elif color_scheme == 'blue_scale': stops = [ [color_perc_lbound, '#FFFFFF'], [color_perc_rbound, '#2222FF'] ] elif color_scheme == 'fire': diff = float(color_perc_rbound - color_perc_lbound) stops = [ [color_perc_lbound, '#FFFFFF'], [color_perc_lbound + 0.33*diff, '#FFFF00'], [color_perc_lbound + 0.66*diff, '#FF0000'], [color_perc_rbound, '#000000'] ] else: stops = [ [color_perc_lbound, '#FFFFFF'], [ color_perc_lbound + ((color_perc_rbound - color_perc_lbound)/2), '#888888' ], [color_perc_rbound, '#000000'], ]
xaxis_label = df.columns[1] yaxis_label = df.columns[2] data = [] for row in df.itertuples(): data.append({ 'x': row[2], 'y': row[3], 'value': row[4], }) x_format = '{point.x:%Y-%m-%d}' \ if chart.x_is_date else '{point.x}' series.append({ 'data': data, 'borderWidth': 0, 'colsize': 24 * 36e5, 'turboThreshold': sys.float_info.max, 'tooltip': { 'headerFormat': '', 'pointFormat': ( df.columns[1] + ': ' + x_format + '<br/>' + df.columns[2] + ': {point.y}<br/>' + df.columns[3] + ': <b>{point.value}</b>' ), }, }) colorAxis = { 'stops': stops, 'minColor': '#FFFFFF', 'maxColor': '#000000', 'min': 50, 'max': 2200, } else: if chart.sql_layout == 'series': # User provides columns (series, x, y) xaxis_label = df.columns[1] yaxis_label = df.columns[2] df[df.columns[2]] = df[df.columns[2]].astype(np.float) df = df.pivot_table( index=df.columns[1], columns=df.columns[0], values=df.columns[2], aggfunc=np.sum) else: # User provides columns (x, y, metric1, metric2, ...) xaxis_label = df.columns[0] yaxis_label = 'y' df.index = df[df.columns[0]] df = df.sort(df.columns[0]) del df[df.columns[0]] for col in df.columns: df[col] = df[col].astype(np.float)
for col in df.columns: series.append({ 'name': col, 'data': [ (k, df[col][k]) for k in df[col].keys() if not np.isnan(df[col][k])] }) series = [serie for serie in sorted( series, key=lambda s: s['data'][0][1], reverse=True)]
if chart_type == "stacked_area": stacking = "normal" chart_type = 'area' elif chart_type == "percent_area": stacking = "percent" chart_type = 'area' else: stacking = None hc = { 'chart': { 'type': chart_type }, 'plotOptions': { 'series': { 'marker': { 'enabled': False } }, 'area': {'stacking': stacking}, }, 'title': {'text': ''}, 'xAxis': { 'title': {'text': xaxis_label}, 'type': 'datetime' if chart.x_is_date else None, }, 'yAxis': { 'title': {'text': yaxis_label}, }, 'colorAxis': colorAxis, 'tooltip': { 'useHTML': True, 'backgroundColor': None, 'borderWidth': 0, }, 'series': series, }
if chart.y_log_scale: hc['yAxis']['type'] = 'logarithmic' hc['yAxis']['minorTickInterval'] = 0.1 if 'min' in hc['yAxis']: del hc['yAxis']['min']
payload['state'] = 'SUCCESS' payload['hc'] = hc payload['data'] = data payload['request_dict'] = request_dict
def chart(self): return self.render('airflow/para/para.html', chart=chart)
chart.sql, lexers.SqlLexer(), # Lexer call HtmlFormatter(noclasses=True)) ) 'airflow/highchart.html', chart=chart, title="Airflow - Chart", sql=sql, label=chart.label, embed=embed)
#@login_required def dag_stats(self): State.SUCCESS, State.RUNNING, State.FAILED, State.UPSTREAM_FAILED, State.UP_FOR_RETRY, State.QUEUED, ] session.query(TI.dag_id, TI.state, sqla.func.count(TI.task_id)) .filter(TI.task_id.in_(task_ids)) .filter(TI.dag_id.in_(dag_ids)) .group_by(TI.dag_id, TI.state) )
'state': state, 'count': count, 'dag_id': dag.dag_id, 'color': State.color(state) }
def code(self): code, lexers.PythonLexer(), HtmlFormatter(linenos=True)) 'airflow/dag_code.html', html_code=html_code, dag=dag, title=title, root=request.args.get('root'), demo_mode=conf.getboolean('webserver', 'demo_mode'))
def dag_details(self):
session.query(TI.state, sqla.func.count(TI.dag_id)) .filter(TI.dag_id == dag_id) .group_by(TI.state) .all() ) 'airflow/dag_details.html', dag=dag, title=title, states=states, State=utils.State)
def circles(self): return render_template( 'airflow/circles.html', hostname=socket.gethostname()), 404
def show_traceback(self): from airflow import ascii as ascii_ return render_template( 'airflow/traceback.html', hostname=socket.gethostname(), nukular=ascii_.nukular, info=traceback.format_exc()), 500
def sandbox(self): from airflow import configuration title = "Sandbox Suggested Configuration" cfg_loc = conf.AIRFLOW_CONFIG + '.sandbox' f = open(cfg_loc, 'r') config = f.read() f.close() code_html = Markup(highlight( config, lexers.IniLexer(), # Lexer call HtmlFormatter(noclasses=True)) ) return self.render( 'airflow/code.html', code_html=code_html, title=title, subtitle=cfg_loc)
def noaccess(self): return self.render('airflow/noaccess.html')
def headers(self): d = { 'headers': {k: v for k, v in request.headers}, } if hasattr(current_user, 'is_superuser'): d['is_superuser'] = current_user.is_superuser() d['data_profiling'] = current_user.data_profiling() d['is_anonymous'] = current_user.is_anonymous() d['is_authenticated'] = current_user.is_authenticated() if hasattr(current_user, 'username'): d['username'] = current_user.username return wwwutils.json_response(d)
def pickle_info(self): d = {} dag_id = request.args.get('dag_id') dags = [dagbag.dags.get(dag_id)] if dag_id else dagbag.dags.values() for dag in dags: if not dag.is_subdag: d[dag.dag_id] = dag.pickle_info() return wwwutils.json_response(d)
def login(self):
def logout(self): logout_user() flash('You have been logged out.') return redirect(url_for('admin.index'))
def rendered(self): except Exception as e: flash("Error rendering template: " + str(e), "error") else: "<pre><code>" + str(content) + "</pre></code>")
'airflow/ti_code.html', html_dict=html_dict, dag=dag, task_id=task_id, execution_date=execution_date, form=form, title=title,)
def log(self): conf.get('core', 'BASE_LOG_FOLDER')) **locals()) TI.dag_id == dag_id, TI.task_id == task_id, TI.execution_date == dttm).first()
host = ti.hostname log_loaded = False
if socket.gethostname() == host: try: f = open(loc) log += "".join(f.readlines()) f.close() log_loaded = True except: log = "*** Log file isn't where expected.\n".format(loc) else: WORKER_LOG_SERVER_PORT = \ conf.get('celery', 'WORKER_LOG_SERVER_PORT') url = os.path.join( "http://{host}:{WORKER_LOG_SERVER_PORT}/log", log_relative ).format(**locals()) log += "*** Log file isn't local.\n" log += "*** Fetching here: {url}\n".format(**locals()) try: import requests log += '\n' + requests.get(url).text log_loaded = True except: log += "*** Failed to fetch log file from worker.\n".format( **locals())
# try to load log backup from S3 s3_log_folder = conf.get('core', 'S3_LOG_FOLDER') if not log_loaded and s3_log_folder.startswith('s3:'): import boto s3 = boto.connect_s3() s3_log_loc = os.path.join( conf.get('core', 'S3_LOG_FOLDER'), log_relative) log += '*** Fetching log from S3: {}\n'.format(s3_log_loc) log += ('*** Note: S3 logs are only available once ' 'tasks have completed.\n') bucket, key = s3_log_loc.lstrip('s3:/').split('/', 1) s3_key = boto.s3.key.Key(s3.get_bucket(bucket), key) if s3_key.exists(): log += '\n' + s3_key.get_contents_as_string().decode() else: log += '*** No log found on S3.\n'
session.commit() session.close()
'airflow/ti_code.html', code=log, dag=dag, title=title, task_id=task_id, execution_date=execution_date, form=form)
def task(self): # Carrying execution_date through, even though it's irrelevant for # this context flash( "Task [{}.{}] doesn't seem to exist" " at the moment".format(dag_id, task_id), "error") return redirect('/admin/')
attr_name not in attr_renderer:
# Color coding the special attributes that are code
'airflow/task.html', attributes=attributes, task_id=task_id, execution_date=execution_date, special_attrs_rendered=special_attrs_rendered, form=form, dag=dag, title=title)
def run(self):
except ImportError: # in case CeleryExecutor cannot be imported it is not active either flash("Only works with the CeleryExecutor, sorry", "error") return redirect(origin)
ti = models.TaskInstance(task=task, execution_date=execution_date) executor.start() executor.queue_task_instance( ti, force=force, ignore_dependencies=deps) executor.heartbeat() flash( "Sent {} to the message queue, " "it should start any moment now.".format(ti)) return redirect(origin)
def clear(self):
task_regex=r"^{0}$".format(task_id), include_downstream=downstream, include_upstream=upstream)
start_date=start_date, end_date=end_date)
else: start_date=start_date, end_date=end_date, dry_run=True) flash("No task instances to clear", 'error') response = redirect(origin) else:
'airflow/confirm.html', message=( "Here's the list of task instances you are about " "to clear:"), details=details,)
def blocked(self): session.query(DR.dag_id, sqla.func.count(DR.id)) .filter(DR.state == State.RUNNING) .group_by(DR.dag_id) .all() ) 'dag_id': dag_id, 'active_dag_run': active_dag_runs, 'max_active_runs': max_active_runs, })
def success(self):
# Flagging tasks as successful if future else execution_date)
elif dag.start_date: start_date = dag.start_date else: start_date = execution_date
task_ids += [ t.task_id for t in task.get_flat_relatives(upstream=False)] task_ids += [ t.task_id for t in task.get_flat_relatives(upstream=True)]
dates = [start_date] else:
TI.dag_id == dag_id, TI.execution_date.in_(dates), TI.task_id.in_(task_ids)).all() TI.dag_id == dag_id, TI.execution_date.in_(dates), TI.task_id.in_(task_ids), TI.state != State.SUCCESS).all() set(tasks) - set([(ti.task_id, ti.execution_date) for ti in tis]))
[(ti.task_id, ti.execution_date) for ti in tis_to_change], tis_to_create))
flash("Too many tasks at once (>{0})".format( MAX_PERIODS), 'error') return redirect(origin)
ti.state = State.SUCCESS
task=dag.get_task(task_id), execution_date=task_execution_date, state=State.SUCCESS)
len(tis_all_altered)))
else: flash("No task instances to mark as successful", 'error') response = redirect(origin) else: task=dag.get_task(task_id), execution_date=task_execution_date, state=State.SUCCESS))
'airflow/confirm.html', message=( "Here's the list of task instances you are about " "to mark as successful:"), details=details,)
def tree(self): dag = dag.sub_dag( task_regex=root, include_downstream=False, include_upstream=True)
base_date = dateutil.parser.parse(base_date) else:
session.query(DR) .filter( DR.dag_id==dag.dag_id, DR.execution_date<=base_date, DR.execution_date>=min_date) .all() ) dr.execution_date: utils.alchemy_to_dict(dr) for dr in dag_runs}
session, start_date=min_date, end_date=base_date)
# The default recursion traces every path so that tree view has full # expand/collapse functionality. After 5,000 nodes we stop and fall # back on a quick DFS search for performance. See PR #320.
recurse_nodes(t, visited) for t in task.upstream_list if node_count[0] < node_limit or t not in visited]
# D3 tree uses children vs _children to define what is # expanded or not. The following block makes it such that # repeated nodes are collapsed by default. elif children: children_key = "_children"
'name': task.task_id, 'instances': [ task_instances.get((task.task_id, d)) or { 'execution_date': d.isoformat(), 'task_id': task.task_id } for d in dates], children_key: children, 'num_dep': len(task.upstream_list), 'operator': task.task_type, 'retries': task.retries, 'owner': task.owner, 'start_date': task.start_date, 'end_date': task.end_date, 'depends_on_past': task.depends_on_past, 'ui_color': task.ui_color, } 'name': '[DAG]', 'children': [recurse_nodes(t, set()) for t in dag.roots], 'instances': [ dag_runs.get(d) or {'execution_date': d.isoformat()} for d in dates], }
'num_runs': num_runs}) 'airflow/tree.html', operators=sorted( list(set([op.__class__ for op in dag.tasks])), key=lambda x: x.__name__ ), root=root, form=form, dag=dag, data=data, blur=blur)
def graph(self): flash('DAG "{0}" seems to be missing.'.format(dag_id), "error") return redirect('/admin/')
dag = dag.sub_dag( task_regex=root, include_upstream=True, include_downstream=False)
'id': task.task_id, 'value': { 'label': task.task_id, 'labelStyle': "fill:{0};".format(task.ui_fgcolor), 'style': "fill:{0};".format(task.ui_color), } })
'u': t.task_id, 'v': task.task_id, }
dttm = dateutil.parser.parse(dttm) else:
('LR', "Left->Right"), ('RL', "Right->Left"), ('TB', "Top->Bottom"), ('BT', "Bottom->Top"), )) data={'execution_date': dttm.isoformat(), 'arrange': arrange})
ti.task_id: utils.alchemy_to_dict(ti) for ti in dag.get_task_instances(session, dttm, dttm)} t.task_id: { 'dag_id': t.dag_id, 'task_type': t.task_type, } for t in dag.tasks} flash("No tasks found", "error")
'airflow/graph.html', dag=dag, form=form, width=request.args.get('width', "100%"), height=request.args.get('height', "800"), execution_date=dttm.isoformat(), state_token=state_token(dr_state), doc_md=doc_md, arrange=arrange, operators=sorted( list(set([op.__class__ for op in dag.tasks])), key=lambda x: x.__name__ ), blur=blur, root=root or '', task_instances=json.dumps(task_instances, indent=2), tasks=json.dumps(tasks, indent=2), nodes=json.dumps(nodes, indent=2), edges=json.dumps(edges, indent=2),)
def duration(self):
base_date = dateutil.parser.parse(base_date) else:
dag = dag.sub_dag( task_regex=root, include_upstream=True, include_downstream=False)
end_date=base_date): ti.execution_date.isoformat(), float(ti.duration) / (60*60) ])
session, start_date=min_date, end_date=base_date)
'num_runs': num_runs}) 'airflow/chart.html', dag=dag, data=json.dumps(all_data), chart_options={'yAxis': {'title': {'text': 'hours'}}}, height="700px", demo_mode=conf.getboolean('webserver', 'demo_mode'), root=root, form=form, )
def landing_times(self):
base_date = dateutil.parser.parse(base_date) else:
dag = dag.sub_dag( task_regex=root, include_upstream=True, include_downstream=False)
end_date=base_date):
session, start_date=min_date, end_date=base_date)
'num_runs': num_runs}) 'airflow/chart.html', dag=dag, data=json.dumps(all_data), height="700px", chart_options={'yAxis': {'title': {'text': 'hours after 00:00'}}}, demo_mode=conf.getboolean('webserver', 'demo_mode'), root=root, form=form, )
def paused(self): DagModel).filter(DagModel.dag_id == dag_id).first() else: orm_dag.is_paused = False
def refresh(self): DagModel).filter(DagModel.dag_id == dag_id).first()
def refresh_all(self):
def gantt(self):
dag = dag.sub_dag( task_regex=root, include_upstream=True, include_downstream=False)
dttm = dateutil.parser.parse(dttm) else:
ti for ti in dag.get_task_instances(session, dttm, dttm) if ti.start_date] 'x': i, 'low': int(ti.start_date.strftime('%s')) * 1000, 'high': int(end_date.strftime('%s')) * 1000, 'color': color, })
'chart': { 'type': 'columnrange', 'inverted': True, 'height': height, }, 'xAxis': {'categories': tasks, 'alternateGridColor': '#FAFAFA'}, 'yAxis': {'type': 'datetime'}, 'title': { 'text': None }, 'plotOptions': { 'series': { 'cursor': 'pointer', 'minPointLength': 4, }, }, 'legend': { 'enabled': False }, 'series': [{ 'data': data }] } 'airflow/gantt.html', dag=dag, execution_date=dttm.isoformat(), form=form, hc=json.dumps(hc, indent=4), height=height, demo_mode=demo_mode, root=root, )
def task_instances(self):
else: return ("Error: Invalid execution_date")
ti.task_id: utils.alchemy_to_dict(ti) for ti in dag.get_task_instances(session, dttm, dttm)}
def variables(self, form): try: if request.method == 'POST': data = request.json if data: session = settings.Session() var = models.Variable(key=form, val=json.dumps(data)) session.add(var) session.commit() return "" else: return self.render( 'airflow/variables/{}.html'.format(form) ) except: return ("Error: form airflow/variables/{}.html " "not found.").format(form), 404
def index(self): # filter the dags if filter_by_owner and current user is not superuser qry = ( session.query(DM) .filter( ~DM.is_subdag, DM.is_active, DM.owners == current_user.username) .all() ) else: flash( "Broken DAG: [{ie.filename}] {ie.stacktrace}".format(ie=ie), "error") dags = { dag.dag_id: dag for dag in dags if ( dag.owner == current_user.username and (not dag.parent_dag) ) } else: 'airflow/dags.html', dags=dags, orm_dags=orm_dags, all_dag_ids=all_dag_ids)
def query(self): models.Connection.conn_id).all() ((db.conn_id, db.conn_id) for db in dbs if db.get_hook()))
'conn_id': conn_id_str, 'sql': sql, } # df = hook.get_pandas_df(sql) classes=[ 'table', 'table-bordered', 'table-striped', 'no-wrap'], index=False, na_rep='', ) if has_data else '' except Exception as e: flash(str(e), 'error') error = True
flash( "Query output truncated at " + str(QUERY_LIMIT) + " rows", 'info')
flash('No data', 'error')
return Response( response=df.to_csv(index=False), status=200, mimetype="application/text")
'airflow/query.html', form=form, title="Ad Hoc Query", results=results or '', has_data=has_data)
""" Modifying the base ModelView class for non edit, browse only operations """
pool=pool_link, used_slots=fused_slots, queued_slots=fqueued_slots)
'dag_id', 'task_id', 'execution_date', 'email_sent', 'timestamp') task_id=task_instance_link, execution_date=datetime_f, timestamp=datetime_f, dag_id=dag_link) 'dag_id', 'task_id', 'email_sent', 'timestamp', 'execution_date') 'email_sent': {'disabled': True}, 'timestamp': {'disabled': True}, }
'label', 'owner', 'conn_id', 'chart_type', 'show_datatable', 'x_is_date', 'y_log_scale', 'show_sql', 'height', 'sql_layout', 'sql', 'default_params',) 'label', 'conn_id', 'chart_type', 'owner', 'last_modified',) 'label': "Can include {{ templated_fields }} and {{ macros }}", 'chart_type': "The type of chart to be displayed", 'sql': "Can include {{ templated_fields }} and {{ macros }}.", 'height': "Height of the chart, in pixels.", 'conn_id': "Source database to run the query against", 'x_is_date': ( "Whether the X axis should be casted as a date field. Expect most " "intelligible date formats to get casted properly." ), 'owner': ( "The chart's owner, mostly used for reference and filtering in " "the list view." ), 'show_datatable': "Whether to display an interactive data table under the chart.", 'default_params': ( 'A dictionary of {"key": "values",} that define what the ' 'templated fields (parameters) values should be by default. ' 'To be valid, it needs to "eval" as a Python dict. ' 'The key values will show up in the url\'s querystring ' 'and can be altered there.' ), 'show_sql': "Whether to display the SQL statement as a collapsible " "section in the chart page.", 'y_log_scale': "Whether to use a log scale for the Y axis.", 'sql_layout': ( "Defines the layout of the SQL that the application should " "expect. Depending on the tables you are sourcing from, it may " "make more sense to pivot / unpivot the metrics." ), } 'sql': "SQL", 'height': "Chart Height", 'sql_layout': "SQL Layout", 'show_sql': "Display the SQL Statement", 'default_params': "Default Parameters", } 'chart_type': [ ('line', 'Line Chart'), ('spline', 'Spline Chart'), ('bar', 'Bar Chart'), ('para', 'Parallel Coordinates'), ('column', 'Column Chart'), ('area', 'Overlapping Area Chart'), ('stacked_area', 'Stacked Area Chart'), ('percent_area', 'Percent Area Chart'), ('heatmap', 'Heatmap'), ('datatable', 'No chart, data table only'), ], 'sql_layout': [ ('series', 'SELECT series, x, y FROM ...'), ('columns', 'SELECT x, y (series 1), y (series 2), ... FROM ...'), ], 'conn_id': [ (c.conn_id, c.conn_id) for c in ( Session().query(models.Connection.conn_id) .group_by(models.Connection.conn_id) ) ] }
if model.iteration_no is None: model.iteration_no = 0 else: model.iteration_no += 1 if not model.user_id and current_user and hasattr(current_user, 'id'): model.user_id = current_user.id model.last_modified = datetime.now()
'label', 'event_type', 'start_date', 'end_date', 'reported_by', 'description') 'label', 'event_type', 'start_date', 'end_date', 'reported_by')
''' # For debugging / troubleshooting mv = KnowEventTypeView( models.KnownEventType, Session, name="Known Event Types", category="Manage") admin.add_view(mv) class DagPickleView(SuperUserMixin, ModelView): pass mv = DagPickleView( models.DagPickle, Session, name="Pickles", category="Manage") admin.add_view(mv) '''
'key', 'val', ) 'is_encrypted': {'disabled': True}, 'val': { 'rows': 20, } }
'job_type', 'dag_id', 'state', 'unixname', 'hostname', 'start_date', 'end_date', 'latest_heartbeat') start_date=datetime_f, end_date=datetime_f, hostname=nobr_f, state=state_f, latest_heartbeat=datetime_f)
'state': [ ('success', 'success'), ('running', 'running'), ('failed', 'failed'), ], } 'state', 'dag_id', 'execution_date', 'run_id', 'external_trigger') execution_date=datetime_f, state=state_f, start_date=datetime_f, dag_id=dag_link)
def action_set_running(self, ids): self.set_dagrun_state(ids, State.RUNNING)
def action_set_failed(self, ids): self.set_dagrun_state(ids, State.FAILED)
def action_set_success(self, ids): self.set_dagrun_state(ids, State.SUCCESS)
try: DR = models.DagRun count = 0 for dr in session.query(DR).filter(DR.id.in_(ids)).all(): count += 1 dr.state = target_state if target_state == State.RUNNING: dr.start_date = datetime.now() else: dr.end_date = datetime.now() session.commit() flash( "{count} dag runs were set to '{target_state}'".format(**locals())) except Exception as ex: if not self.handle_view_exception(ex): raise Exception("Ooops") flash('Failed to set state', 'error')
dttm=datetime_f, execution_date=datetime_f, dag_id=dag_link)
'state', 'dag_id', 'task_id', 'execution_date', 'hostname', 'queue', 'pool', 'operator', 'start_date', 'end_date') log=log_link, task_id=task_instance_link, hostname=nobr_f, state=state_f, execution_date=datetime_f, start_date=datetime_f, end_date=datetime_f, queued_dttm=datetime_f, dag_id=dag_link, duration=duration_f) 'state': [ ('success', 'success'), ('running', 'running'), ('failed', 'failed'), ], } 'state', 'dag_id', 'task_id', 'execution_date', 'operator', 'start_date', 'end_date', 'duration', 'job_id', 'hostname', 'unixname', 'priority_weight', 'queue', 'queued_dttm', 'try_number', 'pool', 'log')
def action_set_running(self, ids): self.set_task_instance_state(ids, State.RUNNING)
def action_set_failed(self, ids): self.set_task_instance_state(ids, State.FAILED)
def action_set_success(self, ids): self.set_task_instance_state(ids, State.SUCCESS)
def action_set_retry(self, ids): self.set_task_instance_state(ids, State.UP_FOR_RETRY)
try: TI = models.TaskInstance for count, id in enumerate(ids): task_id, dag_id, execution_date = id.split(',') execution_date = datetime.strptime(execution_date, '%Y-%m-%d %H:%M:%S') ti = session.query(TI).filter(TI.task_id == task_id, TI.dag_id == dag_id, TI.execution_date == execution_date).one() ti.state = target_state count += 1 session.commit() flash( "{count} task instances were set to '{target_state}'".format(**locals())) except Exception as ex: if not self.handle_view_exception(ex): raise Exception("Ooops") flash('Failed to set state', 'error')
'conn_id', 'conn_type', 'host', 'schema', 'login', 'password', 'port', 'extra', 'extra__jdbc__drv_path', 'extra__jdbc__drv_clsname', ) 'is_extra_encrypted': {'disabled': True}, 'is_encrypted': {'disabled': True}, } # Used to customized the form, the forms elements get rendered # and results are stored in the extra field as json. All of these # need to be prefixed with extra__ and then the conn_type ___ as in # extra__{conn_type}__name. You can also hide form elements and rename # others from the connection_form.js file 'extra__jdbc__drv_path' : StringField('Driver Path'), 'extra__jdbc__drv_clsname': StringField('Driver Class'), } 'conn_type': [ ('bigquery', 'BigQuery',), ('ftp', 'FTP',), ('google_cloud_storage', 'Google Cloud Storage'), ('hdfs', 'HDFS',), ('http', 'HTTP',), ('hive_cli', 'Hive Client Wrapper',), ('hive_metastore', 'Hive Metastore Thrift',), ('hiveserver2', 'Hive Server 2 Thrift',), ('jdbc', 'Jdbc Connection',), ('mysql', 'MySQL',), ('postgres', 'Postgres',), ('oracle', 'Oracle',), ('vertica', 'Vertica',), ('presto', 'Presto',), ('s3', 'S3',), ('samba', 'Samba',), ('sqlite', 'Sqlite',), ('mssql', 'Microsoft SQL Server'), ('mesos_framework-id', 'Mesos Framework ID'), ] }
formdata = form.data if formdata['conn_type'] in ['jdbc']: extra = { key:formdata[key] for key in self.form_extra_fields.keys() if key in formdata} model.extra = json.dumps(extra)
def alert_fernet_key(cls): return conf.get('core', 'fernet_key') is None
def is_secure(self): """ Used to display a message in the Connection list view making it clear that the passwords and `extra` field can't be encrypted. """ is_secure = False try: import cryptography conf.get('core', 'fernet_key') is_secure = True except: pass return is_secure
try: d = json.loads(form.data.get('extra', '{}')) except Exception as e: d = {}
for field in list(self.form_extra_fields.keys()): value = d.get(field, '') if value: field = getattr(form, field) field.data = value
def conf(self): with open(conf.AIRFLOW_CONFIG, 'r') as f: config = f.read() else: "# You Airflow administrator chose not to expose the " "configuration, most likely for security reasons.") return Response( response=config, status=200, mimetype="application/text") else: config, lexers.IniLexer(), # Lexer call HtmlFormatter(noclasses=True)) ) 'airflow/code.html', pre_subtitle=settings.HEADER + " v" + airflow.__version__, code_html=code_html, title=title, subtitle=subtitle)
'dag_id', 'owners', 'is_paused', 'is_active', 'is_subdag', 'last_scheduler_run', 'last_expired') 'last_scheduler_run': {'disabled': True}, 'fileloc': {'disabled': True}, 'is_paused': {'disabled': True}, 'last_pickled': {'disabled': True}, 'pickle_id': {'disabled': True}, 'last_loaded': {'disabled': True}, 'last_expired': {'disabled': True}, 'pickle_size': {'disabled': True}, 'scheduler_lock': {'disabled': True}, 'owners': {'disabled': True}, } dag_id=dag_link, )
""" Default filters for model """ return ( super(DagModelView, self) .get_query() .filter(or_(models.DagModel.is_active, models.DagModel.is_paused)) .filter(~models.DagModel.is_subdag) )
""" Default filters for model """ return ( super(DagModelView, self) .get_count_query() .filter(models.DagModel.is_active) .filter(~models.DagModel.is_subdag) ) |