diff options
Diffstat (limited to 'gn3/auth/authorisation/users/views.py')
-rw-r--r-- | gn3/auth/authorisation/users/views.py | 149 |
1 files changed, 149 insertions, 0 deletions
diff --git a/gn3/auth/authorisation/users/views.py b/gn3/auth/authorisation/users/views.py new file mode 100644 index 0000000..460f81c --- /dev/null +++ b/gn3/auth/authorisation/users/views.py @@ -0,0 +1,149 @@ +"""User authorisation endpoints.""" +import traceback +from typing import Tuple, Optional + +import sqlite3 +from flask import request, jsonify, Response, Blueprint, current_app + +from gn3.auth import db +from gn3.auth.dictify import dictify + +from ..groups.models import user_group as _user_group +from ..errors import NotFoundError, UserRegistrationError +from ..resources.models import user_resources as _user_resources +from ..roles.models import assign_default_roles, user_roles as _user_roles + +from ...authentication.oauth2.resource_server import require_oauth +from ...authentication.users import save_user, set_user_password +from ...authentication.oauth2.models.oauth2token import token_by_access_token + +users = Blueprint("users", __name__) + +@users.route("/", methods=["GET"]) +@require_oauth("profile") +def user_details() -> Response: + """Return user's details.""" + with require_oauth.acquire("profile") as the_token: + user = the_token.user + user_dets = { + "user_id": user.user_id, "email": user.email, "name": user.name, + "group": False + } + with db.connection(current_app.config["AUTH_DB"]) as conn, db.cursor(conn) as cursor: + the_group = _user_group(cursor, user).maybe(# type: ignore[misc] + False, lambda grp: grp)# type: ignore[arg-type] + return jsonify({ + **user_dets, + "group": dictify(the_group) if the_group else False + }) + +@users.route("/roles", methods=["GET"]) +@require_oauth("role") +def user_roles() -> Response: + """Return the non-resource roles assigned to the user.""" + with require_oauth.acquire("role") as token: + with db.connection(current_app.config["AUTH_DB"]) as conn: + return jsonify(tuple( + dictify(role) for role in + _user_roles(conn, token.user).maybe(# type: ignore[misc] + tuple(), lambda roles: roles))) + +def __email_valid__(email: str) -> Tuple[bool, Optional[str]]: + """Validate the email address.""" + if email == "": + return False, "Empty email address" + + ## Check that the address is a valid email address + ## Review use of `email-validator` or `pyIsEmail` python packages for + ## validating the emails, if it turns out this is important. + + ## Success + return True, None + +def __password_valid__(password, confirm_password) -> Tuple[bool, Optional[str]]: + if password == "" or confirm_password == "": + return False, "Empty password value" + + if password != confirm_password: + return False, "Mismatched password values" + + return True, None + +def __user_name_valid__(name: str) -> Tuple[bool, Optional[str]]: + if name == "": + return False, "User's name not provided." + + return True, None + +def __assert_not_logged_in__(conn: db.DbConnection): + bearer = request.headers.get('Authorization') + if bearer: + token = token_by_access_token(conn, bearer.split(None)[1]).maybe(# type: ignore[misc] + False, lambda tok: tok) + if token: + raise UserRegistrationError( + "Cannot register user while authenticated") + +@users.route("/register", methods=["POST"]) +def register_user() -> Response: + """Register a user.""" + with db.connection(current_app.config["AUTH_DB"]) as conn: + __assert_not_logged_in__(conn) + + form = request.form + email = form.get("email", "").strip() + password = form.get("password", "").strip() + user_name = form.get("user_name", "").strip() + errors = tuple( + error for valid,error in + [__email_valid__(email), + __password_valid__( + password, form.get("confirm_password", "").strip()), + __user_name_valid__(user_name)] + if not valid) + if len(errors) > 0: + raise UserRegistrationError(*errors) + + try: + with db.cursor(conn) as cursor: + user, _hashed_password = set_user_password( + cursor, save_user(cursor, email, user_name), password) + assign_default_roles(cursor, user) + return jsonify( + { + "user_id": user.user_id, + "email": user.email, + "name": user.name + }) + except sqlite3.IntegrityError as sq3ie: + current_app.logger.debug(traceback.format_exc()) + raise UserRegistrationError( + "A user with that email already exists") from sq3ie + + raise Exception( + "unknown_error", "The system experienced an unexpected error.") + +@users.route("/group", methods=["GET"]) +@require_oauth("profile group") +def user_group() -> Response: + """Retrieve the group in which the user is a member.""" + with require_oauth.acquire("profile group") as the_token: + db_uri = current_app.config["AUTH_DB"] + with db.connection(db_uri) as conn, db.cursor(conn) as cursor: + group = _user_group(cursor, the_token.user).maybe(# type: ignore[misc] + False, lambda grp: grp)# type: ignore[arg-type] + + if group: + return jsonify(dictify(group)) + raise NotFoundError("User is not a member of any group.") + +@users.route("/resources") +@require_oauth("profile resource") +def user_resources() -> Response: + """Retrieve the resources a user has access to.""" + with require_oauth.acquire("profile resource") as the_token: + db_uri = current_app.config["AUTH_DB"] + with db.connection(db_uri) as conn: + return jsonify([ + dictify(resource) for resource in + _user_resources(conn, the_token.user)]) |