aboutsummaryrefslogtreecommitdiff
path: root/gn_auth/auth/authentication/oauth2/views.py
blob: fc2776800bd4636a4f2f5333e9ea1a4e9786893a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
"""Endpoints for the oauth2 server"""
import uuid
import traceback
from urllib.parse import urlparse

from authlib.oauth2.rfc6749.errors import InvalidClientError
from email_validator import validate_email, EmailNotValidError
from flask import (
    flash,
    request,
    url_for,
    redirect,
    Response,
    Blueprint,
    render_template,
    current_app as app)

from gn_auth.auth.db import sqlite3 as db
from gn_auth.auth.db.sqlite3 import with_db_connection
from gn_auth.auth.errors import NotFoundError, ForbiddenAccess
from gn_auth.auth.authentication.users import valid_login, user_by_email

from .resource_server import require_oauth
from .endpoints.revocation import RevocationEndpoint
from .endpoints.introspection import IntrospectionEndpoint


# Blueprint on which the OAuth2 server endpoints defined below are registered.
auth = Blueprint("auth", __name__)

@auth.route("/delete-client/<uuid:client_id>", methods=["GET", "POST"])
def delete_client(client_id: uuid.UUID):
    """Report which OAuth2 client would be deleted.

    This is a stub: it performs no deletion and only echoes back the
    identifier taken from the URL.
    """
    message = f"WOULD DELETE OAUTH2 CLIENT {client_id}."
    return message

@auth.route("/authorise", methods=["GET", "POST"])
def authorise():
    """Authorise a user.

    On GET, render the authorisation form for the requesting client.
    On POST, check the submitted e-mail/password and, when they are valid,
    delegate to the OAuth2 server to build the authorisation response.
    Failed logins are redirected back to this endpoint with a flash message;
    users that are not yet verified are redirected to the
    ``oauth2.users.handle_unverified`` endpoint.

    A malformed or unknown ``client_id`` raises ``InvalidClientError``, which
    is rendered with the ``oauth2/oauth2_error.html`` template and the error's
    status code instead of escaping as an unhandled exception.
    """
    try:
        server = app.config["OAUTH2_SERVER"]
        # A random UUID is used when no client_id was supplied at all, so the
        # lookup below simply finds nothing.
        raw_client_id = request.args.get(
            "client_id",
            request.form.get("client_id", str(uuid.uuid4())))
        try:
            client_id = uuid.UUID(raw_client_id)
        except ValueError as exc:
            # The value comes straight from the request; treat garbage as an
            # invalid client rather than letting the ValueError propagate.
            raise InvalidClientError(
                description="The provided client_id is not a valid UUID."
            ) from exc

        client = server.query_client(client_id)
        if not bool(client):
            flash("Invalid OAuth2 client.", "alert-danger")
            # Stop here: everything below dereferences `client`.
            raise InvalidClientError(
                description="No OAuth2 client matches the provided client_id.")

        if request.method == "GET":
            _src = urlparse(request.args["redirect_uri"])
            return render_template(
                "oauth2/authorise-user.html",
                client=client,
                scope=client.scope,
                response_type=request.args["response_type"],
                redirect_uri=request.args["redirect_uri"],
                source_uri=f"{_src.scheme}://{_src.netloc}/")

        form = request.form
        def __authorise__(conn: db.DbConnection):
            """Validate the submitted credentials using `conn`."""
            email_passwd_msg = "Email or password is invalid!"
            # Where to send the user back to after any failed login attempt.
            redirect_response = redirect(url_for("oauth2.auth.authorise",
                                                 response_type=form["response_type"],
                                                 client_id=client_id,
                                                 redirect_uri=form["redirect_uri"]))
            try:
                # Default to "" so a missing field is rejected by the
                # validator as an invalid address, like any other bad e-mail.
                email = validate_email(
                    form.get("user:email", ""), check_deliverability=False)
                user = user_by_email(conn, email["email"])
                if valid_login(conn, user, form.get("user:password", "")):
                    if not user.verified:
                        # 307 keeps the request method and body, so the
                        # submitted form is replayed at the target endpoint.
                        return redirect(
                            url_for("oauth2.users.handle_unverified",
                                    response_type=form["response_type"],
                                    client_id=client_id,
                                    redirect_uri=form["redirect_uri"]),
                            code=307)
                    return server.create_authorization_response(request=request, grant_user=user)
            except (EmailNotValidError, NotFoundError):
                # Invalid address or unknown user: log the details, but show
                # the same generic message as for a wrong password.
                app.logger.debug(traceback.format_exc())

            flash(email_passwd_msg, "alert-danger")
            return redirect_response # type: ignore[return-value]

        return with_db_connection(__authorise__)
    except InvalidClientError as ice:
        return render_template(
            "oauth2/oauth2_error.html", error=ice), ice.status_code

@auth.route("/token", methods=["POST"])
def token():
    """Have the configured OAuth2 server build and return the token response."""
    return app.config["OAUTH2_SERVER"].create_token_response()

@auth.route("/revoke", methods=["POST"])
def revoke_token():
    """Hand the request to the OAuth2 server's revocation endpoint."""
    oauth2_server = app.config["OAUTH2_SERVER"]
    return oauth2_server.create_endpoint_response(
        RevocationEndpoint.ENDPOINT_NAME)

@auth.route("/introspect", methods=["POST"])
@require_oauth("introspect")
def introspect_token() -> Response:
    """Return token introspection details to privileged clients only.

    The client that owns the presented token must be listed in the
    ``OAUTH2_CLIENTS_WITH_INTROSPECTION_PRIVILEGE`` configuration value;
    any other caller gets a ``ForbiddenAccess`` error.
    """
    # Introspection data is sensitive, so it is not offered to every client:
    # with no configured list, nobody is allowed through.
    privileged_clients = app.config.get(
        "OAUTH2_CLIENTS_WITH_INTROSPECTION_PRIVILEGE", [])
    with require_oauth.acquire("introspect") as bearer_token:
        requesting_client = bearer_token.client.client_id
        if requesting_client in privileged_clients:
            oauth2_server = app.config["OAUTH2_SERVER"]
            return oauth2_server.create_endpoint_response(
                IntrospectionEndpoint.ENDPOINT_NAME)

    raise ForbiddenAccess("You cannot access this endpoint")