author     Frederick Muriuki Muriithi  2023-12-05 16:51:47 +0300
committer  Frederick Muriuki Muriithi  2023-12-05 17:24:44 +0300
commit     bc78ac2cf926f38ef88309dd1b4288b1b1230b66 (patch)
tree       8cfec39c7740b65d7e02460eaf5b7643d1c1ae2c
parent     d59c3c49b2fcb60550be68f241f2526895512e94 (diff)
download   gn-auth-bc78ac2cf926f38ef88309dd1b4288b1b1230b66.tar.gz
Add missing scripts and update code to invoke them
Copy the missing scripts over from GN3 and update them for
gn-auth. Update the code to invoke the scripts correctly. Set up
correct redis keys for use with the scripts.
-rw-r--r--  gn_auth/commands.py             2
-rw-r--r--  gn_auth/jobs.py                 2
-rw-r--r--  gn_auth/settings.py             1
-rw-r--r--  scripts/search_phenotypes.py  125
-rw-r--r--  scripts/worker.py              83
5 files changed, 211 insertions(+), 2 deletions(-)
diff --git a/gn_auth/commands.py b/gn_auth/commands.py
index 02bed10..cf79041 100644
--- a/gn_auth/commands.py
+++ b/gn_auth/commands.py
@@ -64,5 +64,5 @@ def run_async_cmd(
     """A utility function to call `gn3.commands.queue_cmd` function and run the
     worker in the `one-shot` mode."""
     cmd_id = queue_cmd(conn, job_queue, cmd, email, env)
-    subprocess.Popen([f"{sys.executable}", "-m", "sheepdog.worker"]) # pylint: disable=[consider-using-with]
+    subprocess.Popen([f"{sys.executable}", "-m", "scripts.worker", job_queue]) # pylint: disable=[consider-using-with]
     return cmd_id
diff --git a/gn_auth/jobs.py b/gn_auth/jobs.py
index 6fc9bba..8f9f4f0 100644
--- a/gn_auth/jobs.py
+++ b/gn_auth/jobs.py
@@ -9,7 +9,7 @@ from pymonad.either import Left, Right, Either
 
 from gn_auth import json_encoders_decoders as jed
 
-JOBS_NAMESPACE = "GN3_AUTH::JOBS"
+JOBS_NAMESPACE = "GN_AUTH::JOBS"
 
 class InvalidCommand(Exception):
     """Raise if the command to run is invalid."""
diff --git a/gn_auth/settings.py b/gn_auth/settings.py
index 394c557..a60ab7e 100644
--- a/gn_auth/settings.py
+++ b/gn_auth/settings.py
@@ -14,6 +14,7 @@ AUTH_MIGRATIONS = "migrations/auth"
 
 # Redis settings
 REDIS_URI = "redis://localhost:6379/0"
+REDIS_JOB_QUEUE = "GN_AUTH::job-queue"
 
 # OAuth2 settings
 OAUTH2_SCOPE = (
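
Taken together, the three hunks above wire the new setting into the
job-spawning path: run_async_cmd() queues the command under the given queue
name, then launches scripts.worker against that same queue in one-shot mode.
A minimal caller sketch (not part of this commit; the parameter names after
cmd are assumed from the hunk above, and the command string and email are
hypothetical):

    import redis

    from gn_auth.commands import run_async_cmd
    from gn_auth.settings import REDIS_JOB_QUEUE  # "GN_AUTH::job-queue"

    with redis.Redis() as conn:
        # Queues the command, then spawns
        # `python -m scripts.worker GN_AUTH::job-queue` as a one-shot worker.
        cmd_id = run_async_cmd(
            conn,
            REDIS_JOB_QUEUE,
            "echo hello",              # hypothetical command string
            email="user@example.com",  # hypothetical requester email
            env={})
        print("queued job:", cmd_id)
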
diff --git a/scripts/search_phenotypes.py b/scripts/search_phenotypes.py
new file mode 100644
index 0000000..20d91c9
--- /dev/null
+++ b/scripts/search_phenotypes.py
@@ -0,0 +1,125 @@
+"""
+A script to search for phenotype traits using the Xapian search endpoint.
+"""
+import uuid
+import json
+import traceback
+from urllib.parse import urljoin
+from typing import Any, Iterable
+from datetime import datetime, timedelta
+
+import click
+import redis
+import requests
+
+from gn_auth import jobs
+from gn_auth.auth.db import mariadb as gn3db
+from gn_auth.auth.db import sqlite3 as authdb
+from gn_auth.settings import SQL_URI, AUTH_DB
+from gn_auth.auth.authorisation.data.phenotypes import linked_phenotype_data
+
+class NoSearchResults(Exception):
+    """Raise when there are no results for a search."""
+
+def do_search(
+        host: str, query: str, per_page: int, page: int = 1) -> Iterable[dict[str, Any]]:
+    """Do the search and return the results"""
+    search_uri = urljoin(host, (f"search/?page={page}&per_page={per_page}"
+                                f"&type=phenotype&query={query}"))
+    response = requests.get(search_uri, timeout=300) # avoid hanging on a stalled request
+    results = response.json()
+    if len(results) > 0:
+        return (item for item in results)
+    raise NoSearchResults(f"No results for search '{query}'")
+
+def __filter_object__(search_item):
+    return (search_item["species"], search_item["group"],
+            search_item["dataset"], search_item["name"])
+
+def remove_selected(search_results, selected: tuple):
+    """Remove any item that the user has selected."""
+    return (item for item in search_results if __filter_object__(item) not in selected)
+
+def remove_linked(search_results, linked: tuple):
+    """Remove any item that has been already linked to a user group."""
+    return (item for item in search_results if __filter_object__(item) not in linked)
+
+def update_status(redisconn: redis.Redis, redisname: str, status: str):
+    """Update the status of the search."""
+    redisconn.hset(redisname, "status", json.dumps(status))
+
+def update_search_results(redisconn: redis.Redis, redisname: str,
+                          results: tuple[dict[str, Any], ...]):
+    """Save the results to redis db."""
+    key = "search_results"
+    prev_results = tuple(json.loads(redisconn.hget(redisname, key) or "[]"))
+    redisconn.hset(redisname, key, json.dumps(prev_results + results))
+
+def expire_redis_results(redisconn: redis.Redis, redisname: str):
+    """Expire the results after a while to ensure they are cleaned up."""
+    redisconn.expireat(redisname, datetime.now() + timedelta(minutes=30))
+
+@click.command()
+@click.argument("species")
+@click.argument("query")
+@click.argument("job-id", type=click.UUID)
+@click.option(
+    "--host", default="http://localhost:8080/api/", help="The URI to GN3.")
+@click.option("--per-page", default=10000, help="Number of results per page.")
+@click.option("--selected", default="[]", help="Selected traits.")
+@click.option(
+    "--auth-db-uri", default=AUTH_DB, help="The SQL URI to the auth database.")
+@click.option(
+    "--gn3-db-uri", default=SQL_URI,
+    help="The SQL URI to the main GN3 database.")
+@click.option(
+    "--redis-uri", default="redis://:@localhost:6379/0",
+    help="The URI to the redis server.")
+def search(# pylint: disable=[too-many-arguments, too-many-locals]
+        species: str, query: str, job_id: uuid.UUID, host: str, per_page: int,
+        selected: str, auth_db_uri: str, gn3_db_uri: str, redis_uri: str):
+    """
+    Search for phenotype traits, filtering out any linked and selected traits,
+    loading more and more pages until the `per_page` quota is fulfilled or the
+    search runs out of pages.
+    """
+    redisname = jobs.job_key(job_id)
+    with (authdb.connection(auth_db_uri) as authconn,
+          gn3db.database_connection(gn3_db_uri) as gn3conn,
+          redis.Redis.from_url(redis_uri, decode_responses=True) as redisconn):
+        update_status(redisconn, redisname, "started")
+        update_search_results(redisconn, redisname, tuple()) # init search results
+        try:
+            search_query = f"species:{species}" + (
+                f" AND ({query})" if bool(query) else "")
+            selected_traits = tuple(
+                (item["species"], item["group"], item["dataset"], item["name"])
+                for item in json.loads(selected))
+            linked = tuple(
+                (row["SpeciesName"], row["InbredSetName"], row["dataset_name"],
+                 str(row["PublishXRefId"]))
+                for row in linked_phenotype_data(authconn, gn3conn, species))
+            page = 1
+            count = 0
+            while count < per_page:
+                results = tuple(remove_linked(
+                    remove_selected(
+                        do_search(host, search_query, per_page, page),
+                        selected_traits),
+                    linked))[0:per_page-count]
+                count = count + len(results)
+                page = page + 1
+                update_search_results(redisconn, redisname, results)
+        except NoSearchResults as _nsr:
+            pass
+        except Exception as _exc: # pylint: disable=[broad-except]
+            update_status(redisconn, redisname, "failed")
+            redisconn.hset(redisname, "exception", json.dumps(traceback.format_exc()))
+            expire_redis_results(redisconn, redisname)
+            return 1
+        update_status(redisconn, redisname, "completed")
+        expire_redis_results(redisconn, redisname)
+        return 0
+
+if __name__ == "__main__":
+    search() # pylint: disable=[no-value-for-parameter]
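
The script reports progress through a redis hash named by
jobs.job_key(job_id), JSON-encoding the "status" and "search_results" fields
and expiring the hash after 30 minutes. A hedged polling sketch for a client
(not part of this commit; the job id below is a placeholder for one created
when the search was queued):

    import json
    import uuid

    import redis

    from gn_auth import jobs

    JOB_ID = uuid.UUID("00000000-0000-0000-0000-000000000000")  # placeholder

    with redis.Redis.from_url(
            "redis://:@localhost:6379/0", decode_responses=True) as rconn:
        redisname = jobs.job_key(JOB_ID)
        # Both fields are written with json.dumps by the script above.
        status = json.loads(rconn.hget(redisname, "status") or '"unknown"')
        results = json.loads(rconn.hget(redisname, "search_results") or "[]")
        print(f"search {status}: {len(results)} result(s) so far")
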
diff --git a/scripts/worker.py b/scripts/worker.py
new file mode 100644
index 0000000..0a77d41
--- /dev/null
+++ b/scripts/worker.py
@@ -0,0 +1,83 @@
+"""Daemon that processes commands"""
+import os
+import sys
+import time
+import argparse
+
+import redis
+import redis.connection
+
+# Enable importing from one dir up: put first so the local package overrides
+# any other globally accessible GN3
+sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
+
+from gn_auth.commands import run_cmd # pylint: disable=[wrong-import-position]
+
+def update_status(conn, cmd_id, status):
+    """Helper to update command status"""
+    conn.hset(name=f"{cmd_id}", key="status", value=f"{status}")
+
+def make_incremental_backoff(init_val: float = 0.1, maximum: float = 420):
+    """
+    Returns a closure that can be used to increment the returned value up to
+    `maximum` or reset it to `init_val`.
+    """
+    current = init_val
+
+    def __increment_or_reset__(command: str, value: float = 0.1):
+        nonlocal current
+        if command == "reset":
+            current = init_val
+            return current
+
+        if command == "increment":
+            current = min(current + abs(value), maximum)
+            return current
+
+        return current
+
+    return __increment_or_reset__
+
+def run_jobs(conn, queue_name: str):
+    """Process the redis using a redis connection, CONN"""
+    # pylint: disable=E0401, C0415
+    cmd_id = (conn.lpop(queue_name) or b'').decode("utf-8")
+    if bool(cmd_id):
+        cmd = conn.hget(name=cmd_id, key="cmd")
+        if cmd and (conn.hget(cmd_id, "status") == b"queued"):
+            update_status(conn, cmd_id, "running")
+            result = run_cmd(
+                cmd.decode("utf-8"), env=conn.hget(name=cmd_id, key="env"))
+            conn.hset(name=cmd_id, key="result", value=result.get("output"))
+            if result.get("code") == 0:  # Success
+                update_status(conn, cmd_id, "success")
+            else:
+                update_status(conn, cmd_id, "error")
+                conn.hset(cmd_id, "stderr", result.get("output"))
+        return cmd_id
+    return None
+
+def parse_cli_arguments():
+    """Parse the command-line arguments."""
+    parser = argparse.ArgumentParser(
+        description="Run asynchronous (service) commands.")
+    parser.add_argument("queue_name", help="Queue to check in redis")
+    parser.add_argument(
+        "--daemon", default=False, action="store_true",
+        help=(
+            "Run process as a daemon instead of the default 'one-shot' "
+            "process"))
+    return parser.parse_args()
+
+if __name__ == "__main__":
+    args = parse_cli_arguments()
+    with redis.Redis() as redis_conn:
+        if not args.daemon:
+            run_jobs(redis_conn, args.queue_name)
+        else:
+            sleep_time = make_incremental_backoff()
+            while True:  # Daemon that keeps running forever:
+                if run_jobs(redis_conn, args.queue_name):
+                    time.sleep(sleep_time("reset"))
+                    continue
+                time.sleep(sleep_time("increment", sleep_time("return_current")))