From 8af8105444522c2c71b5ddd36a550e964cddffbf Mon Sep 17 00:00:00 2001
From: Frederick Muriuki Muriithi
Date: Mon, 17 Apr 2023 14:42:05 +0300
Subject: Hook up code to use external search script for phenotypes

---
 gn3/auth/authorisation/data/views.py  | 56 ++++++++++++++++++++++-----------
 gn3/auth/authorisation/groups/data.py |  4 +--
 gn3/jobs.py                           | 58 +++++++++++++++++++++++++++++++++++
 scripts/search_phenotypes.py          | 10 +++---
 sheepdog/worker.py                    |  1 +
 5 files changed, 104 insertions(+), 25 deletions(-)
 create mode 100644 gn3/jobs.py

diff --git a/gn3/auth/authorisation/data/views.py b/gn3/auth/authorisation/data/views.py
index 33ba262..043ebd6 100644
--- a/gn3/auth/authorisation/data/views.py
+++ b/gn3/auth/authorisation/data/views.py
@@ -1,43 +1,32 @@
 """Handle data endpoints."""
-import os
+import sys
 import uuid
 import json
-import random
-import string
-import datetime
-from functools import reduce, partial
-from typing import Any, Sequence, Iterable
+from typing import Any
+from functools import partial
 
 import redis
 from MySQLdb.cursors import DictCursor
-from email_validator import validate_email, EmailNotValidError
 from authlib.integrations.flask_oauth2.errors import _HTTPException
 from flask import request, jsonify, Response, Blueprint, current_app as app
 
 import gn3.db_utils as gn3db
+from gn3 import jobs
+from gn3.commands import run_async_cmd
 from gn3.db.traits import build_trait_name
 
 from gn3.auth import db
-from gn3.auth.dictify import dictify
 from gn3.auth.db_utils import with_db_connection
 
 from gn3.auth.authorisation.errors import InvalidData, NotFoundError
 
-from gn3.auth.authorisation.roles.models import(
-    revoke_user_role_by_name, assign_user_role_by_name)
-
-from gn3.auth.authorisation.groups.models import (
-    Group, user_group, group_by_id, add_user_to_group)
+from gn3.auth.authorisation.groups.models import group_by_id
 
 from gn3.auth.authorisation.resources.checks import authorised_for
 from gn3.auth.authorisation.resources.models import (
     user_resources, public_resources, attach_resources_data)
 
-from gn3.auth.authorisation.errors import ForbiddenAccess
-
-
 from gn3.auth.authentication.oauth2.resource_server import require_oauth
-from gn3.auth.authentication.users import User, user_by_email, set_user_password
 
 from gn3.auth.authorisation.data.mrna import link_mrna_data, ungrouped_mrna_data
 from gn3.auth.authorisation.data.genotypes import (
@@ -159,7 +148,28 @@ def __search_genotypes__():
     return jsonify(with_db_connection(__ungrouped__))
 
 def __search_phenotypes__():
-    pass
+    # launch the external process to search for phenotypes
+    redisuri = app.config["REDIS_URI"]
+    with redis.Redis.from_url(redisuri, decode_responses=True) as redisconn:
+        job_id = uuid.uuid4()
+        command = [
+            sys.executable, "-m", "scripts.search_phenotypes",
+            __request_key__("species_name"),
+            __request_key__("query"),
+            str(job_id),
+            f"--host={__request_key__('gn3_server_uri')}",
+            f"--auth-db-uri={app.config['AUTH_DB']}",
+            f"--gn3-db-uri={app.config['SQL_URI']}",
+            f"--redis-uri={redisuri}"]
+        jobs.create_job(redisconn, {
+            "job_id": job_id, "command": command, "status": "queued",
+            "search_results": "[]"})
+        return jsonify({
+            "job_id": job_id,
+            "command_id": run_async_cmd(
+                redisconn, app.config.get("REDIS_JOB_QUEUE"), command),
+            "command": command
+        })
 
 @data.route("/search", methods=["GET"])
 @require_oauth("profile group resource")
@@ -173,6 +183,16 @@ def search_unlinked_data():
     }
     return search_fns[dataset_type]()
 
+@data.route("/search/phenotype/<uuid:job_id>", methods=["GET"])
+def pheno_search_results(job_id: uuid.UUID) -> Response:
+    """Get the search results from the external script"""
+    def __search_error__(err):
+        raise NotFoundError(err["error_description"])
+    redisuri = app.config["REDIS_URI"]
+    with redis.Redis.from_url(redisuri, decode_responses=True) as redisconn:
+        return jobs.job(redisconn, job_id).either(
+            __search_error__, jsonify)
+
 @data.route("/link/genotype", methods=["POST"])
 def link_genotypes() -> Response:
     """Link genotype data to group."""
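The two new endpoints in views.py are meant to be used as a pair: "/search" (with dataset_type "phenotype") queues the background search and returns a job_id, and "/search/phenotype/<job_id>" reports progress. A rough client-side sketch of the polling half follows; the base URL, the treatment of status values beyond "queued" and "started", and the absence of authentication are assumptions, not taken from this patch.

    # Hypothetical client-side poll of the new results endpoint.
    import time
    import requests

    BASE_URL = "https://gn3.example.org/api/oauth2/data"  # assumed mount point of the `data` blueprint

    def poll_phenotype_search(job_id, interval=2, timeout=120):
        """Poll /search/phenotype/<job_id> until the job leaves the states
        visible in this patch ("queued" from create_job, "started" from the
        search script); any other status is treated as done."""
        deadline = time.time() + timeout
        while time.time() < deadline:
            response = requests.get(f"{BASE_URL}/search/phenotype/{job_id}")
            response.raise_for_status()
            the_job = response.json()
            if the_job.get("status") not in ("queued", "started"):
                return the_job.get("search_results", [])
            time.sleep(interval)
        raise TimeoutError(f"Phenotype search job {job_id} did not complete.")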
diff --git a/gn3/auth/authorisation/groups/data.py b/gn3/auth/authorisation/groups/data.py
index 453cc71..ee6f70e 100644
--- a/gn3/auth/authorisation/groups/data.py
+++ b/gn3/auth/authorisation/groups/data.py
@@ -1,13 +1,11 @@
 """Handles the resource objects' data."""
-from typing import Any, Sequence
-
 from MySQLdb.cursors import DictCursor
 
 from gn3 import db_utils as gn3db
 from gn3.auth import db as authdb
 from gn3.auth.authorisation.groups import Group
 from gn3.auth.authorisation.checks import authorised_p
-from gn3.auth.authorisation.errors import InvalidData, NotFoundError
+from gn3.auth.authorisation.errors import NotFoundError
 
 def __fetch_mrna_data_by_ids__(
         conn: gn3db.Connection, dataset_ids: tuple[str, ...]) -> tuple[
diff --git a/gn3/jobs.py b/gn3/jobs.py
new file mode 100644
index 0000000..8854a61
--- /dev/null
+++ b/gn3/jobs.py
@@ -0,0 +1,58 @@
+"""Handle external processes in a consistent manner."""
+import json
+from typing import Any
+from uuid import UUID, uuid4
+from datetime import datetime
+
+from redis import Redis
+
+from pymonad.either import Left, Right, Either
+
+JOBS_NAMESPACE = "GN3::JOBS"
+
+class InvalidCommand(Exception):
+    """Raise if the command to run is invalid."""
+
+def job_key(job_id: UUID, namespace_prefix: str = JOBS_NAMESPACE):
+    """Build the namespace key for a specific job."""
+    return f"{namespace_prefix}::{job_id}"
+
+def job(redisconn: Redis, job_id: UUID) -> Either:
+    """Retrieve the job details of a job identified by `job_id`."""
+    the_job = redisconn.hgetall(job_key(job_id))
+    if the_job:
+        return Right({
+            **the_job,
+            "search_results": json.loads(the_job["search_results"])
+        })
+    return Left({
+        "error": "NotFound",
+        "error_description": f"Job '{job_id}' was not found."
+    })
+
+def __command_valid__(job_command: Any) -> Either:
+    if not isinstance(job_command, list):
+        return Left({
+            "error": "InvalidJobCommand",
+            "error_description": "The job command MUST be a list."
+        })
+    if not all((isinstance(val, str) for val in job_command)):
+        return Left({
+            "error": "InvalidJobCommand",
+            "error_description": "All parts of the command MUST be strings."
+        })
+    return Right(job_command)
+
+def create_job(redisconn: Redis, job_details: dict[str, Any]) -> UUID:
+    """Create a new job and put it on Redis."""
+    def __create__(_job_command):
+        job_id = job_details.get("job_id", uuid4())
+        redisconn.hset(job_key(job_id), mapping={
+            **job_details, "job_id": job_id, "created": datetime.now(), "stdout": "",
+            "stderr": "", "status": "queued"
+        })
+        return job_id
+    def __raise__(err):
+        raise InvalidCommand(err["error_description"])
+    return __command_valid__(job_details.get("command")).either(
+        __raise__, __create__)
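The new gn3.jobs module can be exercised on its own. A minimal sketch of its read side follows, assuming only a Redis server on localhost; the hash seeded here is illustrative and stands in for what create_job would normally write.

    # Minimal sketch, assuming Redis at localhost:6379.
    from uuid import uuid4
    from redis import Redis
    from gn3 import jobs

    with Redis.from_url("redis://localhost:6379/0", decode_responses=True) as redisconn:
        job_id = uuid4()
        # Jobs live under the GN3::JOBS namespace, e.g. GN3::JOBS::<job_id>.
        print(jobs.job_key(job_id))
        # Seed a fake job hash so the lookup below has something to find.
        redisconn.hset(jobs.job_key(job_id), mapping={
            "job_id": str(job_id), "status": "queued", "search_results": "[]"})
        # jobs.job() returns Right(job-details) when the hash exists, with the
        # "search_results" field parsed from JSON, or Left(error-details) otherwise.
        found = jobs.job(redisconn, job_id).either(lambda err: err, lambda det: det)
        missing = jobs.job(redisconn, uuid4()).either(lambda err: err, lambda det: det)
        print(found["status"], found["search_results"])  # queued []
        print(missing["error"])                          # NotFound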
diff --git a/scripts/search_phenotypes.py b/scripts/search_phenotypes.py
index cdd1d96..4533cce 100644
--- a/scripts/search_phenotypes.py
+++ b/scripts/search_phenotypes.py
@@ -12,6 +12,7 @@ import click
 import redis
 import requests
 
+from gn3 import jobs
 from gn3.auth import db as authdb
 from gn3 import db_utils as gn3db
 from gn3.settings import SQL_URI, AUTH_DB
@@ -43,7 +44,8 @@ def remove_linked(search_results, linked: tuple):
     """Remove any item that has been already linked to a user group."""
     return (item for item in search_results if __filter_object__(item) not in linked)
 
-def save_to_redis(redisconn: redis.Redis, redisname: str, results: tuple[dict[str, Any], ...]):
+def update_search_results(redisconn: redis.Redis, redisname: str,
+                          results: tuple[dict[str, Any], ...]):
     """Save the results to redis db."""
     key = "search_results"
     prev_results = tuple(json.loads(redisconn.hget(redisname, key) or "[]"))
@@ -77,12 +79,12 @@ def search(# pylint: disable=[too-many-arguments, too-many-locals]
     loading more and more pages until the `per_page` quota is fulfilled or the
     search runs out of pages.
     """
-    redisname = f"GN3:JOBS:{job_id}"
+    redisname = jobs.job_key(job_id)
     with (authdb.connection(auth_db_uri) as authconn,
           gn3db.database_connection(gn3_db_uri) as gn3conn,
           redis.Redis.from_url(redis_uri, decode_responses=True) as redisconn):
         redisconn.hset(redisname, "status", "started")
-        save_to_redis(redisconn, redisname, tuple())
+        update_search_results(redisconn, redisname, tuple()) # init search results
        try:
             search_query = f"species:{species}" + (
                 f" AND ({query})" if bool(query) else "")
@@ -103,7 +105,7 @@ def search(# pylint: disable=[too-many-arguments, too-many-locals]
                         linked))[0:per_page-count]
             count = count + len(results)
             page = page + 1
-            save_to_redis(redisconn, redisname, results)
+            update_search_results(redisconn, redisname, results)
         except NoSearchResults as _nsr:
             pass
         except Exception as _exc: # pylint: disable=[broad-except]
diff --git a/sheepdog/worker.py b/sheepdog/worker.py
index 6557ab3..c08edec 100644
--- a/sheepdog/worker.py
+++ b/sheepdog/worker.py
@@ -52,6 +52,7 @@ def run_jobs(conn, queue_name: str = "GN3::job-queue"):
             update_status(conn, cmd_id, "success")
         else:
             update_status(conn, cmd_id, "error")
+            conn.hset(cmd_id, "stderr", result.get("output"))
         return cmd_id
     return None
 
--
cgit v1.2.3
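Taken together, the pieces above work as follows: the "/search" endpoint queues the command on the REDIS_JOB_QUEUE list, the sheepdog worker pops and runs it (now also recording stderr when the command fails), and scripts.search_phenotypes reports progress into the GN3::JOBS::<job_id> hash that the results endpoint reads back. For debugging, the same command can be run by hand. In the sketch below the positional arguments and option names come from the command built in views.py; every URI and the species/query values are placeholders for a concrete deployment.

    # Sketch of a by-hand run of the search script, then a check of the job
    # hash it updates. All URIs and the species/query values are placeholders.
    import sys
    import json
    from uuid import uuid4
    from subprocess import run

    from redis import Redis
    from gn3 import jobs

    job_id = uuid4()
    redis_uri = "redis://localhost:6379/0"                      # placeholder
    run([sys.executable, "-m", "scripts.search_phenotypes",
         "mouse", "brain", str(job_id),                         # species, query, job id
         "--host=https://gn3.example.org",                      # placeholder GN3 server
         "--auth-db-uri=/var/lib/gn3/auth.db",                  # placeholder
         "--gn3-db-uri=mysql://user:pass@localhost/db_webqtl",  # placeholder
         f"--redis-uri={redis_uri}"],
        check=True)

    with Redis.from_url(redis_uri, decode_responses=True) as redisconn:
        the_job = redisconn.hgetall(jobs.job_key(job_id))
        results = json.loads(the_job.get("search_results", "[]"))
        print(the_job.get("status"), f"{len(results)} results")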