diff options
author | Frederick Muriuki Muriithi | 2022-12-19 16:02:19 +0300 |
---|---|---|
committer | Frederick Muriuki Muriithi | 2022-12-22 09:05:53 +0300 |
commit | b0641272491eb51d321b1b8a7d062e395e70800f (patch) | |
tree | c9b2065ea60399579c4c4d84c648b61ed67402ba /gn3/auth/authentication/users.py | |
parent | e9031e28594fcd21371adb2b9b26e17a1df95599 (diff) | |
download | genenetwork3-b0641272491eb51d321b1b8a7d062e395e70800f.tar.gz |
auth: implement OAuth2 flow.oauth2_auth_flow
Add code to implement the OAuth2 flow.
* Add test fixtures for setting up users and OAuth2 clients
* Add tests for token generation with the "Password Grant" flow
* Fix some issues with test due to changes in the database connection's
row_factory
Diffstat (limited to 'gn3/auth/authentication/users.py')
-rw-r--r-- | gn3/auth/authentication/users.py | 28 |
1 files changed, 28 insertions, 0 deletions
diff --git a/gn3/auth/authentication/users.py b/gn3/auth/authentication/users.py index 11deba2..6ec6bca 100644 --- a/gn3/auth/authentication/users.py +++ b/gn3/auth/authentication/users.py @@ -2,6 +2,7 @@ from uuid import UUID from typing import NamedTuple +import bcrypt from pymonad.maybe import Just, Maybe, Nothing from gn3.auth import db @@ -17,6 +18,7 @@ class User(NamedTuple): return self.user_id def user_by_email(conn: db.DbConnection, email: str) -> Maybe: + """Retrieve user from database by their email address""" with db.cursor(conn) as cursor: cursor.execute("SELECT * FROM users WHERE email=?", (email,)) row = cursor.fetchone() @@ -25,3 +27,29 @@ def user_by_email(conn: db.DbConnection, email: str) -> Maybe: return Just(User(UUID(row["user_id"]), row["email"], row["name"])) return Nothing + +def user_by_id(conn: db.DbConnection, user_id: UUID) -> Maybe: + """Retrieve user from database by their user id""" + with db.cursor(conn) as cursor: + cursor.execute("SELECT * FROM users WHERE user_id=?", (str(user_id),)) + row = cursor.fetchone() + + if row: + return Just(User(UUID(row["user_id"]), row["email"], row["name"])) + + return Nothing + +def valid_login(conn: db.DbConnection, user: User, password: str) -> bool: + """Check the validity of the provided credentials for login.""" + with db.cursor(conn) as cursor: + cursor.execute( + ("SELECT * FROM users LEFT JOIN user_credentials " + "ON users.user_id=user_credentials.user_id " + "WHERE users.user_id=?"), + (str(user.user_id),)) + row = cursor.fetchone() + + if row is None: + return False + + return bcrypt.checkpw(password.encode("utf-8"), row["password"]) |