aboutsummaryrefslogtreecommitdiff
path: root/gn3/auth/authorisation/users/views.py
diff options
context:
space:
mode:
Diffstat (limited to 'gn3/auth/authorisation/users/views.py')
-rw-r--r--gn3/auth/authorisation/users/views.py149
1 files changed, 149 insertions, 0 deletions
diff --git a/gn3/auth/authorisation/users/views.py b/gn3/auth/authorisation/users/views.py
new file mode 100644
index 0000000..460f81c
--- /dev/null
+++ b/gn3/auth/authorisation/users/views.py
@@ -0,0 +1,149 @@
+"""User authorisation endpoints."""
+import traceback
+from typing import Tuple, Optional
+
+import sqlite3
+from flask import request, jsonify, Response, Blueprint, current_app
+
+from gn3.auth import db
+from gn3.auth.dictify import dictify
+
+from ..groups.models import user_group as _user_group
+from ..errors import NotFoundError, UserRegistrationError
+from ..resources.models import user_resources as _user_resources
+from ..roles.models import assign_default_roles, user_roles as _user_roles
+
+from ...authentication.oauth2.resource_server import require_oauth
+from ...authentication.users import save_user, set_user_password
+from ...authentication.oauth2.models.oauth2token import token_by_access_token
+
+users = Blueprint("users", __name__)
+
+@users.route("/", methods=["GET"])
+@require_oauth("profile")
+def user_details() -> Response:
+ """Return user's details."""
+ with require_oauth.acquire("profile") as the_token:
+ user = the_token.user
+ user_dets = {
+ "user_id": user.user_id, "email": user.email, "name": user.name,
+ "group": False
+ }
+ with db.connection(current_app.config["AUTH_DB"]) as conn, db.cursor(conn) as cursor:
+ the_group = _user_group(cursor, user).maybe(# type: ignore[misc]
+ False, lambda grp: grp)# type: ignore[arg-type]
+ return jsonify({
+ **user_dets,
+ "group": dictify(the_group) if the_group else False
+ })
+
+@users.route("/roles", methods=["GET"])
+@require_oauth("role")
+def user_roles() -> Response:
+ """Return the non-resource roles assigned to the user."""
+ with require_oauth.acquire("role") as token:
+ with db.connection(current_app.config["AUTH_DB"]) as conn:
+ return jsonify(tuple(
+ dictify(role) for role in
+ _user_roles(conn, token.user).maybe(# type: ignore[misc]
+ tuple(), lambda roles: roles)))
+
+def __email_valid__(email: str) -> Tuple[bool, Optional[str]]:
+ """Validate the email address."""
+ if email == "":
+ return False, "Empty email address"
+
+ ## Check that the address is a valid email address
+ ## Review use of `email-validator` or `pyIsEmail` python packages for
+ ## validating the emails, if it turns out this is important.
+
+ ## Success
+ return True, None
+
+def __password_valid__(password, confirm_password) -> Tuple[bool, Optional[str]]:
+ if password == "" or confirm_password == "":
+ return False, "Empty password value"
+
+ if password != confirm_password:
+ return False, "Mismatched password values"
+
+ return True, None
+
+def __user_name_valid__(name: str) -> Tuple[bool, Optional[str]]:
+ if name == "":
+ return False, "User's name not provided."
+
+ return True, None
+
+def __assert_not_logged_in__(conn: db.DbConnection):
+ bearer = request.headers.get('Authorization')
+ if bearer:
+ token = token_by_access_token(conn, bearer.split(None)[1]).maybe(# type: ignore[misc]
+ False, lambda tok: tok)
+ if token:
+ raise UserRegistrationError(
+ "Cannot register user while authenticated")
+
+@users.route("/register", methods=["POST"])
+def register_user() -> Response:
+ """Register a user."""
+ with db.connection(current_app.config["AUTH_DB"]) as conn:
+ __assert_not_logged_in__(conn)
+
+ form = request.form
+ email = form.get("email", "").strip()
+ password = form.get("password", "").strip()
+ user_name = form.get("user_name", "").strip()
+ errors = tuple(
+ error for valid,error in
+ [__email_valid__(email),
+ __password_valid__(
+ password, form.get("confirm_password", "").strip()),
+ __user_name_valid__(user_name)]
+ if not valid)
+ if len(errors) > 0:
+ raise UserRegistrationError(*errors)
+
+ try:
+ with db.cursor(conn) as cursor:
+ user, _hashed_password = set_user_password(
+ cursor, save_user(cursor, email, user_name), password)
+ assign_default_roles(cursor, user)
+ return jsonify(
+ {
+ "user_id": user.user_id,
+ "email": user.email,
+ "name": user.name
+ })
+ except sqlite3.IntegrityError as sq3ie:
+ current_app.logger.debug(traceback.format_exc())
+ raise UserRegistrationError(
+ "A user with that email already exists") from sq3ie
+
+ raise Exception(
+ "unknown_error", "The system experienced an unexpected error.")
+
+@users.route("/group", methods=["GET"])
+@require_oauth("profile group")
+def user_group() -> Response:
+ """Retrieve the group in which the user is a member."""
+ with require_oauth.acquire("profile group") as the_token:
+ db_uri = current_app.config["AUTH_DB"]
+ with db.connection(db_uri) as conn, db.cursor(conn) as cursor:
+ group = _user_group(cursor, the_token.user).maybe(# type: ignore[misc]
+ False, lambda grp: grp)# type: ignore[arg-type]
+
+ if group:
+ return jsonify(dictify(group))
+ raise NotFoundError("User is not a member of any group.")
+
+@users.route("/resources")
+@require_oauth("profile resource")
+def user_resources() -> Response:
+ """Retrieve the resources a user has access to."""
+ with require_oauth.acquire("profile resource") as the_token:
+ db_uri = current_app.config["AUTH_DB"]
+ with db.connection(db_uri) as conn:
+ return jsonify([
+ dictify(resource) for resource in
+ _user_resources(conn, the_token.user)])