about summary refs log tree commit diff
path: root/gn3/auth/authentication/users.py
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2022-12-19 16:02:19 +0300
committerFrederick Muriuki Muriithi2022-12-22 09:05:53 +0300
commitb0641272491eb51d321b1b8a7d062e395e70800f (patch)
treec9b2065ea60399579c4c4d84c648b61ed67402ba /gn3/auth/authentication/users.py
parente9031e28594fcd21371adb2b9b26e17a1df95599 (diff)
downloadgenenetwork3-b0641272491eb51d321b1b8a7d062e395e70800f.tar.gz
auth: implement OAuth2 flow. oauth2_auth_flow
Add code to implement the OAuth2 flow.

* Add test fixtures for setting up users and OAuth2 clients
* Add tests for token generation with the "Password Grant" flow
* Fix some issues with test due to changes in the database connection's
  row_factory
Diffstat (limited to 'gn3/auth/authentication/users.py')
-rw-r--r--gn3/auth/authentication/users.py28
1 files changed, 28 insertions, 0 deletions
diff --git a/gn3/auth/authentication/users.py b/gn3/auth/authentication/users.py
index 11deba2..6ec6bca 100644
--- a/gn3/auth/authentication/users.py
+++ b/gn3/auth/authentication/users.py
@@ -2,6 +2,7 @@
 from uuid import UUID
 from typing import NamedTuple
 
+import bcrypt
 from pymonad.maybe import Just, Maybe, Nothing
 
 from gn3.auth import db
@@ -17,6 +18,7 @@ class User(NamedTuple):
         return self.user_id
 
 def user_by_email(conn: db.DbConnection, email: str) -> Maybe:
+    """Retrieve user from database by their email address"""
     with db.cursor(conn) as cursor:
         cursor.execute("SELECT * FROM users WHERE email=?", (email,))
         row = cursor.fetchone()
@@ -25,3 +27,29 @@ def user_by_email(conn: db.DbConnection, email: str) -> Maybe:
         return Just(User(UUID(row["user_id"]), row["email"], row["name"]))
 
     return Nothing
+
+def user_by_id(conn: db.DbConnection, user_id: UUID) -> Maybe:
+    """Retrieve user from database by their user id"""
+    with db.cursor(conn) as cursor:
+        cursor.execute("SELECT * FROM users WHERE user_id=?", (str(user_id),))
+        row = cursor.fetchone()
+
+    if row:
+        return Just(User(UUID(row["user_id"]), row["email"], row["name"]))
+
+    return Nothing
+
+def valid_login(conn: db.DbConnection, user: User, password: str) -> bool:
+    """Check the validity of the provided credentials for login."""
+    with db.cursor(conn) as cursor:
+        cursor.execute(
+            ("SELECT * FROM users LEFT JOIN user_credentials "
+             "ON users.user_id=user_credentials.user_id "
+             "WHERE users.user_id=?"),
+            (str(user.user_id),))
+        row = cursor.fetchone()
+
+    if row is None:
+        return False
+
+    return bcrypt.checkpw(password.encode("utf-8"), row["password"])