From 7c0ee01c1d134cbee0a2c8243dd55e4eeaaa5f7c Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Thu, 30 May 2024 12:03:38 -0500 Subject: Move user creation from db resultset into static method Creation of a User object from the database resultset will mostly be the same. This commit moves the repetitive code into a static method that can be called wherever we need it. This improves maintainability, since we only ever need to do an update in one place now. --- .../oauth2/grants/authorisation_code_grant.py | 3 +-- gn_auth/auth/authentication/users.py | 22 +++++++++++++++++----- .../auth/authorisation/resources/groups/models.py | 3 +-- gn_auth/auth/authorisation/resources/views.py | 2 +- gn_auth/auth/authorisation/users/admin/views.py | 3 +-- gn_auth/auth/authorisation/users/models.py | 4 +--- scripts/migrate_existing_data.py | 3 +-- 7 files changed, 23 insertions(+), 17 deletions(-) diff --git a/gn_auth/auth/authentication/oauth2/grants/authorisation_code_grant.py b/gn_auth/auth/authentication/oauth2/grants/authorisation_code_grant.py index a40292e..c285c96 100644 --- a/gn_auth/auth/authentication/oauth2/grants/authorisation_code_grant.py +++ b/gn_auth/auth/authentication/oauth2/grants/authorisation_code_grant.py @@ -81,8 +81,7 @@ class AuthorisationCodeGrant(grants.AuthorizationCodeGrant): cursor.execute(query, (str(authorization_code.code),)) res = cursor.fetchone() if res: - return User( - uuid.UUID(res["user_id"]), res["email"], res["name"]) + return User.from_sqlite3_row(res) return None diff --git a/gn_auth/auth/authentication/users.py b/gn_auth/auth/authentication/users.py index c2c1162..140ce36 100644 --- a/gn_auth/auth/authentication/users.py +++ b/gn_auth/auth/authentication/users.py @@ -1,8 +1,10 @@ """User-specific code and data structures.""" -from uuid import UUID, uuid4 +import datetime from typing import Tuple +from uuid import UUID, uuid4 from dataclasses import dataclass +import sqlite3 from argon2 import PasswordHasher from argon2.exceptions import VerifyMismatchError @@ -16,11 +18,22 @@ class User: user_id: UUID email: str name: str + created: datetime.datetime = datetime.datetime.now() + verified: bool = False def get_user_id(self): """Get the user's ID.""" return self.user_id + @staticmethod + def from_sqlite3_row(row: sqlite3.Row): + """Generate a user from a row in an SQLite3 resultset""" + return User(user_id=UUID(row["user_id"]), + email=row["email"], + name=row["name"], + created=datetime.datetime.fromtimestamp(row["created"]), + verified=bool(int(row["verified"]))) + DUMMY_USER = User(user_id=UUID("a391cf60-e8b7-4294-bd22-ddbbda4b3530"), email="gn3@dummy.user", @@ -33,7 +46,7 @@ def user_by_email(conn: db.DbConnection, email: str) -> User: row = cursor.fetchone() if row: - return User(UUID(row["user_id"]), row["email"], row["name"]) + return User.from_sqlite3_row(row) raise NotFoundError(f"Could not find user with email {email}") @@ -44,7 +57,7 @@ def user_by_id(conn: db.DbConnection, user_id: UUID) -> User: row = cursor.fetchone() if row: - return User(UUID(row["user_id"]), row["email"], row["name"]) + return User.from_sqlite3_row(row) raise NotFoundError(f"Could not find user with ID {user_id}") @@ -135,6 +148,5 @@ def fetch_users(conn: db.DbConnection, f" WHERE user_id IN ({params})" if len(ids) > 0 else "") cursor.execute(query, tuple(str(the_id) for the_id in ids)) - return tuple(User(UUID(row["user_id"]), row["email"], row["name"]) - for row in cursor.fetchall()) + return tuple(User.from_sqlite3_row(row) for row in cursor.fetchall()) return tuple() diff --git a/gn_auth/auth/authorisation/resources/groups/models.py b/gn_auth/auth/authorisation/resources/groups/models.py index 3feefa6..03a93b6 100644 --- a/gn_auth/auth/authorisation/resources/groups/models.py +++ b/gn_auth/auth/authorisation/resources/groups/models.py @@ -276,8 +276,7 @@ def group_users(conn: db.DbConnection, group_id: UUID) -> Iterable[User]: {"group_id": str(group_id)}) results = cursor.fetchall() - return (User(UUID(row["user_id"]), row["email"], row["name"]) - for row in results) + return (User.from_sqlite3_row(row) for row in results) @authorised_p( diff --git a/gn_auth/auth/authorisation/resources/views.py b/gn_auth/auth/authorisation/resources/views.py index 0200222..c481ef9 100644 --- a/gn_auth/auth/authorisation/resources/views.py +++ b/gn_auth/auth/authorisation/resources/views.py @@ -172,7 +172,7 @@ def resource_users(resource_id: uuid.UUID): def __organise_users_n_roles__(users_n_roles, row): user_id = uuid.UUID(row["user_id"]) user = users_n_roles.get(user_id, {}).get( - "user", User(user_id, row["email"], row["name"])) + "user", User.from_sqlite3_row(row)) role = Role( uuid.UUID(row["role_id"]), row["role_name"], bool(int(row["user_editable"])), tuple()) diff --git a/gn_auth/auth/authorisation/users/admin/views.py b/gn_auth/auth/authorisation/users/admin/views.py index 73e808a..8ca1e51 100644 --- a/gn_auth/auth/authorisation/users/admin/views.py +++ b/gn_auth/auth/authorisation/users/admin/views.py @@ -189,8 +189,7 @@ def register_client(): with db.cursor(conn) as cursor: cursor.execute("SELECT * FROM users") return tuple( - User(uuid.UUID(row["user_id"]), row["email"], row["name"]) - for row in cursor.fetchall()) + User.from_sqlite3_row(row) for row in cursor.fetchall()) if request.method == "GET": return render_template( "admin/register-client.html", diff --git a/gn_auth/auth/authorisation/users/models.py b/gn_auth/auth/authorisation/users/models.py index 8b47fc1..bde2e33 100644 --- a/gn_auth/auth/authorisation/users/models.py +++ b/gn_auth/auth/authorisation/users/models.py @@ -17,9 +17,7 @@ def list_users(conn: db.DbConnection) -> tuple[User, ...]: """List out all users.""" with db.cursor(conn) as cursor: cursor.execute("SELECT * FROM users") - return tuple( - User(uuid.UUID(row["user_id"]), row["email"], row["name"]) - for row in cursor.fetchall()) + return tuple(User.from_sqlite3_row(row) for row in cursor.fetchall()) def __build_resource_roles__(rows): def __build_roles__(roles, row): diff --git a/scripts/migrate_existing_data.py b/scripts/migrate_existing_data.py index ab3e739..1b44666 100644 --- a/scripts/migrate_existing_data.py +++ b/scripts/migrate_existing_data.py @@ -35,8 +35,7 @@ def sys_admins(conn: authdb.DbConnection) -> tuple[User, ...]: "INNER JOIN user_roles AS ur ON u.user_id=ur.user_id " "INNER JOIN roles AS r ON ur.role_id=r.role_id " "WHERE r.role_name='system-administrator'") - return tuple(User(UUID(row["user_id"]), row["email"], row["name"]) - for row in cursor.fetchall()) + return tuple(User.from_sqlite3_row(row) for row in cursor.fetchall()) return tuple() def choose_admin(enum_admins: dict[int, User]) -> int: -- cgit v1.2.3