about summary refs log tree commit diff
path: root/gn2/scripts
diff options
context:
space:
mode:
authorArun Isaac2023-12-29 18:55:37 +0000
committerArun Isaac2023-12-29 19:01:46 +0000
commit204a308be0f741726b9a620d88fbc22b22124c81 (patch)
treeb3cf66906674020b530c844c2bb4982c8a0e2d39 /gn2/scripts
parent83062c75442160427b50420161bfcae2c5c34c84 (diff)
downloadgenenetwork2-204a308be0f741726b9a620d88fbc22b22124c81.tar.gz
Namespace all modules under gn2.
We move all modules under a gn2 directory. This is important for
"correct" packaging and deployment as a Guix service.
Diffstat (limited to 'gn2/scripts')
-rw-r--r--gn2/scripts/__init__.py0
-rw-r--r--gn2/scripts/corr_compute.py77
-rw-r--r--gn2/scripts/parse_corr_html_results_to_csv.py175
-rw-r--r--gn2/scripts/profile_corrs.py67
-rw-r--r--gn2/scripts/run_external.py159
5 files changed, 478 insertions, 0 deletions
diff --git a/gn2/scripts/__init__.py b/gn2/scripts/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/gn2/scripts/__init__.py
diff --git a/gn2/scripts/corr_compute.py b/gn2/scripts/corr_compute.py
new file mode 100644
index 00000000..44dcac68
--- /dev/null
+++ b/gn2/scripts/corr_compute.py
@@ -0,0 +1,77 @@
+"""Compute the correlations."""
+
+import sys
+import json
+import pickle
+import pathlib
+import datetime
+
+from flask import g
+
+from gn2.wqflask import app
+from gn2.wqflask.user_session import UserSession
+from gn2.wqflask.correlation.show_corr_results import set_template_vars
+from gn2.wqflask.correlation.correlation_gn3_api import compute_correlation
+
+class UserSessionSimulator():
+
+    def __init__(self, user_id):
+        self._user_id = user_id
+
+    @property
+    def user_id(self):
+        return self._user_id
+
+error_types = {
+    "WrongCorrelationType": "Wrong Correlation Type",
+    "CalledProcessError": "Called Process Error"
+}
+
+def e_time():
+    return datetime.datetime.utcnow().isoformat()
+
+def compute(form):
+    import subprocess
+    try:
+        correlation_results = compute_correlation(form, compute_all=True)
+    except Exception as exc:
+        return {
+            "error-type": error_types[type(exc).__name__],
+            "error-message": exc.args[0]
+        }
+
+    return set_template_vars(form, correlation_results)
+
+if __name__ == "__main__":
+    ARGS_COUNT = 3
+    if len(sys.argv) < ARGS_COUNT:
+        print(f"{e_time()}: You need to pass the file with the picked form",
+              file=sys.stderr)
+        sys.exit(1)
+
+    if len(sys.argv) > ARGS_COUNT:
+        print(f"{e_time()}: Unknown arguments {sys.argv[ARGS_COUNT:]}",
+              file=sys.stderr)
+        sys.exit(1)
+
+    filepath = pathlib.Path(sys.argv[1])
+    if not filepath.exists():
+        print(f"File not found '{filepath}'", file=sys.stderr)
+        sys.exit(2)
+
+    with open(filepath, "rb") as pfile:
+        form = pickle.Unpickler(pfile).load()
+
+    with app.app_context():
+        g.user_session = UserSessionSimulator(sys.argv[2])
+        results = compute(form)
+
+    print(json.dumps(results), file=sys.stdout)
+
+    if "error-type" in results:
+        print(
+            f"{results['error-type']}: {results['error-message']}",
+            file=sys.stderr)
+        sys.exit(3)
+
+    sys.exit(0)
diff --git a/gn2/scripts/parse_corr_html_results_to_csv.py b/gn2/scripts/parse_corr_html_results_to_csv.py
new file mode 100644
index 00000000..464eb2ca
--- /dev/null
+++ b/gn2/scripts/parse_corr_html_results_to_csv.py
@@ -0,0 +1,175 @@
+import csv
+import sys
+import json
+from pathlib import Path
+from functools import reduce, partial
+from typing import Any, Union, Sequence, Optional
+from argparse import Action, Namespace, ArgumentError, ArgumentParser
+
+from lxml import etree
+
+def thread(value, *functions):
+    return reduce(lambda result, func: func(result), functions, value)
+
+def parse_file(filename: Path):
+    with open(filename, encoding="utf-8") as inpfl:
+        raw_html = inpfl.read()
+
+    return etree.HTML(raw_html)
+
+def first_row_headers(table):
+    return tuple(
+        " ".join(text.strip() for text in cell.xpath(".//child::text()"))
+        for cell in table.xpath("./tbody/tr[1]/td"))
+
+def results_table(tables):
+    found = tuple(filter(
+        lambda table: (
+            first_row_headers(table)[0:4] ==
+            ("Index", "Record ID", "Symbol", "Description")),
+        tables))
+    return found[0]
+
+def table_contents(table):
+    return tuple(
+        tuple(" ".join(text.strip() for text in cell.xpath(".//child::text()"))
+            for cell in row)
+        for row in table.xpath("./tbody/tr"))
+
+
+def to_dicts(contents):
+    frow = contents[0]
+    return tuple(dict(zip(frow, row)) for row in contents[1:])
+
+def write_csv(
+        input_file: Path, output_dir: Union[bool, Path],
+        contents: Sequence[dict]) -> Sequence[Sequence[str]]:
+    def __write__(stream):
+        writer = csv.DictWriter(
+            stream, fieldnames=list(contents[0].keys()),
+            dialect=csv.unix_dialect)
+        writer.writeheader()
+        writer.writerows(contents)
+
+    if not bool(output_dir):
+        return __write__(sys.stdout)
+
+    output_file = output_dir.joinpath(
+        f"{input_file.stem}__results.csv")
+    with open(output_file, "w", encoding="utf-8") as out_file:
+        return __write__(out_file)
+
+def output_stream():
+    if not to_output_file:
+        return sys.stdout
+
+    output_file = input_file.parent.joinpath(
+        f"{input_file.stem}.csv")
+    with open(output_file) as out_file:
+        yield out_file
+
+class FileCheck(Action):
+    """Action class to check existence of a given file path."""
+
+    def __init__(self, option_strings, dest, **kwargs):
+        "Initialise the FileCheck action class"
+        super().__init__(option_strings, dest, **kwargs)
+
+    def __call__(# pylint: disable=[signature-differs]
+            self, parser: ArgumentParser, namespace: Namespace,
+            values: Union[str, Sequence[Any], None],
+            option_string: Optional[str] = "") -> None:
+        """Check existence of a given file path and set it, or raise an
+        exception."""
+        the_path = str(values or "")
+        the_file = Path(the_path).absolute().resolve()
+        if not the_file.is_file():
+            raise ArgumentError(
+                self,
+                f"The file '{values}' does not exist or is a folder/directory.")
+
+        setattr(namespace, self.dest, the_file)
+
+class DirectoryCheck(Action):
+    """Action class to check the existence of a particular directory"""
+    def __init__(self, option_strings, dest, **kwargs):
+        """Init `DirectoryCheck` action object."""
+        super().__init__(option_strings, dest, **kwargs)
+
+    def __call__(
+            self, parser: ArgumentParser, namespace: Namespace,
+            values: Union[str, Sequence[Any], None],
+            option_string: Optional[str] = "") -> None:
+        the_dir = Path(str(values or "")).absolute().resolve()
+        if not the_dir.is_dir():
+            raise ArgumentError(
+                self, f"The directory '{the_dir}' does not exist!")
+
+        setattr(namespace, self.dest, the_dir)
+
+def gn1_parser(subparsers) -> None:
+    parser = subparsers.add_parser("gn1")
+    parser.add_argument(
+        "inputfile", help="The HTML file to parse", action=FileCheck)
+    parser.add_argument(
+        "--outputdir", help="Path to output directory", action=DirectoryCheck,
+        default=False)
+    parser.set_defaults(
+        func=lambda args: thread(
+            args.inputfile,
+            parse_file,
+            lambda tree: tree.xpath("//table"),
+            results_table,
+            table_contents,
+            to_dicts,
+            partial(write_csv, args.inputfile, args.outputdir)))
+
+def tablejson_script(scripts):
+    for script in scripts:
+        script_content = thread(
+            script.xpath('.//child::text()'),
+            lambda val: "".join(val).strip())
+        if script_content.find("var tableJson") >= 0:
+            return json.loads(thread(
+                script_content,
+                lambda val: val[len("var tableJson = "):].strip()))
+        continue
+    return None
+
+def gn2_parser(subparsers) -> None:
+    parser = subparsers.add_parser("gn2")
+    parser.add_argument(
+        "inputfile", help="The HTML file to parse", action=FileCheck)
+    parser.add_argument(
+        "--outputdir", help="Path to output directory", action=DirectoryCheck,
+        default=False)
+    parser.set_defaults(
+        func=lambda args: thread(
+            args.inputfile,
+            parse_file,
+            lambda tree: tree.xpath("//script"),
+            tablejson_script,
+            partial(write_csv, args.inputfile, args.outputdir)
+        ))
+
+def parse_cli_args():
+    parser = ArgumentParser(
+        "parse_corr_html_results_to_csv",
+        description = "Parse correlation results from the given HTML file.")
+    subparsers = parser.add_subparsers(
+        title="subcommands", description="Valid subcommands",
+        help="additional help")
+    gn1_parser(subparsers)
+    gn2_parser(subparsers)
+
+    return parser, parser.parse_args()
+
+def run():
+    parser, args = parse_cli_args()
+    try:
+        args.func(args)
+    except AttributeError as _attr_err:
+        parser.print_help()
+
+if __name__ == "__main__":
+    run()
diff --git a/gn2/scripts/profile_corrs.py b/gn2/scripts/profile_corrs.py
new file mode 100644
index 00000000..5f9a06da
--- /dev/null
+++ b/gn2/scripts/profile_corrs.py
@@ -0,0 +1,67 @@
+import io
+import sys
+import pstats
+import cProfile
+
+from flask import g, request
+
+from gn2.utility.startup_config import app_config
+
+from gn2.wqflask import app
+from gn2.wqflask.user_session import UserSession
+from gn2.wqflask.correlation.correlation_gn3_api import compute_correlation
+
+def sample_vals():
+    return '{"C57BL/6J":"10.835","DBA/2J":"11.142","B6D2F1":"11.126","D2B6F1":"11.143","BXD1":"10.811","BXD2":"11.503","BXD5":"10.766","BXD6":"10.986","BXD8":"11.050","BXD9":"10.822","BXD11":"10.670","BXD12":"10.946","BXD13":"10.890","BXD14":"x","BXD15":"10.884","BXD16":"11.222","BXD18":"x","BXD19":"10.968","BXD20":"10.962","BXD21":"10.906","BXD22":"11.080","BXD23":"11.046","BXD24":"11.146","BXD24a":"x","BXD25":"x","BXD27":"11.078","BXD28":"11.034","BXD29":"10.808","BXD30":"x","BXD31":"11.087","BXD32":"11.029","BXD33":"10.662","BXD34":"11.482","BXD35":"x","BXD36":"x","BXD37":"x","BXD38":"10.836","BXD39":"10.926","BXD40":"10.638","BXD41":"x","BXD42":"10.974","BXD43":"10.828","BXD44":"10.900","BXD45":"11.358","BXD48":"11.042","BXD48a":"10.975","BXD49":"x","BXD50":"11.228","BXD51":"11.126","BXD52":"x","BXD53":"x","BXD54":"x","BXD55":"11.580","BXD56":"x","BXD59":"x","BXD60":"10.829","BXD61":"11.152","BXD62":"11.156","BXD63":"10.942","BXD64":"10.506","BXD65":"11.126","BXD65a":"11.272","BXD65b":"11.157","BXD66":"11.071","BXD67":"11.080","BXD68":"10.997","BXD69":"11.096","BXD70":"11.152","BXD71":"x","BXD72":"x","BXD73":"11.262","BXD73a":"11.444","BXD73b":"x","BXD74":"10.974","BXD75":"11.150","BXD76":"10.920","BXD77":"10.928","BXD78":"x","BXD79":"11.371","BXD81":"x","BXD83":"10.946","BXD84":"11.181","BXD85":"10.992","BXD86":"10.770","BXD87":"11.200","BXD88":"x","BXD89":"10.930","BXD90":"11.183","BXD91":"x","BXD93":"11.056","BXD94":"10.737","BXD95":"x","BXD98":"10.986","BXD99":"10.892","BXD100":"x","BXD101":"x","BXD102":"x","BXD104":"x","BXD105":"x","BXD106":"x","BXD107":"x","BXD108":"x","BXD109":"x","BXD110":"x","BXD111":"x","BXD112":"x","BXD113":"x","BXD114":"x","BXD115":"x","BXD116":"x","BXD117":"x","BXD119":"x","BXD120":"x","BXD121":"x","BXD122":"x","BXD123":"x","BXD124":"x","BXD125":"x","BXD126":"x","BXD127":"x","BXD128":"x","BXD128a":"x","BXD130":"x","BXD131":"x","BXD132":"x","BXD133":"x","BXD134":"x","BXD135":"x","BXD136":"x","BXD137":"x","BXD138":"x","BXD139":"x","BXD141":"x","BXD142":"x","BXD144":"x","BXD145":"x","BXD146":"x","BXD147":"x","BXD148":"x","BXD149":"x","BXD150":"x","BXD151":"x","BXD152":"x","BXD153":"x","BXD154":"x","BXD155":"x","BXD156":"x","BXD157":"x","BXD160":"x","BXD161":"x","BXD162":"x","BXD165":"x","BXD168":"x","BXD169":"x","BXD170":"x","BXD171":"x","BXD172":"x","BXD173":"x","BXD174":"x","BXD175":"x","BXD176":"x","BXD177":"x","BXD178":"x","BXD180":"x","BXD181":"x","BXD183":"x","BXD184":"x","BXD186":"x","BXD187":"x","BXD188":"x","BXD189":"x","BXD190":"x","BXD191":"x","BXD192":"x","BXD193":"x","BXD194":"x","BXD195":"x","BXD196":"x","BXD197":"x","BXD198":"x","BXD199":"x","BXD200":"x","BXD201":"x","BXD202":"x","BXD203":"x","BXD204":"x","BXD205":"x","BXD206":"x","BXD207":"x","BXD208":"x","BXD209":"x","BXD210":"x","BXD211":"x","BXD212":"x","BXD213":"x","BXD214":"x","BXD215":"x","BXD216":"x","BXD217":"x","BXD218":"x","BXD219":"x","BXD220":"x"}'
+
+def simulated_form(corr_type: str = "sample"):
+    assert corr_type in ("sample", "tissue", "lit")
+    return {
+            "dataset": "HC_M2_0606_P",
+            "trait_id": "1435464_at",
+            "corr_dataset": "HC_M2_0606_P",
+            "corr_sample_method": "pearson",
+            "corr_return_results": "100",
+            "corr_samples_group": "samples_primary",
+            "sample_vals": sample_vals(),
+            "corr_type": corr_type,
+            "corr_sample_method": "pearson",
+            "location_type": "gene",
+            "corr_return_results": "20000"
+        }
+
+def profile_corrs():
+    "Profile tho correlations"
+    profiler = cProfile.Profile()
+    profiler.enable()
+    correlation_results = compute_correlation(request.form, compute_all=True)
+    profiler.disable()
+    return profiler
+
+def dump_stats(profiler):
+    iostr = io.StringIO()
+    sort_by = pstats.SortKey.CUMULATIVE
+    ps = pstats.Stats(profiler, stream=iostr).sort_stats(sort_by)
+    ps.print_stats()
+
+    cli_args = sys.argv
+    if len(cli_args) > 1:
+        with open(cli_args[1], "w+", encoding="utf-8") as output_file:
+            print(iostr.getvalue(), file=output_file)
+
+        return 0
+
+    print(iostr.getvalue())
+    return 0
+
+
+if __name__ == "__main__":
+    def main():
+        "Entry point for profiler script"
+        return dump_stats(profile_corrs())
+
+    app_config()
+    with app.app_context():
+        with app.test_request_context("/corr_compute", data=simulated_form()):
+            g.user_session = UserSession()
+            main()
diff --git a/gn2/scripts/run_external.py b/gn2/scripts/run_external.py
new file mode 100644
index 00000000..297d17a1
--- /dev/null
+++ b/gn2/scripts/run_external.py
@@ -0,0 +1,159 @@
+"""
+Run jobs in external processes.
+"""
+
+import os
+import sys
+import shlex
+import argparse
+import traceback
+import subprocess
+from uuid import UUID
+from time import sleep
+from datetime import datetime
+from urllib.parse import urlparse
+from tempfile import TemporaryDirectory
+
+# import psutil
+from redis import Redis
+
+import gn2.jobs.jobs as jobs
+
+def print_help(args, parser):
+    print(parser.format_help())
+
+def UUID4(val):
+    return UUID(val)
+
+def redis_connection(parsed_url):
+    return Redis.from_url(
+        f"redis://{parsed_url.netloc}{parsed_url.path}", decode_responses=True)
+
+def update_status(redis_conn: Redis, job_id: UUID, value: str):
+    "Update the job's status."
+    redis_conn.hset(jobs.job_namespace(job_id), key="status", value=value)
+
+def __update_stdout_stderr__(
+        redis_conn: Redis, job_id: UUID, bytes_read: bytes, stream: str):
+    job = jobs.job(redis_conn, job_id)
+    if job.is_nothing():
+        raise jobs.NoSuchJob(job_id)
+
+    job = job.maybe({}, lambda x: x)
+    redis_conn.hset(
+        jobs.job_namespace(job_id), key=stream,
+        value=(job.get(stream, "") + bytes_read.decode("utf-8")))
+
+def set_stdout(redis_conn: Redis, job_id:UUID, bytes_read: bytes):
+    """Set the stdout value for the given job."""
+    job = jobs.job(redis_conn, job_id)
+    if job.is_nothing():
+        raise jobs.NoSuchJob(job_id)
+
+    job = job.maybe({}, lambda x: x)
+    redis_conn.hset(
+        jobs.job_namespace(job_id), key="stdout",
+        value=bytes_read.decode("utf-8"))
+
+def update_stdout(redis_conn: Redis, job_id:UUID, bytes_read: bytes):
+    """Update the stdout value for the given job."""
+    __update_stdout_stderr__(redis_conn, job_id, bytes_read, "stdout")
+
+def update_stderr(redis_conn: Redis, job_id:UUID, bytes_read: bytes):
+    """Update the stderr value for the given job."""
+    __update_stdout_stderr__(redis_conn, job_id, bytes_read, "stderr")
+
+def set_meta(redis_conn: Redis, job_id: UUID, meta_key: str, meta_val: str):
+    job = jobs.job(redis_conn, job_id)
+    if job.is_nothing():
+        raise jobs.NoSuchJob(job_id)
+
+    redis_conn.hset(jobs.job_namespace(job_id), key=meta_key, value=meta_val)
+
+def run_job(redis_conn: Redis, job_id: UUID):
+    """Run the job in an external process."""
+    print(f"THE ARGUMENTS TO RUN_JOB:\n\tConnection: {redis_conn}\n\tJob ID: {job_id}\n")
+
+    the_job = jobs.job(redis_conn, job_id)
+    if the_job.is_nothing():
+        raise jobs.NoSuchJob(job_id)
+
+    with TemporaryDirectory() as tmpdir:
+        stdout_file = f"{tmpdir}/{job_id}.stdout"
+        stderr_file = f"{tmpdir}/{job_id}.stderr"
+        with open(stdout_file, "w+b") as outfl, open(stderr_file, "w+b") as errfl:
+            with subprocess.Popen(
+                    jobs.command(the_job), stdout=outfl,
+                    stderr=errfl) as process:
+                while process.poll() is None:
+                    update_status(redis_conn, job_id, "running")
+                    update_stdout(redis_conn, job_id, outfl.read1())
+                    sleep(1)
+
+            update_status(redis_conn, job_id, "completed")
+            with open(stdout_file, "rb") as outfl, open(stderr_file, "rb") as errfl:
+                set_stdout(redis_conn, job_id, outfl.read())
+                update_stderr(redis_conn, job_id, errfl.read())
+
+            os.remove(stdout_file)
+            os.remove(stderr_file)
+
+    returncode = process.returncode
+    set_meta(redis_conn, job_id, "completion-status",
+             ("success" if returncode == 0 else "error"))
+    set_meta(redis_conn, job_id, "return-code", returncode)
+    return process.returncode
+
+def run_job_parser(parent_parser):
+    parser = parent_parser.add_parser(
+        "run-job",
+        help="run job with given id")
+    parser.add_argument(
+        "job_id", type=UUID4, help="A string representing a UUID4 value.")
+    parser.set_defaults(
+        run=lambda conn, args, parser: run_job(conn, args.job_id))
+
+def add_subparsers(parent_parser, *subparser_fns):
+    sub_parsers = parent_parser.add_subparsers(
+        title="subcommands", description="valid subcommands", required=True)
+    for parser_fn in subparser_fns:
+        parser_fn(sub_parsers)
+        pass
+
+    return parent_parser
+
+def parse_cli_args():
+    parser = add_subparsers(argparse.ArgumentParser(
+        description=sys.modules[__name__].__doc__.strip()), run_job_parser)
+    parser.add_argument(
+        "--redis-uri", required=True,
+        help=(
+            "URI to use to connect to job management db."
+            "The URI should be of the form "
+            "'<scheme>://<user>:<passwd>@<host>:<port>/<path>'"),
+        type=urlparse)
+    return parser, parser.parse_args()
+
+def launch_manager():
+    parser, args = parse_cli_args()
+    with redis_connection(args.redis_uri) as conn:
+        try:
+            return args.run(conn, args, parser)
+        except Exception as nsj:
+            prev_msg = (
+                conn.hget(f"{jobs.JOBS_NAMESPACE}:manager", key="stderr") or "")
+            if bool(prev_msg):
+                prev_msg = f"{prev_msg}\n"
+
+            notfoundmsg = (
+                f"{prev_msg}"
+                f"{datetime.now().isoformat()}: {type(nsj).__name__}: {traceback.format_exc()}")
+            conn.hset(
+                f"{jobs.JOBS_NAMESPACE}:manager",
+                key="stderr",
+                value=notfoundmsg)
+
+if __name__ == "__main__":
+    def run():
+        sys.exit(launch_manager())
+    run()