aboutsummaryrefslogtreecommitdiff
path: root/gn3/auth/authentication/oauth2/views.py
blob: 48a97daeac54447707c85e77ee0d7870d9db3214 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
"""Endpoints for the oauth2 server"""
import uuid
import traceback

from email_validator import validate_email, EmailNotValidError
from flask import (
    flash,
    request,
    url_for,
    redirect,
    Response,
    Blueprint,
    render_template,
    current_app as app)

from gn3.auth import db
from gn3.auth.db_utils import with_db_connection
from gn3.auth.authorisation.errors import ForbiddenAccess

from .resource_server import require_oauth
from .endpoints.revocation import RevocationEndpoint
from .endpoints.introspection import IntrospectionEndpoint

from ..users import valid_login, NotFoundError, user_by_email

# Flask blueprint on which the OAuth2 server endpoints below are registered
# (each view attaches itself with ``@auth.route``).
auth = Blueprint("auth", __name__)

@auth.route("/register-client", methods=["GET", "POST"])
def register_client():
    """Placeholder endpoint for registering an OAuth2 client.

    No registration is performed yet; a fixed marker string is returned for
    both ``GET`` and ``POST`` requests.
    """
    placeholder = "WOULD REGISTER ..."
    return placeholder

@auth.route("/delete-client/<uuid:client_id>", methods=["GET", "POST"])
def delete_client(client_id: uuid.UUID):
    """Placeholder endpoint for deleting an OAuth2 client.

    Nothing is deleted yet; the response merely echoes the ``client_id``
    taken from the URL.
    """
    placeholder = f"WOULD DELETE OAUTH2 CLIENT {client_id}."
    return placeholder

@auth.route("/authorise", methods=["GET", "POST"])
def authorise():
    """Authorise a user for an OAuth2 client.

    ``GET`` renders the login/consent form for the client named by the
    ``client_id`` query parameter.  ``POST`` checks the submitted
    credentials (form fields ``user:email`` and ``user:password``) and, when
    they are valid, asks the OAuth2 server to create the authorisation
    response for that user.  A failed login flashes an error and redirects
    back to this endpoint.

    A malformed ``client_id`` (either method) or an unknown client on ``GET``
    produces a plain ``400`` response rather than an unhandled exception.
    """
    invalid_client_msg = "Invalid OAuth2 client."
    server = app.config["OAUTH2_SERVER"]
    try:
        # A missing ``client_id`` falls back to a fresh random UUID, which is
        # not expected to match any registered client.
        client_id = uuid.UUID(request.args.get("client_id", str(uuid.uuid4())))
    except ValueError:
        # Text that is not a UUID can never identify a client.
        app.logger.debug(traceback.format_exc())
        return Response(invalid_client_msg, status=400, mimetype="text/plain")

    client = server.query_client(client_id)
    if request.method == "GET":
        if not bool(client):
            # The form cannot be rendered without a client (its scope is
            # needed), so answer directly instead of flashing a message that
            # no page would display.
            return Response(
                invalid_client_msg, status=400, mimetype="text/plain")
        return render_template(
            "oauth2/authorise-user.html",
            client=client,
            scope=client.scope,
            response_type="code")

    if not bool(client):
        # Keep going on POST: the OAuth2 server is still asked to build the
        # response below.
        # NOTE(review): presumably the server rejects unknown clients itself
        # -- confirm.
        flash(invalid_client_msg, "alert-error")

    form = request.form
    def __authorise__(conn: db.DbConnection) -> Response:
        email_passwd_msg = "Email or password is invalid!"
        try:
            # Default to "" so a missing field is reported as an invalid
            # email instead of passing ``None`` to the validator.
            # NOTE(review): validate_email checks deliverability by default;
            # consider ``check_deliverability=False`` for logins.
            email = validate_email(form.get("user:email", ""))
            user = user_by_email(conn, email["email"])
            if valid_login(conn, user, form.get("user:password", "")):
                return server.create_authorization_response(request=request, grant_user=user)
        except (EmailNotValidError, NotFoundError):
            # Same user-facing message for a bad email and an unknown user.
            app.logger.debug(traceback.format_exc())

        # Every failure path ends here; the redirect is only built when it is
        # actually needed.
        flash(email_passwd_msg, "alert-error")
        return redirect(url_for( # type: ignore[return-value]
            "oauth2.auth.authorise", client_id=client_id))

    return with_db_connection(__authorise__)

@auth.route("/token", methods=["POST"])
def token():
    """Issue a token by delegating to the configured OAuth2 server."""
    return app.config["OAUTH2_SERVER"].create_token_response()

@auth.route("/revoke", methods=["POST"])
def revoke_token():
    """Revoke a token through the OAuth2 server's revocation endpoint."""
    oauth2_server = app.config["OAUTH2_SERVER"]
    endpoint_name = RevocationEndpoint.ENDPOINT_NAME
    return oauth2_server.create_endpoint_response(endpoint_name)

@auth.route("/introspect", methods=["POST"])
@require_oauth("introspect")
def introspect_token() -> Response:
    """Return introspection details for a token.

    Only clients whose ID appears in the
    ``OAUTH2_CLIENTS_WITH_INTROSPECTION_PRIVILEGE`` configuration value
    (default: none) are served; every other caller gets ``ForbiddenAccess``.
    """
    # This is dangerous to provide publicly
    privileged_clients = app.config.get(
        "OAUTH2_CLIENTS_WITH_INTROSPECTION_PRIVILEGE", [])
    with require_oauth.acquire("introspect") as access_token:
        requester_id = access_token.client.client_id
        if requester_id in privileged_clients:
            # Respond from inside the ``with`` so the call stays under the
            # acquired-token context.
            oauth2_server = app.config["OAUTH2_SERVER"]
            return oauth2_server.create_endpoint_response(
                IntrospectionEndpoint.ENDPOINT_NAME)

    raise ForbiddenAccess("You cannot access this endpoint")