aboutsummaryrefslogtreecommitdiff
path: root/gn_auth/auth/authorisation/users/views.py
diff options
context:
space:
mode:
Diffstat (limited to 'gn_auth/auth/authorisation/users/views.py')
-rw-r--r--gn_auth/auth/authorisation/users/views.py178
1 files changed, 166 insertions, 12 deletions
diff --git a/gn_auth/auth/authorisation/users/views.py b/gn_auth/auth/authorisation/users/views.py
index 8135ed3..7adcd06 100644
--- a/gn_auth/auth/authorisation/users/views.py
+++ b/gn_auth/auth/authorisation/users/views.py
@@ -1,12 +1,13 @@
"""User authorisation endpoints."""
+import uuid
import sqlite3
import secrets
-import datetime
import traceback
from typing import Any
from functools import partial
from dataclasses import asdict
from urllib.parse import urljoin
+from datetime import datetime, timedelta
from email.headerregistry import Address
from email_validator import validate_email, EmailNotValidError
from flask import (
@@ -123,7 +124,7 @@ def send_verification_email(
"""Send an email verification message."""
subject="GeneNetwork: Please Verify Your Email"
verification_code = secrets.token_urlsafe(64)
- generated = datetime.datetime.now()
+ generated = datetime.now()
expiration_minutes = 15
def __render__(template):
return render_template(template,
@@ -148,12 +149,13 @@ def send_verification_email(
"generated": int(generated.timestamp()),
"expires": int(
(generated +
- datetime.timedelta(
+ timedelta(
minutes=expiration_minutes)).timestamp())
})
- send_message(smtp_user=current_app.config["SMTP_USER"],
- smtp_passwd=current_app.config["SMTP_PASSWORD"],
+ send_message(smtp_user=current_app.config.get("SMTP_USER", ""),
+ smtp_passwd=current_app.config.get("SMTP_PASSWORD", ""),
message=build_email_message(
+ from_address=current_app.config["EMAIL_ADDRESS"],
to_addresses=(user_address(user),),
subject=subject,
txtmessage=__render__("emails/verify-email.txt"),
@@ -187,11 +189,11 @@ def register_user() -> Response:
redirect_uri=form["redirect_uri"])
return jsonify(asdict(user))
except sqlite3.IntegrityError as sq3ie:
- current_app.logger.debug(traceback.format_exc())
+ current_app.logger.error(traceback.format_exc())
raise UserRegistrationError(
"A user with that email already exists") from sq3ie
except EmailNotValidError as enve:
- current_app.logger.debug(traceback.format_exc())
+ current_app.logger.error(traceback.format_exc())
raise(UserRegistrationError(f"Email Error: {str(enve)}")) from enve
raise Exception(
@@ -235,11 +237,12 @@ def verify_user():
return loginuri
results = results[0]
- if (datetime.datetime.fromtimestamp(
- int(results["expires"])) < datetime.datetime.now()):
+ if (datetime.fromtimestamp(
+ int(results["expires"])) < datetime.now()):
delete_verification_code(cursor, verificationcode)
flash("Invalid verification code: code has expired.",
"alert-danger")
+ return loginuri
# Code is good!
delete_verification_code(cursor, verificationcode)
@@ -310,15 +313,29 @@ def list_all_users() -> Response:
@users.route("/handle-unverified", methods=["POST"])
def handle_unverified():
"""Handle case where user tries to login but is unverified"""
- form = request_json()
+ email = request.args["email"]
# TODO: Maybe have a GN2_URI setting here?
# or pass the client_id here?
+ with (db.connection(current_app.config["AUTH_DB"]) as conn,
+ db.cursor(conn) as cursor):
+ cursor.execute(
+ "DELETE FROM user_verification_codes WHERE expires <= ?",
+ (int(datetime.now().timestamp()),))
+ cursor.execute(
+ "SELECT u.user_id, u.email, uvc.* FROM users AS u "
+ "INNER JOIN user_verification_codes AS uvc "
+ "ON u.user_id=uvc.user_id "
+ "WHERE u.email=?",
+ (email,))
+ token_found = bool(cursor.fetchone())
+
return render_template(
"users/unverified-user.html",
- email=form.get("user:email"),
+ email=email,
response_type=request.args["response_type"],
client_id=request.args["client_id"],
- redirect_uri=request.args["redirect_uri"])
+ redirect_uri=request.args["redirect_uri"],
+ token_found=token_found)
@users.route("/send-verification", methods=["POST"])
def send_verification_code():
@@ -350,3 +367,140 @@ def send_verification_code():
})
resp.code = 400
return resp
+
+
+def send_forgot_password_email(
+ conn,
+ user: User,
+ client_id: uuid.UUID,
+ redirect_uri: str,
+ response_type: str
+):
+ """Send the 'forgot-password' email."""
+ subject="GeneNetwork: Change Your Password"
+ token = secrets.token_urlsafe(64)
+ generated = datetime.now()
+ expiration_minutes = 15
+ def __render__(template):
+ return render_template(template,
+ subject=subject,
+ forgot_password_uri=urljoin(
+ request.url,
+ url_for("oauth2.users.change_password",
+ forgot_password_token=token,
+ client_id=client_id,
+ redirect_uri=redirect_uri,
+ response_type=response_type)),
+ expiration_minutes=expiration_minutes)
+
+ with db.cursor(conn) as cursor:
+ cursor.execute(
+ ("INSERT OR REPLACE INTO "
+ "forgot_password_tokens(user_id, token, generated, expires) "
+ "VALUES (:user_id, :token, :generated, :expires)"),
+ {
+ "user_id": str(user.user_id),
+ "token": token,
+ "generated": int(generated.timestamp()),
+ "expires": int(
+ (generated +
+ timedelta(
+ minutes=expiration_minutes)).timestamp())
+ })
+
+ send_message(smtp_user=current_app.config["SMTP_USER"],
+ smtp_passwd=current_app.config["SMTP_PASSWORD"],
+ message=build_email_message(
+ from_address=current_app.config["EMAIL_ADDRESS"],
+ to_addresses=(user_address(user),),
+ subject=subject,
+ txtmessage=__render__("emails/forgot-password.txt"),
+ htmlmessage=__render__("emails/forgot-password.html")),
+ host=current_app.config["SMTP_HOST"],
+ port=current_app.config["SMTP_PORT"])
+
+
+@users.route("/forgot-password", methods=["GET", "POST"])
+def forgot_password():
+ """Enable user to request password change."""
+ if request.method == "GET":
+ return render_template("users/forgot-password.html",
+ client_id=request.args["client_id"],
+ redirect_uri=request.args["redirect_uri"],
+ response_type=request.args["response_type"])
+
+ form = request.form
+ email = form.get("email", "").strip()
+ if not bool(email):
+ flash("You MUST provide an email.", "alert-danger")
+ return redirect(url_for("oauth2.users.forgot_password"))
+
+ with db.connection(current_app.config["AUTH_DB"]) as conn:
+ user = user_by_email(conn, form["email"])
+ if not bool(user):
+ flash("We could not find an account with that email.",
+ "alert-danger")
+ return redirect(url_for("oauth2.users.forgot_password"))
+
+ send_forgot_password_email(conn,
+ user,
+ request.args["client_id"],
+ request.args["redirect_uri"],
+ request.args["response_type"])
+ return render_template("users/forgot-password-token-send-success.html",
+ email=form["email"])
+
+
+@users.route("/change-password/<forgot_password_token>", methods=["GET", "POST"])
+def change_password(forgot_password_token):
+ """Enable user to perform password change."""
+ login_page = redirect(url_for("oauth2.auth.authorise",
+ client_id=request.args["client_id"],
+ redirect_uri=request.args["redirect_uri"],
+ response_type=request.args["response_type"]))
+ with (db.connection(current_app.config["AUTH_DB"]) as conn,
+ db.cursor(conn) as cursor):
+ cursor.execute("DELETE FROM forgot_password_tokens WHERE expires<=?",
+ (int(datetime.now().timestamp()),))
+ cursor.execute(
+ "SELECT fpt.*, u.email FROM forgot_password_tokens AS fpt "
+ "INNER JOIN users AS u ON fpt.user_id=u.user_id WHERE token=?",
+ (forgot_password_token,))
+ token = cursor.fetchone()
+ if request.method == "GET":
+ if bool(token):
+ return render_template(
+ "users/change-password.html",
+ email=token["email"],
+ client_id=request.args["client_id"],
+ redirect_uri=request.args["redirect_uri"],
+ response_type=request.args["response_type"],
+ forgot_password_token=forgot_password_token)
+ flash("Invalid Token: We cannot change your password!",
+ "alert-danger")
+ return login_page
+
+ password = request.form["password"]
+ confirm_password = request.form["confirm-password"]
+ change_password_page = redirect(url_for(
+ "oauth2.users.change_password",
+ client_id=request.args["client_id"],
+ redirect_uri=request.args["redirect_uri"],
+ response_type=request.args["response_type"],
+ forgot_password_token=forgot_password_token))
+ if bool(password) and bool(confirm_password):
+ if password == confirm_password:
+ _user, _hashed_password = set_user_password(
+ cursor, user_by_email(conn, token["email"]), password)
+ cursor.execute(
+ "DELETE FROM forgot_password_tokens WHERE token=?",
+ (forgot_password_token,))
+ flash("Password changed successfully!", "alert-success")
+ return login_page
+
+ flash("Passwords do not match!", "alert-danger")
+ return change_password_page
+
+ flash("Both the password and its confirmation MUST be provided!",
+ "alert-danger")
+ return change_password_page