diff options
author | Frederick Muriuki Muriithi | 2022-11-14 18:04:08 +0300 |
---|---|---|
committer | Frederick Muriuki Muriithi | 2022-11-14 18:04:08 +0300 |
commit | 6b964b95a67bac69a1217b3f9c39c58a19881df4 (patch) | |
tree | f83ac08fa06c7535c1fcb3926876029e2ec17a52 /gn3 | |
parent | baf7980cd6080c8cb4e6c6b585948144273600aa (diff) | |
download | genenetwork3-6b964b95a67bac69a1217b3f9c39c58a19881df4.tar.gz |
auth: Implement `create_group`
Diffstat (limited to 'gn3')
-rw-r--r-- | gn3/auth/authorisation/__init__.py | 39 | ||||
-rw-r--r-- | gn3/auth/authorisation/groups.py | 15 | ||||
-rw-r--r-- | gn3/auth/authorisation/privileges.py | 16 | ||||
-rw-r--r-- | gn3/auth/db.py | 19 |
4 files changed, 78 insertions, 11 deletions
diff --git a/gn3/auth/authorisation/__init__.py b/gn3/auth/authorisation/__init__.py index a6991d2..048f67d 100644 --- a/gn3/auth/authorisation/__init__.py +++ b/gn3/auth/authorisation/__init__.py @@ -1,12 +1,35 @@ """The authorisation module.""" -from typing import Union +from functools import wraps +from typing import Union, Callable -def authorised_p(success_message: Union[str, bool] = False, error_message: Union[str, bool] = False): +from flask import g, current_app as app + +from gn3.auth import db +from . import privileges as auth_privs + +def authorised_p( + privileges: tuple[str] = tuple(), + success_message: Union[str, bool] = False, + error_message: Union[str, bool] = False): """Authorisation decorator.""" - def __authoriser__(*args, **kwargs): - return { - "status": "error", - "message": error_message or "unauthorised" - } + assert len(privileges) > 0, "You must provide at least one privilege" + def __build_authoriser__(func: Callable): + @wraps(func) + def __authoriser__(*args, **kwargs): + if hasattr(g, "user_id") and g.user_id: + with db.connection(app.config["AUTH_DB"]) as conn: + user_privileges = auth_privs.user_privileges(conn, g.user_id) - return __authoriser__ + not_assigned = [ + priv for priv in privileges if priv not in user_privileges] + if len(not_assigned) == 0: + return { + "status": "success", + "message": success_message or "successfully authorised", + "results": func(*args, **kwargs)} + return { + "status": "error", + "message": f"Unauthorised: {error_message or ''}" + } + return __authoriser__ + return __build_authoriser__ diff --git a/gn3/auth/authorisation/groups.py b/gn3/auth/authorisation/groups.py index 8c8a87f..ad30763 100644 --- a/gn3/auth/authorisation/groups.py +++ b/gn3/auth/authorisation/groups.py @@ -1,7 +1,16 @@ """Handle the management of resource/user groups.""" +import uuid +from gn3.auth import db from . import authorised_p -@authorised_p -def create_group(group_name): - raise Exception("NOT IMPLEMENTED!") +@authorised_p( + ("create-group",), success_message="Successfully created group.", + error_message="Failed to create group.") +def create_group(conn, group_name): + with db.cursor(conn) as cursor: + group_id = uuid.uuid4() + cursor.execute( + "INSERT INTO groups(group_id, group_name) VALUES (?, ?)", + (str(group_id), group_name)) + return group_id diff --git a/gn3/auth/authorisation/privileges.py b/gn3/auth/authorisation/privileges.py new file mode 100644 index 0000000..99b36ef --- /dev/null +++ b/gn3/auth/authorisation/privileges.py @@ -0,0 +1,16 @@ +"""Handle privileges""" +from uuid import UUID + +from gn3.auth import db + +def user_privileges(conn, user_id: UUID): + """Fetch the user's privileges from the database.""" + with db.cursor(conn) as cursor: + cursor.execute( + ("SELECT p.privilege_name " + "FROM user_roles AS ur " + "INNER JOIN role_privileges AS rp ON ur.role_id=rp.role_id " + "INNER JOIN privileges AS p ON rp.privilege_id=p.privilege_id " + "WHERE ur.user_id=?"), + (str(user_id),)) + return tuple(row[0] for row in cursor.fetchall()) diff --git a/gn3/auth/db.py b/gn3/auth/db.py new file mode 100644 index 0000000..c0d0415 --- /dev/null +++ b/gn3/auth/db.py @@ -0,0 +1,19 @@ +"""Handle connection to auth database.""" +import sqlite3 +import contextlib + +@contextlib.contextmanager +def connection(db_path: str): + connection = sqlite3.connect(db_path) + try: + yield connection + finally: + connection.close() + +@contextlib.contextmanager +def cursor(connection): + cur = connection.cursor() + try: + yield cur + finally: + cur.close() |