diff options
Diffstat (limited to 'gn_auth')
-rw-r--r-- | gn_auth/auth/authorisation/data/views.py | 30 | ||||
-rw-r--r-- | gn_auth/commands.py | 64 | ||||
-rw-r--r-- | gn_auth/jobs.py | 61 | ||||
-rw-r--r-- | gn_auth/json_encoders_decoders.py | 31 | ||||
-rw-r--r-- | gn_auth/session.py | 60 |
5 files changed, 245 insertions, 1 deletions
diff --git a/gn_auth/auth/authorisation/data/views.py b/gn_auth/auth/authorisation/data/views.py index 9e55dd8..03b416f 100644 --- a/gn_auth/auth/authorisation/data/views.py +++ b/gn_auth/auth/authorisation/data/views.py @@ -11,9 +11,9 @@ from authlib.integrations.flask_oauth2.errors import _HTTPException from flask import request, jsonify, Response, Blueprint, current_app as app import gn_auth.auth.db.mariadb as gn3db + from gn_auth import jobs from gn_auth.commands import run_async_cmd -from gn_auth.db.traits import build_trait_name from gn_auth.auth.db import sqlite3 as db from gn_auth.auth.db.sqlite3 import with_db_connection @@ -39,6 +39,34 @@ from gn_auth.auth.authorisation.data.genotypes import ( data = Blueprint("data", __name__) +def build_trait_name(trait_fullname): + """ + Initialises the trait's name, and other values from the search data provided + + This is a copy of `gn3.db.traits.build_trait_name` function. + """ + def dataset_type(dset_name): + if dset_name.find('Temp') >= 0: + return "Temp" + if dset_name.find('Geno') >= 0: + return "Geno" + if dset_name.find('Publish') >= 0: + return "Publish" + return "ProbeSet" + + name_parts = trait_fullname.split("::") + assert len(name_parts) >= 2, f"Name format error: '{trait_fullname}'" + dataset_name = name_parts[0] + dataset_type = dataset_type(dataset_name) + return { + "db": { + "dataset_name": dataset_name, + "dataset_type": dataset_type}, + "trait_fullname": trait_fullname, + "trait_name": name_parts[1], + "cellid": name_parts[2] if len(name_parts) == 3 else "" + } + @data.route("species") def list_species() -> Response: """List all available species information.""" diff --git a/gn_auth/commands.py b/gn_auth/commands.py new file mode 100644 index 0000000..d6f6f56 --- /dev/null +++ b/gn_auth/commands.py @@ -0,0 +1,64 @@ +"""Procedures used to work with the various bio-informatics cli +commands""" +import sys +import json +import subprocess +from uuid import uuid4 +from datetime import datetime +from typing import Dict, Optional, Tuple, Union, Sequence + + +from redis.client import Redis + +def queue_cmd(conn: Redis, + job_queue: str, + cmd: Union[str, Sequence[str]], + email: Optional[str] = None, + env: Optional[dict] = None) -> str: + """Given a command CMD; (optional) EMAIL; and a redis connection CONN, queue +it in Redis with an initial status of 'queued'. The following status codes +are supported: + + queued: Unprocessed; Still in the queue + running: Still running + success: Successful completion + error: Erroneous completion + +Returns the name of the specific redis hash for the specific task. + + """ + if not conn.ping(): + raise RedisConnectionError + unique_id = ("cmd::" + f"{datetime.now().strftime('%Y-%m-%d%H-%M%S-%M%S-')}" + f"{str(uuid4())}") + conn.rpush(job_queue, unique_id) + for key, value in { + "cmd": json.dumps(cmd), "result": "", "status": "queued"}.items(): + conn.hset(name=unique_id, key=key, value=value) + if email: + conn.hset(name=unique_id, key="email", value=email) + if env: + conn.hset(name=unique_id, key="env", value=json.dumps(env)) + return unique_id + +def run_cmd(cmd: str, success_codes: Tuple = (0,), env: str = None) -> Dict: + """Run CMD and return the CMD's status code and output as a dict""" + parsed_cmd = json.loads(cmd) + parsed_env = (json.loads(env) if env is not None else None) + results = subprocess.run( + parsed_cmd, capture_output=True, shell=isinstance(parsed_cmd, str), + check=False, env=parsed_env) + out = str(results.stdout, 'utf-8') + if results.returncode not in success_codes: # Error! + out = str(results.stderr, 'utf-8') + return {"code": results.returncode, "output": out} + +def run_async_cmd( + conn: Redis, job_queue: str, cmd: Union[str, Sequence[str]], + email: Optional[str] = None, env: Optional[dict] = None) -> str: + """A utility function to call `gn3.commands.queue_cmd` function and run the + worker in the `one-shot` mode.""" + cmd_id = queue_cmd(conn, job_queue, cmd, email, env) + subprocess.Popen([f"{sys.executable}", "-m", "sheepdog.worker"]) # pylint: disable=[consider-using-with] + return cmd_id diff --git a/gn_auth/jobs.py b/gn_auth/jobs.py new file mode 100644 index 0000000..6fc9bba --- /dev/null +++ b/gn_auth/jobs.py @@ -0,0 +1,61 @@ +"""Handle external processes in a consistent manner.""" +import json +from typing import Any +from uuid import UUID, uuid4 +from datetime import datetime + +from redis import Redis +from pymonad.either import Left, Right, Either + +from gn_auth import json_encoders_decoders as jed + +JOBS_NAMESPACE = "GN3_AUTH::JOBS" + +class InvalidCommand(Exception): + """Raise if the command to run is invalid.""" + +def job_key(job_id: UUID, namespace_prefix: str = JOBS_NAMESPACE): + """Build the namespace key for a specific job.""" + return f"{namespace_prefix}::{job_id}" + +def job(redisconn: Redis, job_id: UUID) -> Either: + """Retrive the job details of a job identified by `job_id`.""" + the_job = redisconn.hgetall(job_key(job_id)) + if the_job: + return Right({ + key: json.loads(value, object_hook=jed.custom_json_decoder) + for key, value in the_job.items() + }) + return Left({ + "error": "NotFound", + "error_description": f"Job '{job_id}' was not found." + }) + +def __command_valid__(job_command: Any) -> Either: + if not isinstance(job_command, list): + return Left({ + "error": "InvalidJobCommand", + "error_description": "The job command MUST be a list." + }) + if not all((isinstance(val, str) for val in job_command)): + return Left({ + "error": "InvalidJobCommand", + "error_description": "All parts of the command MUST be strings." + }) + return Right(job_command) + +def create_job(redisconn: Redis, job_details: dict[str, Any]) -> UUID: + """Create a new job and put it on Redis.""" + def __create__(_job_command): + job_id = job_details.get("job_id", uuid4()) + redisconn.hset(job_key(job_id), mapping={ + key: json.dumps(value, cls=jed.CustomJSONEncoder) for key, value in { + **job_details, "job_id": job_id, "created": datetime.now(), + "status": "queued" + }.items() + }) + return job_id + def __raise__(err): + raise InvalidCommand(err["error_description"]) + return __command_valid__(job_details.get("command")).either( + __raise__, __create__) diff --git a/gn_auth/json_encoders_decoders.py b/gn_auth/json_encoders_decoders.py new file mode 100644 index 0000000..be15b34 --- /dev/null +++ b/gn_auth/json_encoders_decoders.py @@ -0,0 +1,31 @@ +"""Custom json encoders for various purposes.""" +import json +from uuid import UUID +from datetime import datetime + +__ENCODERS__ = { + UUID: lambda obj: {"__type": "UUID", "__value": str(obj)}, + datetime: lambda obj: {"__type": "DATETIME", "__value": obj.isoformat()} +} + +class CustomJSONEncoder(json.JSONEncoder): + """ + A custom JSON encoder to handle cases where the default encoder fails. + """ + def default(self, obj):# pylint: disable=[arguments-renamed] + """Return a serializable object for `obj`.""" + if type(obj) in __ENCODERS__: + return __ENCODERS__[type(obj)](obj) + return json.JSONEncoder.default(self, obj) + + +__DECODERS__ = { + "UUID": UUID, + "DATETIME": datetime.fromisoformat +} + +def custom_json_decoder(obj_dict): + """Decode custom types""" + if "__type" in obj_dict: + return __DECODERS__[obj_dict["__type"]](obj_dict["__value"]) + return obj_dict diff --git a/gn_auth/session.py b/gn_auth/session.py new file mode 100644 index 0000000..7226ac5 --- /dev/null +++ b/gn_auth/session.py @@ -0,0 +1,60 @@ +"""Handle any GN3 sessions.""" +from functools import wraps +from datetime import datetime, timezone, timedelta + +from flask import flash, request, session, url_for, redirect + +__SESSION_KEY__ = "GN::AUTH::session_details" + +def __session_expired__(): + """Check whether the session has expired.""" + return datetime.now(tz=timezone.utc) >= session[__SESSION_KEY__]["expires"] + +def logged_in(func): + """Verify the user is logged in.""" + @wraps(func) + def __logged_in__(*args, **kwargs): + if bool(session.get(__SESSION_KEY__)) and not __session_expired__(): + return func(*args, **kwargs) + flash("You need to be logged in to access that page.", "alert-danger") + return redirect(url_for( + "oauth2.admin.login", next=request.url_rule.endpoint)) + return __logged_in__ + +def session_info(): + """Retrieve the session information.""" + return session.get(__SESSION_KEY__, False) + +def session_user(): + """Retrieve session user.""" + info = session_info() + return info and info["user"] + +def clear_session_info(): + """Clear any session info.""" + try: + session.pop(__SESSION_KEY__) + except KeyError as _keyerr: + pass + +def session_expired() -> bool: + """ + Check whether the session has expired. Will always return `True` if no + session currently exists. + """ + if bool(session.get(__SESSION_KEY__)): + now = datetime.now(tz=timezone.utc) + return now >= session[__SESSION_KEY__]["expires"] + return True + +def update_expiry() -> bool: + """Update the session expiry and return a boolean indicating success.""" + if not session_expired(): + now = datetime.now(tz=timezone.utc) + session[__SESSION_KEY__]["expires"] = now + timedelta(minutes=10) + return True + return False + +def update_session_info(**info): + """Update the session information.""" + session[__SESSION_KEY__] = info |