Coverage for airflow.contrib.auth.backends.ldap_auth : 57%
Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
|
Form, PasswordField, StringField)
except: pass
raise AuthenticationError("Username or password incorrect")
return conn
self.user = user
def try_login(username, password):
# Searches the directory for `username`, then re-binds as the matched entry's
# DN using `password`. Raises AuthenticationError when that second call to
# get_ldap_connection() yields a falsy connection.
#
# NOTE(review): `conn` is read below before any assignment that is visible in
# this extract; the statement that opens the initial connection appears to be
# missing here -- confirm against the full source file.
#
# The filter has the shape (&(<user_filter>)(<user_name_attr>=<username>)),
# with the first two parts read from the "ldap" configuration section.
# NOTE(review): `username` is interpolated as-is. If it comes straight from
# the login form, LDAP filter metacharacters should be escaped -- TODO confirm.
search_filter = "(&({0})({1}={2}))".format( configuration.get("ldap", "user_filter"), configuration.get("ldap", "user_name_attr"), username )
# todo: BASE or ONELEVEL?
res = conn.search(configuration.get("ldap", "basedn"), search_filter, search_scope=LEVEL)
# todo: use list or result? if not res: raise AuthenticationError("Invalid username or password")
# NOTE(review): in this extract the `if not res` check above is part of the
# comment text, so nothing visible here guards the index below against an
# empty response -- verify against the full source file.
entry = conn.response[0]
# Drop the search connection, then bind again as the entry that was found,
# using the password supplied by the caller.
conn.unbind() conn = get_ldap_connection(entry['dn'], password)
if not conn: raise AuthenticationError("Invalid username or password")
'''Required by flask_login''' return True
'''Required by flask_login''' return True
'''Required by flask_login''' return False
'''Provides access to data profiling tools''' return True
'''Access all the things''' return True
def load_user(userid):
    """Load the user whose primary key is ``userid``.

    Returns an ``LdapUser`` wrapping the matching ``models.User`` row, or
    ``None`` when no row matches. Returning ``None`` (rather than a wrapper
    around ``None``) lets the login framework treat an unknown id as "not
    logged in" instead of handing out a user object with no backing row.
    """
    session = settings.Session()
    try:
        user = session.query(models.User).filter(
            models.User.id == userid).first()
        # Remove the loaded objects from the session before it is closed.
        session.expunge_all()
        session.commit()
    finally:
        # Release the session even when the query or commit raises.
        session.close()

    if user is None:
        return None
    return LdapUser(user)
flash("You are already logged in") return redirect(url_for('index'))
return self.render('airflow/login.html', title="Airflow - Login", form=form)
session = settings.Session() user = session.query(models.User).filter( models.User.username == DEFAULT_USERNAME).first()
if not user: user = models.User( username=DEFAULT_USERNAME, is_superuser=True)
session.merge(user) session.commit() flask_login.login_user(LdapUser(user)) session.commit() session.close()
return redirect(request.args.get("next") or url_for("admin.index")) flash("Incorrect login details") return self.render('airflow/login.html', title="Airflow - Login", form=form)
|