diff options
-rw-r--r-- | gn_auth/commands.py | 2 | ||||
-rw-r--r-- | gn_auth/jobs.py | 2 | ||||
-rw-r--r-- | gn_auth/settings.py | 1 | ||||
-rw-r--r-- | scripts/search_phenotypes.py | 125 | ||||
-rw-r--r-- | scripts/worker.py | 83 |
5 files changed, 211 insertions, 2 deletions
diff --git a/gn_auth/commands.py b/gn_auth/commands.py index 02bed10..cf79041 100644 --- a/gn_auth/commands.py +++ b/gn_auth/commands.py @@ -64,5 +64,5 @@ def run_async_cmd( """A utility function to call `gn3.commands.queue_cmd` function and run the worker in the `one-shot` mode.""" cmd_id = queue_cmd(conn, job_queue, cmd, email, env) - subprocess.Popen([f"{sys.executable}", "-m", "sheepdog.worker"]) # pylint: disable=[consider-using-with] + subprocess.Popen([f"{sys.executable}", "-m", "scripts.worker", job_queue]) # pylint: disable=[consider-using-with] return cmd_id diff --git a/gn_auth/jobs.py b/gn_auth/jobs.py index 6fc9bba..8f9f4f0 100644 --- a/gn_auth/jobs.py +++ b/gn_auth/jobs.py @@ -9,7 +9,7 @@ from pymonad.either import Left, Right, Either from gn_auth import json_encoders_decoders as jed -JOBS_NAMESPACE = "GN3_AUTH::JOBS" +JOBS_NAMESPACE = "GN_AUTH::JOBS" class InvalidCommand(Exception): """Raise if the command to run is invalid.""" diff --git a/gn_auth/settings.py b/gn_auth/settings.py index 394c557..a60ab7e 100644 --- a/gn_auth/settings.py +++ b/gn_auth/settings.py @@ -14,6 +14,7 @@ AUTH_MIGRATIONS = "migrations/auth" # Redis settings REDIS_URI = "redis://localhost:6379/0" +REDIS_JOB_QUEUE = "GN_AUTH::job-queue" # OAuth2 settings OAUTH2_SCOPE = ( diff --git a/scripts/search_phenotypes.py b/scripts/search_phenotypes.py new file mode 100644 index 0000000..20d91c9 --- /dev/null +++ b/scripts/search_phenotypes.py @@ -0,0 +1,125 @@ +""" +A script to do search for phenotype traits using the Xapian Search endpoint. +""" +import uuid +import json +import traceback +from urllib.parse import urljoin +from typing import Any, Iterable +from datetime import datetime, timedelta + +import click +import redis +import requests + +from gn_auth import jobs +from gn_auth.auth.db import mariadb as gn3db +from gn_auth.auth.db import sqlite3 as authdb +from gn_auth.settings import SQL_URI, AUTH_DB +from gn_auth.auth.authorisation.data.phenotypes import linked_phenotype_data + +class NoSearchResults(Exception): + """Raise when there are no results for a search.""" + +def do_search( + host: str, query: str, per_page: int, page: int = 1) -> Iterable[dict[str, Any]]: + """Do the search and return the results""" + search_uri = urljoin(host, (f"search/?page={page}&per_page={per_page}" + f"&type=phenotype&query={query}")) + response = requests.get(search_uri) + results = response.json() + if len(results) > 0: + return (item for item in results) + raise NoSearchResults(f"No results for search '{query}'") + +def __filter_object__(search_item): + return (search_item["species"], search_item["group"], + search_item["dataset"], search_item["name"]) + +def remove_selected(search_results, selected: tuple): + """Remove any item that the user has selected.""" + return (item for item in search_results if __filter_object__(item) not in selected) + +def remove_linked(search_results, linked: tuple): + """Remove any item that has been already linked to a user group.""" + return (item for item in search_results if __filter_object__(item) not in linked) + +def update_status(redisconn: redis.Redis, redisname, status: str): + """Update the status of the search.""" + redisconn.hset(redisname, "status", json.dumps(status)) + +def update_search_results(redisconn: redis.Redis, redisname: str, + results: tuple[dict[str, Any], ...]): + """Save the results to redis db.""" + key = "search_results" + prev_results = tuple(json.loads(redisconn.hget(redisname, key) or "[]")) + redisconn.hset(redisname, key, json.dumps(prev_results + results)) + +def expire_redis_results(redisconn: redis.Redis, redisname: str): + """Expire the results after a while to ensure they are cleaned up.""" + redisconn.expireat(redisname, datetime.now() + timedelta(minutes=30)) + +@click.command() +@click.argument("species") +@click.argument("query") +@click.argument("job-id", type=click.UUID) +@click.option( + "--host", default="http://localhost:8080/api/", help="The URI to GN3.") +@click.option("--per-page", default=10000, help="Number of results per page.") +@click.option("--selected", default="[]", help="Selected traits.") +@click.option( + "--auth-db-uri", default=AUTH_DB, help="The SQL URI to the auth database.") +@click.option( + "--gn3-db-uri", default=SQL_URI, + help="The SQL URI to the main GN3 database.") +@click.option( + "--redis-uri", default="redis://:@localhost:6379/0", + help="The URI to the redis server.") +def search(# pylint: disable=[too-many-arguments, too-many-locals] + species: str, query: str, job_id: uuid.UUID, host: str, per_page: int, + selected: str, auth_db_uri: str, gn3_db_uri: str, redis_uri: str): + """ + Search for phenotype traits, filtering out any linked and selected traits, + loading more and more pages until the `per_page` quota is fulfilled or the + search runs out of pages. + """ + redisname = jobs.job_key(job_id) + with (authdb.connection(auth_db_uri) as authconn, + gn3db.database_connection(gn3_db_uri) as gn3conn, + redis.Redis.from_url(redis_uri, decode_responses=True) as redisconn): + update_status(redisconn, redisname, "started") + update_search_results(redisconn, redisname, tuple()) # init search results + try: + search_query = f"species:{species}" + ( + f" AND ({query})" if bool(query) else "") + selected_traits = tuple( + (item["species"], item["group"], item["dataset"], item["name"]) + for item in json.loads(selected)) + linked = tuple( + (row["SpeciesName"], row["InbredSetName"], row["dataset_name"], + str(row["PublishXRefId"])) + for row in linked_phenotype_data(authconn, gn3conn, species)) + page = 1 + count = 0 + while count < per_page: + results = tuple(remove_linked( + remove_selected( + do_search(host, search_query, per_page, page), + selected_traits), + linked))[0:per_page-count] + count = count + len(results) + page = page + 1 + update_search_results(redisconn, redisname, results) + except NoSearchResults as _nsr: + pass + except Exception as _exc: # pylint: disable=[broad-except] + update_status(redisconn, redisname, "failed") + redisconn.hset(redisname, "exception", json.dumps(traceback.format_exc())) + expire_redis_results(redisconn, redisname) + return 1 + update_status(redisconn, redisname, "completed") + expire_redis_results(redisconn, redisname) + return 0 + +if __name__ == "__main__": + search() # pylint: disable=[no-value-for-parameter] diff --git a/scripts/worker.py b/scripts/worker.py new file mode 100644 index 0000000..0a77d41 --- /dev/null +++ b/scripts/worker.py @@ -0,0 +1,83 @@ +"""Daemon that processes commands""" +import os +import sys +import time +import argparse + +import redis +import redis.connection + +from gn_auth.commands import run_cmd + +# Enable importing from one dir up: put as first to override any other globally +# accessible GN3 +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +def update_status(conn, cmd_id, status): + """Helper to update command status""" + conn.hset(name=f"{cmd_id}", key="status", value=f"{status}") + +def make_incremental_backoff(init_val: float=0.1, maximum: int=420): + """ + Returns a closure that can be used to increment the returned value up to + `maximum` or reset it to `init_val`. + """ + current = init_val + + def __increment_or_reset__(command: str, value: float=0.1): + nonlocal current + if command == "reset": + current = init_val + return current + + if command == "increment": + current = min(current + abs(value), maximum) + return current + + return current + + return __increment_or_reset__ + +def run_jobs(conn, queue_name: str): + """Process the redis using a redis connection, CONN""" + # pylint: disable=E0401, C0415 + cmd_id = (conn.lpop(queue_name) or b'').decode("utf-8") + if bool(cmd_id): + cmd = conn.hget(name=cmd_id, key="cmd") + if cmd and (conn.hget(cmd_id, "status") == b"queued"): + update_status(conn, cmd_id, "running") + result = run_cmd( + cmd.decode("utf-8"), env=conn.hget(name=cmd_id, key="env")) + conn.hset(name=cmd_id, key="result", value=result.get("output")) + if result.get("code") == 0: # Success + update_status(conn, cmd_id, "success") + else: + update_status(conn, cmd_id, "error") + conn.hset(cmd_id, "stderr", result.get("output")) + return cmd_id + return None + +def parse_cli_arguments(): + """Parse the command-line arguments.""" + parser = argparse.ArgumentParser( + description="Run asynchronous (service) commands.") + parser.add_argument("queue_name", help="Queue to check in redis") + parser.add_argument( + "--daemon", default=False, action="store_true", + help=( + "Run process as a daemon instead of the default 'one-shot' " + "process")) + return parser.parse_args() + +if __name__ == "__main__": + args = parse_cli_arguments() + with redis.Redis() as redis_conn: + if not args.daemon: + run_jobs(redis_conn, args.queue_name) + else: + sleep_time = make_incremental_backoff() + while True: # Daemon that keeps running forever: + if run_jobs(redis_conn, args.queue_name): + time.sleep(sleep_time("reset")) + continue + time.sleep(sleep_time("increment", sleep_time("return_current"))) |