diff options
Diffstat (limited to 'gn3/auth/authorisation/views.py')
-rw-r--r-- | gn3/auth/authorisation/views.py | 197 |
1 files changed, 0 insertions, 197 deletions
diff --git a/gn3/auth/authorisation/views.py b/gn3/auth/authorisation/views.py deleted file mode 100644 index 03c4b03..0000000 --- a/gn3/auth/authorisation/views.py +++ /dev/null @@ -1,197 +0,0 @@ -"""Endpoints for the authorisation stuff.""" -import uuid -import traceback -from typing import Tuple, Optional - -import sqlite3 -from flask import request, jsonify, Response, current_app - -from gn3.auth import db -from gn3.auth.dictify import dictify -from gn3.auth.blueprint import oauth2 - -from .errors import NotFoundError, UserRegistrationError -from .resources import user_resources as _user_resources -from .roles import user_role, assign_default_roles, user_roles as _user_roles -from .groups import ( - all_groups, GroupCreationError, user_group as _user_group, - group_users as _group_users, create_group as _create_group) - -from ..authentication.oauth2.resource_server import require_oauth -from ..authentication.users import save_user, set_user_password -from ..authentication.oauth2.models.oauth2token import token_by_access_token - -@oauth2.route("/user", methods=["GET"]) -@require_oauth("profile") -def user_details(): - """Return user's details.""" - with require_oauth.acquire("profile") as the_token: - user = the_token.user - user_dets = { - "user_id": user.user_id, "email": user.email, "name": user.name, - "group": False - } - with db.connection(current_app.config["AUTH_DB"]) as conn, db.cursor(conn) as cursor: - return jsonify(_user_group(cursor, user).maybe( - user_dets, - lambda group: {**user_dets, "group": dictify(group)})) - -@oauth2.route("/user-roles", methods=["GET"]) -@require_oauth("role") -def user_roles(): - """Return the non-resource roles assigned to the user.""" - with require_oauth.acquire("role") as token: - with db.connection(current_app.config["AUTH_DB"]) as conn: - return jsonify(_user_roles(conn, token.user).maybe( - tuple(), lambda roles: tuple(dictify(role) for role in roles))) - -def __email_valid__(email: str) -> Tuple[bool, Optional[str]]: - """Validate the email address.""" - if email == "": - return False, "Empty email address" - - ## Check that the address is a valid email address - ## Review use of `email-validator` or `pyIsEmail` python packages for - ## validating the emails, if it turns out this is important. - - ## Success - return True, None - -def __password_valid__(password, confirm_password) -> Tuple[bool, Optional[str]]: - if password == "" or confirm_password == "": - return False, "Empty password value" - - if password != confirm_password: - return False, "Mismatched password values" - - return True, None - -def __user_name_valid__(name: str) -> Tuple[bool, Optional[str]]: - if name == "": - return False, "User's name not provided." - - return True, None - -def __assert_not_logged_in__(conn: db.DbConnection): - bearer = request.headers.get('Authorization') - if bearer: - token = token_by_access_token(conn, bearer.split(None)[1]).maybe(# type: ignore[misc] - False, lambda tok: tok) - if token: - raise UserRegistrationError( - "Cannot register user while authenticated") - -@oauth2.route("/register-user", methods=["POST"]) -def register_user(): - """Register a user.""" - with db.connection(current_app.config["AUTH_DB"]) as conn: - __assert_not_logged_in__(conn) - - form = request.form - email = form.get("email", "").strip() - password = form.get("password", "").strip() - user_name = form.get("user_name", "").strip() - errors = tuple( - error for valid,error in - [__email_valid__(email), - __password_valid__( - password, form.get("confirm_password", "").strip()), - __user_name_valid__(user_name)] - if not valid) - if len(errors) > 0: - raise UserRegistrationError(*errors) - - try: - with db.cursor(conn) as cursor: - user, _hashed_password = set_user_password( - cursor, save_user(cursor, email, user_name), password) - assign_default_roles(cursor, user) - return jsonify( - { - "user_id": user.user_id, - "email": user.email, - "name": user.name - }), 200 - except sqlite3.IntegrityError as sq3ie: - current_app.logger.debug(traceback.format_exc()) - raise UserRegistrationError( - "A user with that email already exists") from sq3ie - - raise Exception( - "unknown_error", "The system experienced an unexpected error.") - -@oauth2.route("/groups", methods=["GET"]) -@require_oauth("profile group") -def groups(): - """Return the list of groups that exist.""" - with db.connection(current_app.config["AUTH_DB"]) as conn: - the_groups = all_groups(conn) - - return jsonify(the_groups.maybe( - [], lambda grps: [dictify(grp) for grp in grps])) - -@oauth2.route("/create-group", methods=["POST"]) -@require_oauth("profile group") -def create_group(): - """Create a new group.""" - with require_oauth.acquire("profile group") as the_token: - group_name=request.form.get("group_name", "").strip() - if not bool(group_name): - raise GroupCreationError("Could not create the group.") - - db_uri = current_app.config["AUTH_DB"] - with db.connection(db_uri) as conn: - user = the_token.user - new_group = _create_group( - conn, group_name, user, request.form.get("group_description")) - return jsonify({ - **dictify(new_group), "group_leader": dictify(user) - }) - -@oauth2.route("/role/<uuid:role_id>", methods=["GET"]) -@require_oauth("role") -def role(role_id: uuid.UUID) -> Response: - """Retrieve a user role with id `role_id`""" - def __error__(exc: Exception): - raise exc - with require_oauth.acquire("profile role") as the_token: - db_uri = current_app.config["AUTH_DB"] - with db.connection(db_uri) as conn: - the_role = user_role(conn, the_token.user, role_id) - return the_role.either( - __error__, lambda a_role: jsonify(dictify(a_role))) - -@oauth2.route("/user-group", methods=["GET"]) -@require_oauth("profile group") -def user_group(): - """Retrieve the group in which the user is a member.""" - with require_oauth.acquire("profile group") as the_token: - db_uri = current_app.config["AUTH_DB"] - with db.connection(db_uri) as conn, db.cursor(conn) as cursor: - group = _user_group(cursor, the_token.user).maybe( - False, lambda grp: grp) - - if group: - return jsonify(dictify(group)) - raise NotFoundError("User is not a member of any group.") - -@oauth2.route("/user-resources") -@require_oauth("profile resource") -def user_resources(): - """Retrieve the resources a user has access to.""" - with require_oauth.acquire("profile resource") as the_token: - db_uri = current_app.config["AUTH_DB"] - with db.connection(db_uri) as conn: - return jsonify([ - dictify(resource) for resource in - _user_resources(conn, the_token.user)]) - -@oauth2.route("/group-users/<uuid:group_id>", methods=["GET"]) -@require_oauth("profile group") -def group_users(group_id: uuid.UUID) -> Response: - """Retrieve all the members of a group.""" - with require_oauth.acquire("profile group") as the_token: - db_uri = current_app.config["AUTH_DB"] - with db.connection(db_uri) as conn: - return jsonify(tuple( - dictify(user) for user in _group_users(conn, group_id))) |