diff options
Diffstat (limited to 'gn2/scripts')
-rw-r--r-- | gn2/scripts/__init__.py | 0 | ||||
-rw-r--r-- | gn2/scripts/corr_compute.py | 77 | ||||
-rw-r--r-- | gn2/scripts/parse_corr_html_results_to_csv.py | 175 | ||||
-rw-r--r-- | gn2/scripts/profile_corrs.py | 67 | ||||
-rw-r--r-- | gn2/scripts/run_external.py | 159 |
5 files changed, 478 insertions, 0 deletions
diff --git a/gn2/scripts/__init__.py b/gn2/scripts/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/gn2/scripts/__init__.py diff --git a/gn2/scripts/corr_compute.py b/gn2/scripts/corr_compute.py new file mode 100644 index 00000000..44dcac68 --- /dev/null +++ b/gn2/scripts/corr_compute.py @@ -0,0 +1,77 @@ +"""Compute the correlations.""" + +import sys +import json +import pickle +import pathlib +import datetime + +from flask import g + +from gn2.wqflask import app +from gn2.wqflask.user_session import UserSession +from gn2.wqflask.correlation.show_corr_results import set_template_vars +from gn2.wqflask.correlation.correlation_gn3_api import compute_correlation + +class UserSessionSimulator(): + + def __init__(self, user_id): + self._user_id = user_id + + @property + def user_id(self): + return self._user_id + +error_types = { + "WrongCorrelationType": "Wrong Correlation Type", + "CalledProcessError": "Called Process Error" +} + +def e_time(): + return datetime.datetime.utcnow().isoformat() + +def compute(form): + import subprocess + try: + correlation_results = compute_correlation(form, compute_all=True) + except Exception as exc: + return { + "error-type": error_types[type(exc).__name__], + "error-message": exc.args[0] + } + + return set_template_vars(form, correlation_results) + +if __name__ == "__main__": + ARGS_COUNT = 3 + if len(sys.argv) < ARGS_COUNT: + print(f"{e_time()}: You need to pass the file with the picked form", + file=sys.stderr) + sys.exit(1) + + if len(sys.argv) > ARGS_COUNT: + print(f"{e_time()}: Unknown arguments {sys.argv[ARGS_COUNT:]}", + file=sys.stderr) + sys.exit(1) + + filepath = pathlib.Path(sys.argv[1]) + if not filepath.exists(): + print(f"File not found '{filepath}'", file=sys.stderr) + sys.exit(2) + + with open(filepath, "rb") as pfile: + form = pickle.Unpickler(pfile).load() + + with app.app_context(): + g.user_session = UserSessionSimulator(sys.argv[2]) + results = compute(form) + + print(json.dumps(results), file=sys.stdout) + + if "error-type" in results: + print( + f"{results['error-type']}: {results['error-message']}", + file=sys.stderr) + sys.exit(3) + + sys.exit(0) diff --git a/gn2/scripts/parse_corr_html_results_to_csv.py b/gn2/scripts/parse_corr_html_results_to_csv.py new file mode 100644 index 00000000..464eb2ca --- /dev/null +++ b/gn2/scripts/parse_corr_html_results_to_csv.py @@ -0,0 +1,175 @@ +import csv +import sys +import json +from pathlib import Path +from functools import reduce, partial +from typing import Any, Union, Sequence, Optional +from argparse import Action, Namespace, ArgumentError, ArgumentParser + +from lxml import etree + +def thread(value, *functions): + return reduce(lambda result, func: func(result), functions, value) + +def parse_file(filename: Path): + with open(filename, encoding="utf-8") as inpfl: + raw_html = inpfl.read() + + return etree.HTML(raw_html) + +def first_row_headers(table): + return tuple( + " ".join(text.strip() for text in cell.xpath(".//child::text()")) + for cell in table.xpath("./tbody/tr[1]/td")) + +def results_table(tables): + found = tuple(filter( + lambda table: ( + first_row_headers(table)[0:4] == + ("Index", "Record ID", "Symbol", "Description")), + tables)) + return found[0] + +def table_contents(table): + return tuple( + tuple(" ".join(text.strip() for text in cell.xpath(".//child::text()")) + for cell in row) + for row in table.xpath("./tbody/tr")) + + +def to_dicts(contents): + frow = contents[0] + return tuple(dict(zip(frow, row)) for row in contents[1:]) + +def write_csv( + input_file: Path, output_dir: Union[bool, Path], + contents: Sequence[dict]) -> Sequence[Sequence[str]]: + def __write__(stream): + writer = csv.DictWriter( + stream, fieldnames=list(contents[0].keys()), + dialect=csv.unix_dialect) + writer.writeheader() + writer.writerows(contents) + + if not bool(output_dir): + return __write__(sys.stdout) + + output_file = output_dir.joinpath( + f"{input_file.stem}__results.csv") + with open(output_file, "w", encoding="utf-8") as out_file: + return __write__(out_file) + +def output_stream(): + if not to_output_file: + return sys.stdout + + output_file = input_file.parent.joinpath( + f"{input_file.stem}.csv") + with open(output_file) as out_file: + yield out_file + +class FileCheck(Action): + """Action class to check existence of a given file path.""" + + def __init__(self, option_strings, dest, **kwargs): + "Initialise the FileCheck action class" + super().__init__(option_strings, dest, **kwargs) + + def __call__(# pylint: disable=[signature-differs] + self, parser: ArgumentParser, namespace: Namespace, + values: Union[str, Sequence[Any], None], + option_string: Optional[str] = "") -> None: + """Check existence of a given file path and set it, or raise an + exception.""" + the_path = str(values or "") + the_file = Path(the_path).absolute().resolve() + if not the_file.is_file(): + raise ArgumentError( + self, + f"The file '{values}' does not exist or is a folder/directory.") + + setattr(namespace, self.dest, the_file) + +class DirectoryCheck(Action): + """Action class to check the existence of a particular directory""" + def __init__(self, option_strings, dest, **kwargs): + """Init `DirectoryCheck` action object.""" + super().__init__(option_strings, dest, **kwargs) + + def __call__( + self, parser: ArgumentParser, namespace: Namespace, + values: Union[str, Sequence[Any], None], + option_string: Optional[str] = "") -> None: + the_dir = Path(str(values or "")).absolute().resolve() + if not the_dir.is_dir(): + raise ArgumentError( + self, f"The directory '{the_dir}' does not exist!") + + setattr(namespace, self.dest, the_dir) + +def gn1_parser(subparsers) -> None: + parser = subparsers.add_parser("gn1") + parser.add_argument( + "inputfile", help="The HTML file to parse", action=FileCheck) + parser.add_argument( + "--outputdir", help="Path to output directory", action=DirectoryCheck, + default=False) + parser.set_defaults( + func=lambda args: thread( + args.inputfile, + parse_file, + lambda tree: tree.xpath("//table"), + results_table, + table_contents, + to_dicts, + partial(write_csv, args.inputfile, args.outputdir))) + +def tablejson_script(scripts): + for script in scripts: + script_content = thread( + script.xpath('.//child::text()'), + lambda val: "".join(val).strip()) + if script_content.find("var tableJson") >= 0: + return json.loads(thread( + script_content, + lambda val: val[len("var tableJson = "):].strip())) + continue + return None + +def gn2_parser(subparsers) -> None: + parser = subparsers.add_parser("gn2") + parser.add_argument( + "inputfile", help="The HTML file to parse", action=FileCheck) + parser.add_argument( + "--outputdir", help="Path to output directory", action=DirectoryCheck, + default=False) + parser.set_defaults( + func=lambda args: thread( + args.inputfile, + parse_file, + lambda tree: tree.xpath("//script"), + tablejson_script, + partial(write_csv, args.inputfile, args.outputdir) + )) + +def parse_cli_args(): + parser = ArgumentParser( + "parse_corr_html_results_to_csv", + description = "Parse correlation results from the given HTML file.") + subparsers = parser.add_subparsers( + title="subcommands", description="Valid subcommands", + help="additional help") + gn1_parser(subparsers) + gn2_parser(subparsers) + + return parser, parser.parse_args() + +def run(): + parser, args = parse_cli_args() + try: + args.func(args) + except AttributeError as _attr_err: + parser.print_help() + +if __name__ == "__main__": + run() diff --git a/gn2/scripts/profile_corrs.py b/gn2/scripts/profile_corrs.py new file mode 100644 index 00000000..5f9a06da --- /dev/null +++ b/gn2/scripts/profile_corrs.py @@ -0,0 +1,67 @@ +import io +import sys +import pstats +import cProfile + +from flask import g, request + +from gn2.utility.startup_config import app_config + +from gn2.wqflask import app +from gn2.wqflask.user_session import UserSession +from gn2.wqflask.correlation.correlation_gn3_api import compute_correlation + +def sample_vals(): + return '{"C57BL/6J":"10.835","DBA/2J":"11.142","B6D2F1":"11.126","D2B6F1":"11.143","BXD1":"10.811","BXD2":"11.503","BXD5":"10.766","BXD6":"10.986","BXD8":"11.050","BXD9":"10.822","BXD11":"10.670","BXD12":"10.946","BXD13":"10.890","BXD14":"x","BXD15":"10.884","BXD16":"11.222","BXD18":"x","BXD19":"10.968","BXD20":"10.962","BXD21":"10.906","BXD22":"11.080","BXD23":"11.046","BXD24":"11.146","BXD24a":"x","BXD25":"x","BXD27":"11.078","BXD28":"11.034","BXD29":"10.808","BXD30":"x","BXD31":"11.087","BXD32":"11.029","BXD33":"10.662","BXD34":"11.482","BXD35":"x","BXD36":"x","BXD37":"x","BXD38":"10.836","BXD39":"10.926","BXD40":"10.638","BXD41":"x","BXD42":"10.974","BXD43":"10.828","BXD44":"10.900","BXD45":"11.358","BXD48":"11.042","BXD48a":"10.975","BXD49":"x","BXD50":"11.228","BXD51":"11.126","BXD52":"x","BXD53":"x","BXD54":"x","BXD55":"11.580","BXD56":"x","BXD59":"x","BXD60":"10.829","BXD61":"11.152","BXD62":"11.156","BXD63":"10.942","BXD64":"10.506","BXD65":"11.126","BXD65a":"11.272","BXD65b":"11.157","BXD66":"11.071","BXD67":"11.080","BXD68":"10.997","BXD69":"11.096","BXD70":"11.152","BXD71":"x","BXD72":"x","BXD73":"11.262","BXD73a":"11.444","BXD73b":"x","BXD74":"10.974","BXD75":"11.150","BXD76":"10.920","BXD77":"10.928","BXD78":"x","BXD79":"11.371","BXD81":"x","BXD83":"10.946","BXD84":"11.181","BXD85":"10.992","BXD86":"10.770","BXD87":"11.200","BXD88":"x","BXD89":"10.930","BXD90":"11.183","BXD91":"x","BXD93":"11.056","BXD94":"10.737","BXD95":"x","BXD98":"10.986","BXD99":"10.892","BXD100":"x","BXD101":"x","BXD102":"x","BXD104":"x","BXD105":"x","BXD106":"x","BXD107":"x","BXD108":"x","BXD109":"x","BXD110":"x","BXD111":"x","BXD112":"x","BXD113":"x","BXD114":"x","BXD115":"x","BXD116":"x","BXD117":"x","BXD119":"x","BXD120":"x","BXD121":"x","BXD122":"x","BXD123":"x","BXD124":"x","BXD125":"x","BXD126":"x","BXD127":"x","BXD128":"x","BXD128a":"x","BXD130":"x","BXD131":"x","BXD132":"x","BXD133":"x","BXD134":"x","BXD135":"x","BXD136":"x","BXD137":"x","BXD138":"x","BXD139":"x","BXD141":"x","BXD142":"x","BXD144":"x","BXD145":"x","BXD146":"x","BXD147":"x","BXD148":"x","BXD149":"x","BXD150":"x","BXD151":"x","BXD152":"x","BXD153":"x","BXD154":"x","BXD155":"x","BXD156":"x","BXD157":"x","BXD160":"x","BXD161":"x","BXD162":"x","BXD165":"x","BXD168":"x","BXD169":"x","BXD170":"x","BXD171":"x","BXD172":"x","BXD173":"x","BXD174":"x","BXD175":"x","BXD176":"x","BXD177":"x","BXD178":"x","BXD180":"x","BXD181":"x","BXD183":"x","BXD184":"x","BXD186":"x","BXD187":"x","BXD188":"x","BXD189":"x","BXD190":"x","BXD191":"x","BXD192":"x","BXD193":"x","BXD194":"x","BXD195":"x","BXD196":"x","BXD197":"x","BXD198":"x","BXD199":"x","BXD200":"x","BXD201":"x","BXD202":"x","BXD203":"x","BXD204":"x","BXD205":"x","BXD206":"x","BXD207":"x","BXD208":"x","BXD209":"x","BXD210":"x","BXD211":"x","BXD212":"x","BXD213":"x","BXD214":"x","BXD215":"x","BXD216":"x","BXD217":"x","BXD218":"x","BXD219":"x","BXD220":"x"}' + +def simulated_form(corr_type: str = "sample"): + assert corr_type in ("sample", "tissue", "lit") + return { + "dataset": "HC_M2_0606_P", + "trait_id": "1435464_at", + "corr_dataset": "HC_M2_0606_P", + "corr_sample_method": "pearson", + "corr_return_results": "100", + "corr_samples_group": "samples_primary", + "sample_vals": sample_vals(), + "corr_type": corr_type, + "corr_sample_method": "pearson", + "location_type": "gene", + "corr_return_results": "20000" + } + +def profile_corrs(): + "Profile tho correlations" + profiler = cProfile.Profile() + profiler.enable() + correlation_results = compute_correlation(request.form, compute_all=True) + profiler.disable() + return profiler + +def dump_stats(profiler): + iostr = io.StringIO() + sort_by = pstats.SortKey.CUMULATIVE + ps = pstats.Stats(profiler, stream=iostr).sort_stats(sort_by) + ps.print_stats() + + cli_args = sys.argv + if len(cli_args) > 1: + with open(cli_args[1], "w+", encoding="utf-8") as output_file: + print(iostr.getvalue(), file=output_file) + + return 0 + + print(iostr.getvalue()) + return 0 + + +if __name__ == "__main__": + def main(): + "Entry point for profiler script" + return dump_stats(profile_corrs()) + + app_config() + with app.app_context(): + with app.test_request_context("/corr_compute", data=simulated_form()): + g.user_session = UserSession() + main() diff --git a/gn2/scripts/run_external.py b/gn2/scripts/run_external.py new file mode 100644 index 00000000..297d17a1 --- /dev/null +++ b/gn2/scripts/run_external.py @@ -0,0 +1,159 @@ +""" +Run jobs in external processes. +""" + +import os +import sys +import shlex +import argparse +import traceback +import subprocess +from uuid import UUID +from time import sleep +from datetime import datetime +from urllib.parse import urlparse +from tempfile import TemporaryDirectory + +# import psutil +from redis import Redis + +import gn2.jobs.jobs as jobs + +def print_help(args, parser): + print(parser.format_help()) + +def UUID4(val): + return UUID(val) + +def redis_connection(parsed_url): + return Redis.from_url( + f"redis://{parsed_url.netloc}{parsed_url.path}", decode_responses=True) + +def update_status(redis_conn: Redis, job_id: UUID, value: str): + "Update the job's status." + redis_conn.hset(jobs.job_namespace(job_id), key="status", value=value) + +def __update_stdout_stderr__( + redis_conn: Redis, job_id: UUID, bytes_read: bytes, stream: str): + job = jobs.job(redis_conn, job_id) + if job.is_nothing(): + raise jobs.NoSuchJob(job_id) + + job = job.maybe({}, lambda x: x) + redis_conn.hset( + jobs.job_namespace(job_id), key=stream, + value=(job.get(stream, "") + bytes_read.decode("utf-8"))) + +def set_stdout(redis_conn: Redis, job_id:UUID, bytes_read: bytes): + """Set the stdout value for the given job.""" + job = jobs.job(redis_conn, job_id) + if job.is_nothing(): + raise jobs.NoSuchJob(job_id) + + job = job.maybe({}, lambda x: x) + redis_conn.hset( + jobs.job_namespace(job_id), key="stdout", + value=bytes_read.decode("utf-8")) + +def update_stdout(redis_conn: Redis, job_id:UUID, bytes_read: bytes): + """Update the stdout value for the given job.""" + __update_stdout_stderr__(redis_conn, job_id, bytes_read, "stdout") + +def update_stderr(redis_conn: Redis, job_id:UUID, bytes_read: bytes): + """Update the stderr value for the given job.""" + __update_stdout_stderr__(redis_conn, job_id, bytes_read, "stderr") + +def set_meta(redis_conn: Redis, job_id: UUID, meta_key: str, meta_val: str): + job = jobs.job(redis_conn, job_id) + if job.is_nothing(): + raise jobs.NoSuchJob(job_id) + + redis_conn.hset(jobs.job_namespace(job_id), key=meta_key, value=meta_val) + +def run_job(redis_conn: Redis, job_id: UUID): + """Run the job in an external process.""" + print(f"THE ARGUMENTS TO RUN_JOB:\n\tConnection: {redis_conn}\n\tJob ID: {job_id}\n") + + the_job = jobs.job(redis_conn, job_id) + if the_job.is_nothing(): + raise jobs.NoSuchJob(job_id) + + with TemporaryDirectory() as tmpdir: + stdout_file = f"{tmpdir}/{job_id}.stdout" + stderr_file = f"{tmpdir}/{job_id}.stderr" + with open(stdout_file, "w+b") as outfl, open(stderr_file, "w+b") as errfl: + with subprocess.Popen( + jobs.command(the_job), stdout=outfl, + stderr=errfl) as process: + while process.poll() is None: + update_status(redis_conn, job_id, "running") + update_stdout(redis_conn, job_id, outfl.read1()) + sleep(1) + + update_status(redis_conn, job_id, "completed") + with open(stdout_file, "rb") as outfl, open(stderr_file, "rb") as errfl: + set_stdout(redis_conn, job_id, outfl.read()) + update_stderr(redis_conn, job_id, errfl.read()) + + os.remove(stdout_file) + os.remove(stderr_file) + + returncode = process.returncode + set_meta(redis_conn, job_id, "completion-status", + ("success" if returncode == 0 else "error")) + set_meta(redis_conn, job_id, "return-code", returncode) + return process.returncode + +def run_job_parser(parent_parser): + parser = parent_parser.add_parser( + "run-job", + help="run job with given id") + parser.add_argument( + "job_id", type=UUID4, help="A string representing a UUID4 value.") + parser.set_defaults( + run=lambda conn, args, parser: run_job(conn, args.job_id)) + +def add_subparsers(parent_parser, *subparser_fns): + sub_parsers = parent_parser.add_subparsers( + title="subcommands", description="valid subcommands", required=True) + for parser_fn in subparser_fns: + parser_fn(sub_parsers) + pass + + return parent_parser + +def parse_cli_args(): + parser = add_subparsers(argparse.ArgumentParser( + description=sys.modules[__name__].__doc__.strip()), run_job_parser) + parser.add_argument( + "--redis-uri", required=True, + help=( + "URI to use to connect to job management db." + "The URI should be of the form " + "'<scheme>://<user>:<passwd>@<host>:<port>/<path>'"), + type=urlparse) + return parser, parser.parse_args() + +def launch_manager(): + parser, args = parse_cli_args() + with redis_connection(args.redis_uri) as conn: + try: + return args.run(conn, args, parser) + except Exception as nsj: + prev_msg = ( + conn.hget(f"{jobs.JOBS_NAMESPACE}:manager", key="stderr") or "") + if bool(prev_msg): + prev_msg = f"{prev_msg}\n" + + notfoundmsg = ( + f"{prev_msg}" + f"{datetime.now().isoformat()}: {type(nsj).__name__}: {traceback.format_exc()}") + conn.hset( + f"{jobs.JOBS_NAMESPACE}:manager", + key="stderr", + value=notfoundmsg) + +if __name__ == "__main__": + def run(): + sys.exit(launch_manager()) + run() |