about summary refs log tree commit diff
path: root/gn_auth/hooks.py
diff options
context:
space:
mode:
Diffstat (limited to 'gn_auth/hooks.py')
-rw-r--r--gn_auth/hooks.py68
1 files changed, 68 insertions, 0 deletions
diff --git a/gn_auth/hooks.py b/gn_auth/hooks.py
new file mode 100644
index 0000000..bd7380b
--- /dev/null
+++ b/gn_auth/hooks.py
@@ -0,0 +1,68 @@
+"""Authorisation hooks implementation"""
+import functools
+from typing import List
+
+from flask import request_finished
+from flask import request, current_app
+
+from gn_auth.auth.db import sqlite3 as db
+
+def register_hooks(app):
+    """Initialise hooks system on the application."""
+    request_finished.connect(edu_domain_hook, app)
+
+
+def handle_register_request(func):
+    """Decorator for handling user registration hooks."""
+    @functools.wraps(func)
+    def wrapper(*args, **kwargs):
+        if request.method == "POST" and request.endpoint == "oauth2.users.register_user":
+            return func(*args, **kwargs)
+        else:
+            return lambda *args, **kwargs: None
+    return wrapper
+
+
+@handle_register_request
+def edu_domain_hook(_sender, response, **_extra):
+    """Hook to run whenever a user with a `.edu` domain registers."""
+    if response.status_code >= 400:
+        return
+    data = request.get_json()
+    if data is None or "email" not in data or not data["email"].endswith("edu"):
+        return
+    registered_email = data["email"]
+    apply_edu_role(registered_email)
+
+
+def apply_edu_role(email):
+    """Assign 'hook-role-from-edu-domain' to user."""
+    with db.connection(current_app.config["AUTH_DB"]) as conn:
+        with db.cursor(conn) as cursor:
+            cursor.execute("SELECT user_id FROM users WHERE email= ?", (email,) )
+            user_result = cursor.fetchone()
+            cursor.execute("SELECT role_id FROM roles WHERE role_name='hook-role-from-edu-domain'")
+            role_result = cursor.fetchone()
+            resource_ids = get_resources_for_edu_domain(cursor)
+            if user_result is None or role_result is None:
+                return
+            user_id = user_result[0]
+            role_id = role_result[0]
+            cursor.executemany(
+                "INSERT INTO user_roles(user_id, role_id, resource_id) "
+                "VALUES(:user_id, :role_id, :resource_id)",
+                tuple({
+                    "user_id": user_id,
+                    "role_id": role_id,
+                    "resource_id": resource_id
+                } for resource_id in resource_ids))
+
+
+def get_resources_for_edu_domain(cursor) -> List[int]:
+    """FIXME: I still haven't figured out how to get resources to be assigned to edu domain"""
+    resources_query = """
+        SELECT resource_id FROM resources INNER JOIN resource_categories USING(resource_category_id) WHERE resource_categories.resource_category_key IN ('genotype', 'phenotype', 'mrna')
+    """
+    cursor.execute(resources_query)
+    resource_ids = [x[0] for x in cursor.fetchall()]
+    return resource_ids