aboutsummaryrefslogtreecommitdiff
path: root/gn3/auth/authorisation/views.py
blob: 1c59ed13b8d60f619bb14996de716e9048f69399 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
"""Endpoints for the authorisation stuff."""
import traceback
from typing import Tuple, Optional

import sqlite3
from flask import request, jsonify, current_app

from gn3.auth import db
from gn3.auth.blueprint import oauth2

from .groups import user_group
from .errors import UserRegistrationError
from .roles import assign_default_roles, user_roles as _user_roles

from ..authentication.oauth2.resource_server import require_oauth
from ..authentication.users import save_user, set_user_password
from ..authentication.oauth2.models.oauth2token import token_by_access_token

@oauth2.route("/user", methods=["GET"])
@require_oauth("profile")
def user_details():
    """Return the authenticated user's details as JSON.

    The response carries the user's id, email and name, plus the result of
    looking up the user's group (`False` when the lookup yields nothing).
    """
    with require_oauth.acquire("profile") as token:
        current_user = token.user
        with db.connection(current_app.config["AUTH_DB"]) as conn:
            with db.cursor(conn) as cursor:
                maybe_group = user_group(cursor, current_user)

        # The database handles are released before the payload is built.
        payload = {
            "user_id": current_user.user_id,
            "email": current_user.email,
            "name": current_user.name,
            "group": maybe_group.maybe(False, lambda grp: grp)
        }
        return jsonify(payload)

@oauth2.route("/user-roles", methods=["GET"])
@require_oauth("role")
def user_roles():
    """Return the non-resource roles assigned to the user.

    The token is acquired with the "role" scope and the roles of the token's
    user are fetched from the auth database and returned as JSON.
    """
    # NOTE(review): `require_oauth` is called with the scope here, the same
    # way `user_details` does it. Applied bare, the view function itself would
    # be handed to the protector as its first argument -- confirm against the
    # definition in `authentication.oauth2.resource_server`.
    with require_oauth.acquire("role") as token:
        with db.connection(current_app.config["AUTH_DB"]) as conn:
            return jsonify(_user_roles(conn, token.user))

def __email_valid__(email: str) -> Tuple[bool, Optional[str]]:
    """Check the email address, returning a `(valid, error_message)` pair.

    Only emptiness is checked for now; `error_message` is `None` on success.
    """
    ## TODO: Check that the address is a well-formed email address.
    ## Review use of `email-validator` or `pyIsEmail` python packages for
    ## validating the emails, if it turns out this is important.
    if email != "":
        ## Success
        return True, None

    return False, "Empty email address"

def __password_valid__(password, confirm_password) -> Tuple[bool, Optional[str]]:
    """Check the password pair, returning a `(valid, error_message)` pair.

    Both values must be non-empty and equal to each other; `error_message` is
    `None` on success.
    """
    either_blank = password == "" or confirm_password == ""
    if either_blank:
        return False, "Empty password value"

    if password == confirm_password:
        return True, None

    return False, "Mismatched password values"

def __user_name_valid__(name: str) -> Tuple[bool, Optional[str]]:
    """Check the user's name, returning a `(valid, error_message)` pair.

    The name only has to be non-empty; `error_message` is `None` on success.
    """
    if name != "":
        return True, None

    return False, "User's name not provided."

def __assert_not_logged_in__(conn: db.DbConnection):
    """Raise `UserRegistrationError` if the request carries a known token.

    The `Authorization` header is read as `<scheme> <token>`: the second
    whitespace-separated part is looked up with `token_by_access_token`. A
    missing header, or one with no token part, is treated as "not logged in"
    instead of failing with an `IndexError`.
    """
    bearer = request.headers.get('Authorization')
    if not bearer:
        return

    # Split on any run of whitespace; a header such as "Bearer" (or one made
    # up of whitespace only) has no token part to look up.
    parts = bearer.split(None)
    if len(parts) < 2:
        return

    token = token_by_access_token(conn, parts[1]).maybe(# type: ignore[misc]
        False, lambda tok: tok)
    if token:
        raise UserRegistrationError(
            "Cannot register user while authenticated")

@oauth2.route("/register-user", methods=["POST"])
def register_user():
    """Register a new user from the submitted form data.

    Reads `email`, `password`, `confirm_password` and `user_name` from the
    request form (each stripped of surrounding whitespace), validates them,
    then saves the user, sets the password and assigns the default roles.

    Raises `UserRegistrationError` when the requester is already
    authenticated, when any validation fails (all failure messages are
    passed along), or when the database raises an integrity error.
    """
    with db.connection(current_app.config["AUTH_DB"]) as conn:
        __assert_not_logged_in__(conn)

        def field(key):
            # Missing form fields are treated the same as empty ones.
            return request.form.get(key, "").strip()

        email = field("email")
        password = field("password")
        user_name = field("user_name")

        checks = (
            __email_valid__(email),
            __password_valid__(password, field("confirm_password")),
            __user_name_valid__(user_name),
        )
        errors = tuple(message for passed, message in checks if not passed)
        if errors:
            raise UserRegistrationError(*errors)

        try:
            with db.cursor(conn) as cursor:
                saved_user = save_user(cursor, email, user_name)
                user, _hashed_password = set_user_password(
                    cursor, saved_user, password)
                assign_default_roles(cursor, user)
                details = {
                    "user_id": user.user_id,
                    "email": user.email,
                    "name": user.name
                }
                return jsonify(details), 200
        except sqlite3.IntegrityError as sq3ie:
            current_app.logger.debug(traceback.format_exc())
            raise UserRegistrationError(
                "A user with that email already exists") from sq3ie

    # Fallback: every path above returns or raises, so this is presumably only
    # reached if the connection context manager suppresses an exception.
    raise Exception(
        "unknown_error", "The system experienced an unexpected error.")