aboutsummaryrefslogtreecommitdiff
path: root/gn3/auth/authentication/users.py
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2022-12-19 16:02:19 +0300
committerFrederick Muriuki Muriithi2022-12-22 09:05:53 +0300
commitb0641272491eb51d321b1b8a7d062e395e70800f (patch)
treec9b2065ea60399579c4c4d84c648b61ed67402ba /gn3/auth/authentication/users.py
parente9031e28594fcd21371adb2b9b26e17a1df95599 (diff)
downloadgenenetwork3-b0641272491eb51d321b1b8a7d062e395e70800f.tar.gz
auth: implement OAuth2 flow.oauth2_auth_flow
Add code to implement the OAuth2 flow. * Add test fixtures for setting up users and OAuth2 clients * Add tests for token generation with the "Password Grant" flow * Fix some issues with test due to changes in the database connection's row_factory
Diffstat (limited to 'gn3/auth/authentication/users.py')
-rw-r--r--gn3/auth/authentication/users.py28
1 files changed, 28 insertions, 0 deletions
diff --git a/gn3/auth/authentication/users.py b/gn3/auth/authentication/users.py
index 11deba2..6ec6bca 100644
--- a/gn3/auth/authentication/users.py
+++ b/gn3/auth/authentication/users.py
@@ -2,6 +2,7 @@
from uuid import UUID
from typing import NamedTuple
+import bcrypt
from pymonad.maybe import Just, Maybe, Nothing
from gn3.auth import db
@@ -17,6 +18,7 @@ class User(NamedTuple):
return self.user_id
def user_by_email(conn: db.DbConnection, email: str) -> Maybe:
+ """Retrieve user from database by their email address"""
with db.cursor(conn) as cursor:
cursor.execute("SELECT * FROM users WHERE email=?", (email,))
row = cursor.fetchone()
@@ -25,3 +27,29 @@ def user_by_email(conn: db.DbConnection, email: str) -> Maybe:
return Just(User(UUID(row["user_id"]), row["email"], row["name"]))
return Nothing
+
+def user_by_id(conn: db.DbConnection, user_id: UUID) -> Maybe:
+ """Retrieve user from database by their user id"""
+ with db.cursor(conn) as cursor:
+ cursor.execute("SELECT * FROM users WHERE user_id=?", (str(user_id),))
+ row = cursor.fetchone()
+
+ if row:
+ return Just(User(UUID(row["user_id"]), row["email"], row["name"]))
+
+ return Nothing
+
+def valid_login(conn: db.DbConnection, user: User, password: str) -> bool:
+ """Check the validity of the provided credentials for login."""
+ with db.cursor(conn) as cursor:
+ cursor.execute(
+ ("SELECT * FROM users LEFT JOIN user_credentials "
+ "ON users.user_id=user_credentials.user_id "
+ "WHERE users.user_id=?"),
+ (str(user.user_id),))
+ row = cursor.fetchone()
+
+ if row is None:
+ return False
+
+ return bcrypt.checkpw(password.encode("utf-8"), row["password"])