aboutsummaryrefslogtreecommitdiff
path: root/gn_auth
diff options
context:
space:
mode:
Diffstat (limited to 'gn_auth')
-rw-r--r--gn_auth/auth/authorisation/data/views.py30
-rw-r--r--gn_auth/commands.py64
-rw-r--r--gn_auth/jobs.py61
-rw-r--r--gn_auth/json_encoders_decoders.py31
-rw-r--r--gn_auth/session.py60
5 files changed, 245 insertions, 1 deletions
diff --git a/gn_auth/auth/authorisation/data/views.py b/gn_auth/auth/authorisation/data/views.py
index 9e55dd8..03b416f 100644
--- a/gn_auth/auth/authorisation/data/views.py
+++ b/gn_auth/auth/authorisation/data/views.py
@@ -11,9 +11,9 @@ from authlib.integrations.flask_oauth2.errors import _HTTPException
from flask import request, jsonify, Response, Blueprint, current_app as app
import gn_auth.auth.db.mariadb as gn3db
+
from gn_auth import jobs
from gn_auth.commands import run_async_cmd
-from gn_auth.db.traits import build_trait_name
from gn_auth.auth.db import sqlite3 as db
from gn_auth.auth.db.sqlite3 import with_db_connection
@@ -39,6 +39,34 @@ from gn_auth.auth.authorisation.data.genotypes import (
data = Blueprint("data", __name__)
+def build_trait_name(trait_fullname):
+ """
+ Initialises the trait's name, and other values from the search data provided
+
+ This is a copy of `gn3.db.traits.build_trait_name` function.
+ """
+ def dataset_type(dset_name):
+ if dset_name.find('Temp') >= 0:
+ return "Temp"
+ if dset_name.find('Geno') >= 0:
+ return "Geno"
+ if dset_name.find('Publish') >= 0:
+ return "Publish"
+ return "ProbeSet"
+
+ name_parts = trait_fullname.split("::")
+ assert len(name_parts) >= 2, f"Name format error: '{trait_fullname}'"
+ dataset_name = name_parts[0]
+ dataset_type = dataset_type(dataset_name)
+ return {
+ "db": {
+ "dataset_name": dataset_name,
+ "dataset_type": dataset_type},
+ "trait_fullname": trait_fullname,
+ "trait_name": name_parts[1],
+ "cellid": name_parts[2] if len(name_parts) == 3 else ""
+ }
+
@data.route("species")
def list_species() -> Response:
"""List all available species information."""
diff --git a/gn_auth/commands.py b/gn_auth/commands.py
new file mode 100644
index 0000000..d6f6f56
--- /dev/null
+++ b/gn_auth/commands.py
@@ -0,0 +1,64 @@
+"""Procedures used to work with the various bio-informatics cli
+commands"""
+import sys
+import json
+import subprocess
+from uuid import uuid4
+from datetime import datetime
+from typing import Dict, Optional, Tuple, Union, Sequence
+
+
+from redis.client import Redis
+
+def queue_cmd(conn: Redis,
+ job_queue: str,
+ cmd: Union[str, Sequence[str]],
+ email: Optional[str] = None,
+ env: Optional[dict] = None) -> str:
+ """Given a command CMD; (optional) EMAIL; and a redis connection CONN, queue
+it in Redis with an initial status of 'queued'. The following status codes
+are supported:
+
+ queued: Unprocessed; Still in the queue
+ running: Still running
+ success: Successful completion
+ error: Erroneous completion
+
+Returns the name of the specific redis hash for the specific task.
+
+ """
+ if not conn.ping():
+ raise RedisConnectionError
+ unique_id = ("cmd::"
+ f"{datetime.now().strftime('%Y-%m-%d%H-%M%S-%M%S-')}"
+ f"{str(uuid4())}")
+ conn.rpush(job_queue, unique_id)
+ for key, value in {
+ "cmd": json.dumps(cmd), "result": "", "status": "queued"}.items():
+ conn.hset(name=unique_id, key=key, value=value)
+ if email:
+ conn.hset(name=unique_id, key="email", value=email)
+ if env:
+ conn.hset(name=unique_id, key="env", value=json.dumps(env))
+ return unique_id
+
+def run_cmd(cmd: str, success_codes: Tuple = (0,), env: str = None) -> Dict:
+ """Run CMD and return the CMD's status code and output as a dict"""
+ parsed_cmd = json.loads(cmd)
+ parsed_env = (json.loads(env) if env is not None else None)
+ results = subprocess.run(
+ parsed_cmd, capture_output=True, shell=isinstance(parsed_cmd, str),
+ check=False, env=parsed_env)
+ out = str(results.stdout, 'utf-8')
+ if results.returncode not in success_codes: # Error!
+ out = str(results.stderr, 'utf-8')
+ return {"code": results.returncode, "output": out}
+
+def run_async_cmd(
+ conn: Redis, job_queue: str, cmd: Union[str, Sequence[str]],
+ email: Optional[str] = None, env: Optional[dict] = None) -> str:
+ """A utility function to call `gn3.commands.queue_cmd` function and run the
+ worker in the `one-shot` mode."""
+ cmd_id = queue_cmd(conn, job_queue, cmd, email, env)
+ subprocess.Popen([f"{sys.executable}", "-m", "sheepdog.worker"]) # pylint: disable=[consider-using-with]
+ return cmd_id
diff --git a/gn_auth/jobs.py b/gn_auth/jobs.py
new file mode 100644
index 0000000..6fc9bba
--- /dev/null
+++ b/gn_auth/jobs.py
@@ -0,0 +1,61 @@
+"""Handle external processes in a consistent manner."""
+import json
+from typing import Any
+from uuid import UUID, uuid4
+from datetime import datetime
+
+from redis import Redis
+from pymonad.either import Left, Right, Either
+
+from gn_auth import json_encoders_decoders as jed
+
+JOBS_NAMESPACE = "GN3_AUTH::JOBS"
+
+class InvalidCommand(Exception):
+ """Raise if the command to run is invalid."""
+
+def job_key(job_id: UUID, namespace_prefix: str = JOBS_NAMESPACE):
+ """Build the namespace key for a specific job."""
+ return f"{namespace_prefix}::{job_id}"
+
+def job(redisconn: Redis, job_id: UUID) -> Either:
+ """Retrive the job details of a job identified by `job_id`."""
+ the_job = redisconn.hgetall(job_key(job_id))
+ if the_job:
+ return Right({
+ key: json.loads(value, object_hook=jed.custom_json_decoder)
+ for key, value in the_job.items()
+ })
+ return Left({
+ "error": "NotFound",
+ "error_description": f"Job '{job_id}' was not found."
+ })
+
+def __command_valid__(job_command: Any) -> Either:
+ if not isinstance(job_command, list):
+ return Left({
+ "error": "InvalidJobCommand",
+ "error_description": "The job command MUST be a list."
+ })
+ if not all((isinstance(val, str) for val in job_command)):
+ return Left({
+ "error": "InvalidJobCommand",
+ "error_description": "All parts of the command MUST be strings."
+ })
+ return Right(job_command)
+
+def create_job(redisconn: Redis, job_details: dict[str, Any]) -> UUID:
+ """Create a new job and put it on Redis."""
+ def __create__(_job_command):
+ job_id = job_details.get("job_id", uuid4())
+ redisconn.hset(job_key(job_id), mapping={
+ key: json.dumps(value, cls=jed.CustomJSONEncoder) for key, value in {
+ **job_details, "job_id": job_id, "created": datetime.now(),
+ "status": "queued"
+ }.items()
+ })
+ return job_id
+ def __raise__(err):
+ raise InvalidCommand(err["error_description"])
+ return __command_valid__(job_details.get("command")).either(
+ __raise__, __create__)
diff --git a/gn_auth/json_encoders_decoders.py b/gn_auth/json_encoders_decoders.py
new file mode 100644
index 0000000..be15b34
--- /dev/null
+++ b/gn_auth/json_encoders_decoders.py
@@ -0,0 +1,31 @@
+"""Custom json encoders for various purposes."""
+import json
+from uuid import UUID
+from datetime import datetime
+
+__ENCODERS__ = {
+ UUID: lambda obj: {"__type": "UUID", "__value": str(obj)},
+ datetime: lambda obj: {"__type": "DATETIME", "__value": obj.isoformat()}
+}
+
+class CustomJSONEncoder(json.JSONEncoder):
+ """
+ A custom JSON encoder to handle cases where the default encoder fails.
+ """
+ def default(self, obj):# pylint: disable=[arguments-renamed]
+ """Return a serializable object for `obj`."""
+ if type(obj) in __ENCODERS__:
+ return __ENCODERS__[type(obj)](obj)
+ return json.JSONEncoder.default(self, obj)
+
+
+__DECODERS__ = {
+ "UUID": UUID,
+ "DATETIME": datetime.fromisoformat
+}
+
+def custom_json_decoder(obj_dict):
+ """Decode custom types"""
+ if "__type" in obj_dict:
+ return __DECODERS__[obj_dict["__type"]](obj_dict["__value"])
+ return obj_dict
diff --git a/gn_auth/session.py b/gn_auth/session.py
new file mode 100644
index 0000000..7226ac5
--- /dev/null
+++ b/gn_auth/session.py
@@ -0,0 +1,60 @@
+"""Handle any GN3 sessions."""
+from functools import wraps
+from datetime import datetime, timezone, timedelta
+
+from flask import flash, request, session, url_for, redirect
+
+__SESSION_KEY__ = "GN::AUTH::session_details"
+
+def __session_expired__():
+ """Check whether the session has expired."""
+ return datetime.now(tz=timezone.utc) >= session[__SESSION_KEY__]["expires"]
+
+def logged_in(func):
+ """Verify the user is logged in."""
+ @wraps(func)
+ def __logged_in__(*args, **kwargs):
+ if bool(session.get(__SESSION_KEY__)) and not __session_expired__():
+ return func(*args, **kwargs)
+ flash("You need to be logged in to access that page.", "alert-danger")
+ return redirect(url_for(
+ "oauth2.admin.login", next=request.url_rule.endpoint))
+ return __logged_in__
+
+def session_info():
+ """Retrieve the session information."""
+ return session.get(__SESSION_KEY__, False)
+
+def session_user():
+ """Retrieve session user."""
+ info = session_info()
+ return info and info["user"]
+
+def clear_session_info():
+ """Clear any session info."""
+ try:
+ session.pop(__SESSION_KEY__)
+ except KeyError as _keyerr:
+ pass
+
+def session_expired() -> bool:
+ """
+ Check whether the session has expired. Will always return `True` if no
+ session currently exists.
+ """
+ if bool(session.get(__SESSION_KEY__)):
+ now = datetime.now(tz=timezone.utc)
+ return now >= session[__SESSION_KEY__]["expires"]
+ return True
+
+def update_expiry() -> bool:
+ """Update the session expiry and return a boolean indicating success."""
+ if not session_expired():
+ now = datetime.now(tz=timezone.utc)
+ session[__SESSION_KEY__]["expires"] = now + timedelta(minutes=10)
+ return True
+ return False
+
+def update_session_info(**info):
+ """Update the session information."""
+ session[__SESSION_KEY__] = info