aboutsummaryrefslogtreecommitdiff
path: root/gn3
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2022-11-14 18:04:08 +0300
committerFrederick Muriuki Muriithi2022-11-14 18:04:08 +0300
commit6b964b95a67bac69a1217b3f9c39c58a19881df4 (patch)
treef83ac08fa06c7535c1fcb3926876029e2ec17a52 /gn3
parentbaf7980cd6080c8cb4e6c6b585948144273600aa (diff)
downloadgenenetwork3-6b964b95a67bac69a1217b3f9c39c58a19881df4.tar.gz
auth: Implement `create_group`
Diffstat (limited to 'gn3')
-rw-r--r--gn3/auth/authorisation/__init__.py39
-rw-r--r--gn3/auth/authorisation/groups.py15
-rw-r--r--gn3/auth/authorisation/privileges.py16
-rw-r--r--gn3/auth/db.py19
4 files changed, 78 insertions, 11 deletions
diff --git a/gn3/auth/authorisation/__init__.py b/gn3/auth/authorisation/__init__.py
index a6991d2..048f67d 100644
--- a/gn3/auth/authorisation/__init__.py
+++ b/gn3/auth/authorisation/__init__.py
@@ -1,12 +1,35 @@
"""The authorisation module."""
-from typing import Union
+from functools import wraps
+from typing import Union, Callable
-def authorised_p(success_message: Union[str, bool] = False, error_message: Union[str, bool] = False):
+from flask import g, current_app as app
+
+from gn3.auth import db
+from . import privileges as auth_privs
+
+def authorised_p(
+ privileges: tuple[str] = tuple(),
+ success_message: Union[str, bool] = False,
+ error_message: Union[str, bool] = False):
"""Authorisation decorator."""
- def __authoriser__(*args, **kwargs):
- return {
- "status": "error",
- "message": error_message or "unauthorised"
- }
+ assert len(privileges) > 0, "You must provide at least one privilege"
+ def __build_authoriser__(func: Callable):
+ @wraps(func)
+ def __authoriser__(*args, **kwargs):
+ if hasattr(g, "user_id") and g.user_id:
+ with db.connection(app.config["AUTH_DB"]) as conn:
+ user_privileges = auth_privs.user_privileges(conn, g.user_id)
- return __authoriser__
+ not_assigned = [
+ priv for priv in privileges if priv not in user_privileges]
+ if len(not_assigned) == 0:
+ return {
+ "status": "success",
+ "message": success_message or "successfully authorised",
+ "results": func(*args, **kwargs)}
+ return {
+ "status": "error",
+ "message": f"Unauthorised: {error_message or ''}"
+ }
+ return __authoriser__
+ return __build_authoriser__
diff --git a/gn3/auth/authorisation/groups.py b/gn3/auth/authorisation/groups.py
index 8c8a87f..ad30763 100644
--- a/gn3/auth/authorisation/groups.py
+++ b/gn3/auth/authorisation/groups.py
@@ -1,7 +1,16 @@
"""Handle the management of resource/user groups."""
+import uuid
+from gn3.auth import db
from . import authorised_p
-@authorised_p
-def create_group(group_name):
- raise Exception("NOT IMPLEMENTED!")
+@authorised_p(
+ ("create-group",), success_message="Successfully created group.",
+ error_message="Failed to create group.")
+def create_group(conn, group_name):
+ with db.cursor(conn) as cursor:
+ group_id = uuid.uuid4()
+ cursor.execute(
+ "INSERT INTO groups(group_id, group_name) VALUES (?, ?)",
+ (str(group_id), group_name))
+ return group_id
diff --git a/gn3/auth/authorisation/privileges.py b/gn3/auth/authorisation/privileges.py
new file mode 100644
index 0000000..99b36ef
--- /dev/null
+++ b/gn3/auth/authorisation/privileges.py
@@ -0,0 +1,16 @@
+"""Handle privileges"""
+from uuid import UUID
+
+from gn3.auth import db
+
+def user_privileges(conn, user_id: UUID):
+ """Fetch the user's privileges from the database."""
+ with db.cursor(conn) as cursor:
+ cursor.execute(
+ ("SELECT p.privilege_name "
+ "FROM user_roles AS ur "
+ "INNER JOIN role_privileges AS rp ON ur.role_id=rp.role_id "
+ "INNER JOIN privileges AS p ON rp.privilege_id=p.privilege_id "
+ "WHERE ur.user_id=?"),
+ (str(user_id),))
+ return tuple(row[0] for row in cursor.fetchall())
diff --git a/gn3/auth/db.py b/gn3/auth/db.py
new file mode 100644
index 0000000..c0d0415
--- /dev/null
+++ b/gn3/auth/db.py
@@ -0,0 +1,19 @@
+"""Handle connection to auth database."""
+import sqlite3
+import contextlib
+
+@contextlib.contextmanager
+def connection(db_path: str):
+ connection = sqlite3.connect(db_path)
+ try:
+ yield connection
+ finally:
+ connection.close()
+
+@contextlib.contextmanager
+def cursor(connection):
+ cur = connection.cursor()
+ try:
+ yield cur
+ finally:
+ cur.close()