diff options
Diffstat (limited to 'gn_auth/auth/authorisation/users/views.py')
-rw-r--r-- | gn_auth/auth/authorisation/users/views.py | 178 |
1 files changed, 166 insertions, 12 deletions
diff --git a/gn_auth/auth/authorisation/users/views.py b/gn_auth/auth/authorisation/users/views.py index 8135ed3..7adcd06 100644 --- a/gn_auth/auth/authorisation/users/views.py +++ b/gn_auth/auth/authorisation/users/views.py @@ -1,12 +1,13 @@ """User authorisation endpoints.""" +import uuid import sqlite3 import secrets -import datetime import traceback from typing import Any from functools import partial from dataclasses import asdict from urllib.parse import urljoin +from datetime import datetime, timedelta from email.headerregistry import Address from email_validator import validate_email, EmailNotValidError from flask import ( @@ -123,7 +124,7 @@ def send_verification_email( """Send an email verification message.""" subject="GeneNetwork: Please Verify Your Email" verification_code = secrets.token_urlsafe(64) - generated = datetime.datetime.now() + generated = datetime.now() expiration_minutes = 15 def __render__(template): return render_template(template, @@ -148,12 +149,13 @@ def send_verification_email( "generated": int(generated.timestamp()), "expires": int( (generated + - datetime.timedelta( + timedelta( minutes=expiration_minutes)).timestamp()) }) - send_message(smtp_user=current_app.config["SMTP_USER"], - smtp_passwd=current_app.config["SMTP_PASSWORD"], + send_message(smtp_user=current_app.config.get("SMTP_USER", ""), + smtp_passwd=current_app.config.get("SMTP_PASSWORD", ""), message=build_email_message( + from_address=current_app.config["EMAIL_ADDRESS"], to_addresses=(user_address(user),), subject=subject, txtmessage=__render__("emails/verify-email.txt"), @@ -187,11 +189,11 @@ def register_user() -> Response: redirect_uri=form["redirect_uri"]) return jsonify(asdict(user)) except sqlite3.IntegrityError as sq3ie: - current_app.logger.debug(traceback.format_exc()) + current_app.logger.error(traceback.format_exc()) raise UserRegistrationError( "A user with that email already exists") from sq3ie except EmailNotValidError as enve: - current_app.logger.debug(traceback.format_exc()) + current_app.logger.error(traceback.format_exc()) raise(UserRegistrationError(f"Email Error: {str(enve)}")) from enve raise Exception( @@ -235,11 +237,12 @@ def verify_user(): return loginuri results = results[0] - if (datetime.datetime.fromtimestamp( - int(results["expires"])) < datetime.datetime.now()): + if (datetime.fromtimestamp( + int(results["expires"])) < datetime.now()): delete_verification_code(cursor, verificationcode) flash("Invalid verification code: code has expired.", "alert-danger") + return loginuri # Code is good! delete_verification_code(cursor, verificationcode) @@ -310,15 +313,29 @@ def list_all_users() -> Response: @users.route("/handle-unverified", methods=["POST"]) def handle_unverified(): """Handle case where user tries to login but is unverified""" - form = request_json() + email = request.args["email"] # TODO: Maybe have a GN2_URI setting here? # or pass the client_id here? + with (db.connection(current_app.config["AUTH_DB"]) as conn, + db.cursor(conn) as cursor): + cursor.execute( + "DELETE FROM user_verification_codes WHERE expires <= ?", + (int(datetime.now().timestamp()),)) + cursor.execute( + "SELECT u.user_id, u.email, uvc.* FROM users AS u " + "INNER JOIN user_verification_codes AS uvc " + "ON u.user_id=uvc.user_id " + "WHERE u.email=?", + (email,)) + token_found = bool(cursor.fetchone()) + return render_template( "users/unverified-user.html", - email=form.get("user:email"), + email=email, response_type=request.args["response_type"], client_id=request.args["client_id"], - redirect_uri=request.args["redirect_uri"]) + redirect_uri=request.args["redirect_uri"], + token_found=token_found) @users.route("/send-verification", methods=["POST"]) def send_verification_code(): @@ -350,3 +367,140 @@ def send_verification_code(): }) resp.code = 400 return resp + + +def send_forgot_password_email( + conn, + user: User, + client_id: uuid.UUID, + redirect_uri: str, + response_type: str +): + """Send the 'forgot-password' email.""" + subject="GeneNetwork: Change Your Password" + token = secrets.token_urlsafe(64) + generated = datetime.now() + expiration_minutes = 15 + def __render__(template): + return render_template(template, + subject=subject, + forgot_password_uri=urljoin( + request.url, + url_for("oauth2.users.change_password", + forgot_password_token=token, + client_id=client_id, + redirect_uri=redirect_uri, + response_type=response_type)), + expiration_minutes=expiration_minutes) + + with db.cursor(conn) as cursor: + cursor.execute( + ("INSERT OR REPLACE INTO " + "forgot_password_tokens(user_id, token, generated, expires) " + "VALUES (:user_id, :token, :generated, :expires)"), + { + "user_id": str(user.user_id), + "token": token, + "generated": int(generated.timestamp()), + "expires": int( + (generated + + timedelta( + minutes=expiration_minutes)).timestamp()) + }) + + send_message(smtp_user=current_app.config["SMTP_USER"], + smtp_passwd=current_app.config["SMTP_PASSWORD"], + message=build_email_message( + from_address=current_app.config["EMAIL_ADDRESS"], + to_addresses=(user_address(user),), + subject=subject, + txtmessage=__render__("emails/forgot-password.txt"), + htmlmessage=__render__("emails/forgot-password.html")), + host=current_app.config["SMTP_HOST"], + port=current_app.config["SMTP_PORT"]) + + +@users.route("/forgot-password", methods=["GET", "POST"]) +def forgot_password(): + """Enable user to request password change.""" + if request.method == "GET": + return render_template("users/forgot-password.html", + client_id=request.args["client_id"], + redirect_uri=request.args["redirect_uri"], + response_type=request.args["response_type"]) + + form = request.form + email = form.get("email", "").strip() + if not bool(email): + flash("You MUST provide an email.", "alert-danger") + return redirect(url_for("oauth2.users.forgot_password")) + + with db.connection(current_app.config["AUTH_DB"]) as conn: + user = user_by_email(conn, form["email"]) + if not bool(user): + flash("We could not find an account with that email.", + "alert-danger") + return redirect(url_for("oauth2.users.forgot_password")) + + send_forgot_password_email(conn, + user, + request.args["client_id"], + request.args["redirect_uri"], + request.args["response_type"]) + return render_template("users/forgot-password-token-send-success.html", + email=form["email"]) + + +@users.route("/change-password/<forgot_password_token>", methods=["GET", "POST"]) +def change_password(forgot_password_token): + """Enable user to perform password change.""" + login_page = redirect(url_for("oauth2.auth.authorise", + client_id=request.args["client_id"], + redirect_uri=request.args["redirect_uri"], + response_type=request.args["response_type"])) + with (db.connection(current_app.config["AUTH_DB"]) as conn, + db.cursor(conn) as cursor): + cursor.execute("DELETE FROM forgot_password_tokens WHERE expires<=?", + (int(datetime.now().timestamp()),)) + cursor.execute( + "SELECT fpt.*, u.email FROM forgot_password_tokens AS fpt " + "INNER JOIN users AS u ON fpt.user_id=u.user_id WHERE token=?", + (forgot_password_token,)) + token = cursor.fetchone() + if request.method == "GET": + if bool(token): + return render_template( + "users/change-password.html", + email=token["email"], + client_id=request.args["client_id"], + redirect_uri=request.args["redirect_uri"], + response_type=request.args["response_type"], + forgot_password_token=forgot_password_token) + flash("Invalid Token: We cannot change your password!", + "alert-danger") + return login_page + + password = request.form["password"] + confirm_password = request.form["confirm-password"] + change_password_page = redirect(url_for( + "oauth2.users.change_password", + client_id=request.args["client_id"], + redirect_uri=request.args["redirect_uri"], + response_type=request.args["response_type"], + forgot_password_token=forgot_password_token)) + if bool(password) and bool(confirm_password): + if password == confirm_password: + _user, _hashed_password = set_user_password( + cursor, user_by_email(conn, token["email"]), password) + cursor.execute( + "DELETE FROM forgot_password_tokens WHERE token=?", + (forgot_password_token,)) + flash("Password changed successfully!", "alert-success") + return login_page + + flash("Passwords do not match!", "alert-danger") + return change_password_page + + flash("Both the password and its confirmation MUST be provided!", + "alert-danger") + return change_password_page |