aboutsummaryrefslogtreecommitdiff
path: root/gn3/auth/authentication/oauth2/views.py
diff options
context:
space:
mode:
Diffstat (limited to 'gn3/auth/authentication/oauth2/views.py')
-rw-r--r--gn3/auth/authentication/oauth2/views.py52
1 files changed, 50 insertions, 2 deletions
diff --git a/gn3/auth/authentication/oauth2/views.py b/gn3/auth/authentication/oauth2/views.py
index 3a14a48..48a97da 100644
--- a/gn3/auth/authentication/oauth2/views.py
+++ b/gn3/auth/authentication/oauth2/views.py
@@ -1,14 +1,28 @@
"""Endpoints for the oauth2 server"""
import uuid
+import traceback
-from flask import Response, Blueprint, current_app as app
+from email_validator import validate_email, EmailNotValidError
+from flask import (
+ flash,
+ request,
+ url_for,
+ redirect,
+ Response,
+ Blueprint,
+ render_template,
+ current_app as app)
+from gn3.auth import db
+from gn3.auth.db_utils import with_db_connection
from gn3.auth.authorisation.errors import ForbiddenAccess
from .resource_server import require_oauth
from .endpoints.revocation import RevocationEndpoint
from .endpoints.introspection import IntrospectionEndpoint
+from ..users import valid_login, NotFoundError, user_by_email
+
auth = Blueprint("auth", __name__)
@auth.route("/register-client", methods=["GET", "POST"])
@@ -24,7 +38,41 @@ def delete_client(client_id: uuid.UUID):
@auth.route("/authorise", methods=["GET", "POST"])
def authorise():
"""Authorise a user"""
- return "WOULD AUTHORISE THE USER."
+ server = app.config["OAUTH2_SERVER"]
+ client_id = uuid.UUID(request.args.get("client_id", str(uuid.uuid4())))
+ client = server.query_client(client_id)
+ if not bool(client):
+ flash("Invalid OAuth2 client.", "alert-error")
+ if request.method == "GET":
+ client = server.query_client(request.args.get("client_id"))
+ return render_template(
+ "oauth2/authorise-user.html",
+ client=client,
+ scope=client.scope,
+ response_type="code")
+
+ form = request.form
+ def __authorise__(conn: db.DbConnection) -> Response:
+ email_passwd_msg = "Email or password is invalid!"
+ redirect_response = redirect(url_for("oauth2.auth.authorise",
+ client_id=client_id))
+ try:
+ email = validate_email(form.get("user:email"))
+ user = user_by_email(conn, email["email"])
+ if valid_login(conn, user, form.get("user:password", "")):
+ return server.create_authorization_response(request=request, grant_user=user)
+ flash(email_passwd_msg, "alert-error")
+ return redirect_response # type: ignore[return-value]
+ except EmailNotValidError as _enve:
+ app.logger.debug(traceback.format_exc())
+ flash(email_passwd_msg, "alert-error")
+ return redirect_response # type: ignore[return-value]
+ except NotFoundError as _nfe:
+ app.logger.debug(traceback.format_exc())
+ flash(email_passwd_msg, "alert-error")
+ return redirect_response # type: ignore[return-value]
+
+ return with_db_connection(__authorise__)
@auth.route("/token", methods=["POST"])
def token():