1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
|
"""Endpoints for the oauth2 server"""
import uuid
import traceback
from urllib.parse import urlparse
from authlib.oauth2.rfc6749.errors import InvalidClientError
from email_validator import validate_email, EmailNotValidError
from flask import (
flash,
request,
url_for,
jsonify,
redirect,
Response,
Blueprint,
render_template,
current_app as app)
from gn_auth.auth.db import sqlite3 as db
from gn_auth.auth.db.sqlite3 import with_db_connection
from gn_auth.auth.jwks import jwks_directory, list_jwks
from gn_auth.auth.errors import NotFoundError, ForbiddenAccess
from gn_auth.auth.authentication.users import valid_login, user_by_email
from .resource_server import require_oauth
from .endpoints.revocation import RevocationEndpoint
from .endpoints.introspection import IntrospectionEndpoint
auth = Blueprint("auth", __name__)
@auth.route("/delete-client/<uuid:client_id>", methods=["GET", "POST"])
def delete_client(client_id: uuid.UUID):
"""Delete an OAuth2 client."""
return f"WOULD DELETE OAUTH2 CLIENT {client_id}."
@auth.route("/authorise", methods=["GET", "POST"])
def authorise():
"""Authorise a user"""
try:
server = app.config["OAUTH2_SERVER"]
client_id = uuid.UUID(request.args.get("client_id")
or request.form.get("client_id")
or str(uuid.uuid4()))
client = server.query_client(client_id)
if not bool(client):
flash("Invalid OAuth2 client.", "alert-danger")
if request.method == "GET":
def __forgot_password_table_exists__(conn):
with db.cursor(conn) as cursor:
cursor.execute("SELECT name FROM sqlite_master "
"WHERE type='table' "
"AND name='forgot_password_tokens'")
return bool(cursor.fetchone())
return False
client = server.query_client(request.args.get("client_id"))
_src = urlparse(request.args["redirect_uri"])
return render_template(
"oauth2/authorise-user.html",
client=client,
scope=client.scope,
response_type=request.args["response_type"],
redirect_uri=request.args["redirect_uri"],
source_uri=f"{_src.scheme}://{_src.netloc}/",
display_forgot_password=with_db_connection(
__forgot_password_table_exists__))
form = request.form
def __authorise__(conn: db.DbConnection):
email_passwd_msg = "Email or password is invalid!"
redirect_response = redirect(url_for("oauth2.auth.authorise",
response_type=form["response_type"],
client_id=client_id,
redirect_uri=form["redirect_uri"]))
try:
email = validate_email(
form.get("user:email"), check_deliverability=False)
user = user_by_email(conn, email["email"])
if valid_login(conn, user, form.get("user:password", "")):
if not user.verified:
return redirect(
url_for("oauth2.users.handle_unverified",
response_type=form["response_type"],
client_id=client_id,
redirect_uri=form["redirect_uri"],
email=email["email"]),
code=307)
return server.create_authorization_response(request=request, grant_user=user)
flash(email_passwd_msg, "alert-danger")
return redirect_response # type: ignore[return-value]
except EmailNotValidError as _enve:
app.logger.debug(traceback.format_exc())
flash(email_passwd_msg, "alert-danger")
return redirect_response # type: ignore[return-value]
except NotFoundError as _nfe:
app.logger.debug(traceback.format_exc())
flash(email_passwd_msg, "alert-danger")
return redirect_response # type: ignore[return-value]
return with_db_connection(__authorise__)
except InvalidClientError as ice:
return render_template(
"oauth2/oauth2_error.html", error=ice), ice.status_code
@auth.route("/token", methods=["POST"])
def token():
"""Retrieve the authorisation token."""
server = app.config["OAUTH2_SERVER"]
return server.create_token_response()
@auth.route("/revoke", methods=["POST"])
def revoke_token():
"""Revoke the token."""
return app.config["OAUTH2_SERVER"].create_endpoint_response(
RevocationEndpoint.ENDPOINT_NAME)
@auth.route("/introspect", methods=["POST"])
@require_oauth("introspect")
def introspect_token() -> Response:
"""Provide introspection information for the token."""
# This is dangerous to provide publicly
authorised_clients = app.config.get(
"OAUTH2_CLIENTS_WITH_INTROSPECTION_PRIVILEGE", [])
with require_oauth.acquire("introspect") as the_token:
if the_token.client.client_id in authorised_clients:
return app.config["OAUTH2_SERVER"].create_endpoint_response(
IntrospectionEndpoint.ENDPOINT_NAME)
raise ForbiddenAccess("You cannot access this endpoint")
@auth.route("/public-jwks", methods=["GET"])
def public_jwks():
"""Provide the JWK public keys used by this application."""
return jsonify({
"documentation": (
"The keys are listed in order of creation, from the oldest (first) "
"to the newest (last)."),
"jwks": tuple(key.as_dict() for key in list_jwks(jwks_directory(app)))})
|