From 9fc7ab5701aaef66ec6fc47e10a2b6421e23f83f Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Thu, 15 Dec 2022 08:28:03 +0300 Subject: Enable authentication * gn3/auth/authentication/routes.py: Add `/login` endpoint and a function to validate the login credentials. * gn3/auth/authentication/users.py: add function to fetch user details by email --- gn3/auth/authentication/routes.py | 57 +++++++++++++++++++++++++++++++++++++++ gn3/auth/authentication/users.py | 14 ++++++++++ 2 files changed, 71 insertions(+) create mode 100644 gn3/auth/authentication/routes.py diff --git a/gn3/auth/authentication/routes.py b/gn3/auth/authentication/routes.py new file mode 100644 index 0000000..3b288d7 --- /dev/null +++ b/gn3/auth/authentication/routes.py @@ -0,0 +1,57 @@ +import requests + +import bcrypt +from flask import flash, jsonify, request, session, Blueprint + +from gn3.auth import db +from gn3.settings import AUTH_DB + +from .users import User, user_by_email + +auth_routes = Blueprint("auth", __name__) + +def valid_login(conn: db.DbConnection, user: User, password: str) -> bool: + """Check the validity of the provided credentials for login.""" + with db.cursor(conn) as cursor: + cursor.execute( + ("SELECT * FROM users LEFT JOIN user_credentials " + "ON users.user_id=user_credentials.user_id " + "WHERE users.user_id=?"), + (str(user.user_id),)) + row = cursor.fetchone() + + if row == None: + return False + + return bcrypt.checkpw(password.encode("utf-8"), row["password"]) + +@auth_routes.route("/login", methods=["POST"]) +def login(): + """Log in the user.""" + print(request.cookies) + if session.get("user"): + flash("Already logged in!", "alert-warning") + print(f"ALREADY LOGGED IN: {session['user']}") + return redirect("/", code=302) + + form = request.form + email = form.get("email").strip() + password = form.get("password").strip() + if email == "" or password == "": + flash("You must provide the email and password!", "alert-error") + return redirect("/", code=302) + + with db.connection(AUTH_DB) as conn: + user = user_by_email(conn, email).maybe(False, lambda usr: usr) + if user and valid_login(conn, user, password): + session["user"] = user + return jsonify({ + "user_id": user.user_id, + "email": user.email, + "name": user.name + }), 200 + + return jsonify({ + "message": "Could not login. Invalid 'email' or 'password'.", + "type": "authentication-error" + }), 401 diff --git a/gn3/auth/authentication/users.py b/gn3/auth/authentication/users.py index 4854d18..4adee61 100644 --- a/gn3/auth/authentication/users.py +++ b/gn3/auth/authentication/users.py @@ -2,8 +2,22 @@ from uuid import UUID from typing import NamedTuple +from pymonad.maybe import Just, Maybe, Nothing + +from gn3.auth import db + class User(NamedTuple): """Class representing a user.""" user_id: UUID email: str name: str + +def user_by_email(conn: db.DbConnection, email: str) -> Maybe: + with db.cursor(conn) as cursor: + cursor.execute("SELECT * FROM users WHERE email=?", (email,)) + row = cursor.fetchone() + + if row: + return Just(User(UUID(row["user_id"]), row["email"], row["name"])) + + return Nothing -- cgit v1.2.3