diff options
Diffstat (limited to 'gn_auth/auth/authorisation/users/admin')
| -rw-r--r-- | gn_auth/auth/authorisation/users/admin/models.py | 56 | ||||
| -rw-r--r-- | gn_auth/auth/authorisation/users/admin/ui.py | 4 | ||||
| -rw-r--r-- | gn_auth/auth/authorisation/users/admin/views.py | 41 |
3 files changed, 84 insertions, 17 deletions
diff --git a/gn_auth/auth/authorisation/users/admin/models.py b/gn_auth/auth/authorisation/users/admin/models.py index 36f3c09..3d68932 100644 --- a/gn_auth/auth/authorisation/users/admin/models.py +++ b/gn_auth/auth/authorisation/users/admin/models.py @@ -1,23 +1,55 @@ """Major function for handling admin users.""" +import warnings + from gn_auth.auth.db import sqlite3 as db from gn_auth.auth.authentication.users import User +from gn_auth.auth.authorisation.roles.models import Role, db_rows_to_roles -def make_sys_admin(cursor: db.DbCursor, user: User) -> User: - """Make a given user into an system admin.""" + +def sysadmin_role(conn: db.DbConnection) -> Role: + """Fetch the `system-administrator` role details.""" + with db.cursor(conn) as cursor: + cursor.execute( + "SELECT roles.*, privileges.* " + "FROM roles INNER JOIN role_privileges " + "ON roles.role_id=role_privileges.role_id " + "INNER JOIN privileges " + "ON role_privileges.privilege_id=privileges.privilege_id " + "WHERE role_name='system-administrator'") + results = db_rows_to_roles(cursor.fetchall()) + + assert len(results) == 1, ( + "There should only ever be one 'system-administrator' role.") + return results[0] + + +def grant_sysadmin_role(cursor: db.DbCursor, user: User) -> User: + """Grant `system-administrator` role to `user`.""" cursor.execute( "SELECT * FROM roles WHERE role_name='system-administrator'") admin_role = cursor.fetchone() - cursor.execute( - "SELECT * FROM resources AS r " - "INNER JOIN resource_categories AS rc " - "ON r.resource_category_id=rc.resource_category_id " - "WHERE resource_category_key='system'") - the_system = cursor.fetchone() - cursor.execute( + cursor.execute("SELECT resources.resource_id FROM resources") + cursor.executemany( "INSERT INTO user_roles VALUES (:user_id, :role_id, :resource_id)", - { + tuple({ "user_id": str(user.user_id), "role_id": admin_role["role_id"], - "resource_id": the_system["resource_id"] - }) + "resource_id": resource_id + } for resource_id in cursor.fetchall())) return user + + +def make_sys_admin(cursor: db.DbCursor, user: User) -> User: + """Make a given user into an system admin.""" + warnings.warn( + DeprecationWarning( + f"The function `{__name__}.make_sys_admin` will be removed soon"), + stacklevel=1) + return grant_sysadmin_role(cursor, user) + + +def revoke_sysadmin_role(conn: db.DbConnection, user: User): + """Revoke `system-administrator` role from `user`.""" + with db.cursor(conn) as cursor: + cursor.execute("DELETE FROM user_roles WHERE user_id=? AND role_id=?", + (str(user.user_id), str(sysadmin_role(conn).role_id))) diff --git a/gn_auth/auth/authorisation/users/admin/ui.py b/gn_auth/auth/authorisation/users/admin/ui.py index 64e79a0..43ca0a2 100644 --- a/gn_auth/auth/authorisation/users/admin/ui.py +++ b/gn_auth/auth/authorisation/users/admin/ui.py @@ -1,6 +1,6 @@ """UI utilities for the auth system.""" from functools import wraps -from flask import flash, url_for, redirect +from flask import flash, request, url_for, redirect from gn_auth.session import logged_in, session_user, clear_session_info from gn_auth.auth.authorisation.resources.system.models import ( @@ -24,5 +24,5 @@ def is_admin(func): flash("Expected a system administrator.", "alert-danger") flash("You have been logged out of the system.", "alert-info") clear_session_info() - return redirect(url_for("oauth2.admin.login")) + return redirect(url_for("oauth2.admin.login", **dict(request.args))) return __admin__ diff --git a/gn_auth/auth/authorisation/users/admin/views.py b/gn_auth/auth/authorisation/users/admin/views.py index 85aeb50..9bc1c36 100644 --- a/gn_auth/auth/authorisation/users/admin/views.py +++ b/gn_auth/auth/authorisation/users/admin/views.py @@ -30,6 +30,7 @@ from ....authentication.oauth2.models.oauth2client import ( save_client, OAuth2Client, oauth2_clients, + update_client_attribute, client as oauth2_client, delete_client as _delete_client) from ....authentication.users import ( @@ -97,7 +98,7 @@ def login(): expires=( datetime.now(tz=timezone.utc) + timedelta(minutes=int( app.config.get("SESSION_EXPIRY_MINUTES", 10))))) - return redirect(url_for(next_uri)) + return redirect(url_for(next_uri, **dict(request.args))) raise NotFoundError(error_message) except NotFoundError as _nfe: flash(error_message, "alert-danger") @@ -196,7 +197,7 @@ def register_client(): if request.method == "GET": return render_template( "admin/register-client.html", - scope=app.config["OAUTH2_SCOPE"], + scope=app.config["OAUTH2_SCOPES_SUPPORTED"], users=with_db_connection(__list_users__), granttypes=_FORM_GRANT_TYPES_, current_user=session.session_user()) @@ -261,7 +262,7 @@ def view_client(client_id: uuid.UUID): return render_template( "admin/view-oauth2-client.html", client=with_db_connection(partial(oauth2_client, client_id=client_id)), - scope=app.config["OAUTH2_SCOPE"], + scope=app.config["OAUTH2_SCOPES_SUPPORTED"], granttypes=_FORM_GRANT_TYPES_) @@ -321,3 +322,37 @@ def delete_client(): "successfully."), "alert-success") return redirect(url_for("oauth2.admin.list_clients")) + + +@admin.route("/clients/<uuid:client_id>/change-secret", methods=["GET", "POST"]) +@is_admin +def change_client_secret(client_id: uuid.UUID): + """Enable changing of a client's secret.""" + def __no_client__(): + # Calling the function causes the flash to be evaluated + # flash("No such client was found!", "alert-danger") + return redirect(url_for("oauth2.admin.list_clients")) + + with db.connection(app.config["AUTH_DB"]) as conn: + if request.method == "GET": + return oauth2_client( + conn, client_id=client_id + ).maybe(__no_client__(), lambda _client: render_template( + "admin/confirm-change-client-secret.html", + client=_client + )) + + _raw = random_string() + return oauth2_client( + conn, client_id=client_id + ).then( + lambda _client: save_client( + conn, + update_client_attribute( + _client, "client_secret", hash_password(_raw))) + ).then( + lambda _client: render_template( + "admin/registered-client.html", + client=_client, + client_secret=_raw) + ).maybe(__no_client__(), lambda resp: resp) |
