aboutsummaryrefslogtreecommitdiff
path: root/gn2/scripts
diff options
context:
space:
mode:
Diffstat (limited to 'gn2/scripts')
-rw-r--r--gn2/scripts/__init__.py0
-rw-r--r--gn2/scripts/corr_compute.py77
-rw-r--r--gn2/scripts/parse_corr_html_results_to_csv.py175
-rw-r--r--gn2/scripts/profile_corrs.py67
-rw-r--r--gn2/scripts/run_external.py159
5 files changed, 478 insertions, 0 deletions
diff --git a/gn2/scripts/__init__.py b/gn2/scripts/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/gn2/scripts/__init__.py
diff --git a/gn2/scripts/corr_compute.py b/gn2/scripts/corr_compute.py
new file mode 100644
index 00000000..44dcac68
--- /dev/null
+++ b/gn2/scripts/corr_compute.py
@@ -0,0 +1,77 @@
+"""Compute the correlations."""
+
+import sys
+import json
+import pickle
+import pathlib
+import datetime
+
+from flask import g
+
+from gn2.wqflask import app
+from gn2.wqflask.user_session import UserSession
+from gn2.wqflask.correlation.show_corr_results import set_template_vars
+from gn2.wqflask.correlation.correlation_gn3_api import compute_correlation
+
class UserSessionSimulator():
    """Minimal stand-in for `UserSession` exposing only `user_id`.

    Lets this script run outside a real Flask login session while still
    satisfying code that reads `g.user_session.user_id`.
    """

    def __init__(self, user_id):
        "Remember the identifier to report as this session's user."
        self._uid = user_id

    @property
    def user_id(self):
        "Identifier of the simulated session's user."
        return self._uid
+
# Maps exception class names (``type(exc).__name__``) raised while computing
# correlations to the human-readable titles emitted in the script's JSON
# error output.
error_types = {
    "WrongCorrelationType": "Wrong Correlation Type",
    "CalledProcessError": "Called Process Error"
}
+
def e_time():
    """Return the current UTC time as an ISO-8601 formatted string."""
    now = datetime.datetime.utcnow()
    return now.isoformat()
+
def compute(form):
    """Compute the correlations for the given `form`.

    Returns the template variables on success, or a dict with
    "error-type" and "error-message" keys on failure.
    """
    try:
        correlation_results = compute_correlation(form, compute_all=True)
    except Exception as exc:  # pylint: disable=[broad-except]
        return {
            # Fall back to the raw class name for exception types that
            # have no friendly title, instead of raising KeyError while
            # reporting the original error.
            "error-type": error_types.get(
                type(exc).__name__, type(exc).__name__),
            # `exc.args` can be empty; `str(exc)` is always safe and is
            # identical to `args[0]` for single-argument exceptions.
            "error-message": str(exc)
        }

    return set_template_vars(form, correlation_results)
+
+if __name__ == "__main__":
+ ARGS_COUNT = 3
+ if len(sys.argv) < ARGS_COUNT:
+ print(f"{e_time()}: You need to pass the file with the picked form",
+ file=sys.stderr)
+ sys.exit(1)
+
+ if len(sys.argv) > ARGS_COUNT:
+ print(f"{e_time()}: Unknown arguments {sys.argv[ARGS_COUNT:]}",
+ file=sys.stderr)
+ sys.exit(1)
+
+ filepath = pathlib.Path(sys.argv[1])
+ if not filepath.exists():
+ print(f"File not found '{filepath}'", file=sys.stderr)
+ sys.exit(2)
+
+ with open(filepath, "rb") as pfile:
+ form = pickle.Unpickler(pfile).load()
+
+ with app.app_context():
+ g.user_session = UserSessionSimulator(sys.argv[2])
+ results = compute(form)
+
+ print(json.dumps(results), file=sys.stdout)
+
+ if "error-type" in results:
+ print(
+ f"{results['error-type']}: {results['error-message']}",
+ file=sys.stderr)
+ sys.exit(3)
+
+ sys.exit(0)
diff --git a/gn2/scripts/parse_corr_html_results_to_csv.py b/gn2/scripts/parse_corr_html_results_to_csv.py
new file mode 100644
index 00000000..464eb2ca
--- /dev/null
+++ b/gn2/scripts/parse_corr_html_results_to_csv.py
@@ -0,0 +1,175 @@
+import csv
+import sys
+import json
+from pathlib import Path
+from functools import reduce, partial
+from typing import Any, Union, Sequence, Optional
+from argparse import Action, Namespace, ArgumentError, ArgumentParser
+
+from lxml import etree
+
def thread(value, *functions):
    """Pipe `value` through `functions` left-to-right (thread-first)."""
    result = value
    for func in functions:
        result = func(result)
    return result
+
def parse_file(filename: Path):
    """Read the HTML file at `filename` and return its parsed lxml tree."""
    with open(filename, encoding="utf-8") as inpfl:
        return etree.HTML(inpfl.read())
+
def first_row_headers(table):
    """Return the whitespace-normalised text of each cell in the first row."""
    def cell_text(cell):
        fragments = cell.xpath(".//child::text()")
        return " ".join(fragment.strip() for fragment in fragments)

    return tuple(cell_text(cell)
                 for cell in table.xpath("./tbody/tr[1]/td"))
+
def results_table(tables):
    """Return the first table whose leading columns match the results layout.

    Raises ValueError (instead of the previous opaque IndexError on
    `found[0]`) when none of `tables` looks like a correlation-results
    table.
    """
    expected = ("Index", "Record ID", "Symbol", "Description")
    for table in tables:
        if first_row_headers(table)[0:4] == expected:
            return table
    raise ValueError(
        "No correlation-results table found in the parsed document.")
+
def table_contents(table):
    """Return every row of `table` as a tuple of whitespace-normalised cells."""
    def cell_text(cell):
        return " ".join(
            fragment.strip() for fragment in cell.xpath(".//child::text()"))

    return tuple(tuple(cell_text(cell) for cell in row)
                 for row in table.xpath("./tbody/tr"))
+
+
def to_dicts(contents):
    """Convert rows to dicts, keyed by the values in the first (header) row."""
    keyrow = contents[0]
    return tuple(dict(zip(keyrow, datarow)) for datarow in contents[1:])
+
def write_csv(
        input_file: Path, output_dir: Union[bool, Path],
        contents: Sequence[dict]) -> Sequence[Sequence[str]]:
    """Write `contents` as unix-dialect CSV.

    Writes to stdout when `output_dir` is falsy, otherwise to
    `<output_dir>/<input_file stem>__results.csv`.
    """
    fieldnames = list(contents[0].keys())

    def dump(stream):
        writer = csv.DictWriter(
            stream, fieldnames=fieldnames, dialect=csv.unix_dialect)
        writer.writeheader()
        writer.writerows(contents)

    if not bool(output_dir):
        return dump(sys.stdout)

    target = output_dir.joinpath(f"{input_file.stem}__results.csv")
    with open(target, "w", encoding="utf-8") as out_file:
        return dump(out_file)
+
def output_stream(input_file: Optional[Path] = None,
                  to_output_file: bool = False):
    """Return a writable stream for the results.

    The original implementation referenced undefined globals
    (`to_output_file`, `input_file`), opened the output file in read mode,
    and was an accidental generator (mixed `return`/`yield`) — it raised
    NameError as soon as it was iterated.  This version takes the values
    as parameters and opens the file for writing.  With no arguments it
    returns `sys.stdout`, which is compatible with every call that could
    previously succeed.

    NOTE: when a file stream is returned, the caller must close it.
    """
    if not to_output_file:
        return sys.stdout

    output_file = input_file.parent.joinpath(f"{input_file.stem}.csv")
    return open(output_file, "w", encoding="utf-8")
+
class FileCheck(Action):
    """`argparse` action verifying the given path is an existing file."""

    def __init__(self, option_strings, dest, **kwargs):
        "Initialise the action — no extra configuration required."
        super().__init__(option_strings, dest, **kwargs)

    def __call__(# pylint: disable=[signature-differs]
            self, parser: ArgumentParser, namespace: Namespace,
            values: Union[str, Sequence[Any], None],
            option_string: Optional[str] = "") -> None:
        """Resolve `values` to an absolute file path or raise ArgumentError."""
        resolved = Path(str(values or "")).absolute().resolve()
        if resolved.is_file():
            setattr(namespace, self.dest, resolved)
            return
        raise ArgumentError(
            self,
            f"The file '{values}' does not exist or is a folder/directory.")
+
class DirectoryCheck(Action):
    """`argparse` action verifying the given path is an existing directory."""

    def __init__(self, option_strings, dest, **kwargs):
        "Initialise the action — no extra configuration required."
        super().__init__(option_strings, dest, **kwargs)

    def __call__(
            self, parser: ArgumentParser, namespace: Namespace,
            values: Union[str, Sequence[Any], None],
            option_string: Optional[str] = "") -> None:
        """Resolve `values` to an absolute directory or raise ArgumentError."""
        resolved = Path(str(values or "")).absolute().resolve()
        if resolved.is_dir():
            setattr(namespace, self.dest, resolved)
            return
        raise ArgumentError(
            self, f"The directory '{resolved}' does not exist!")
+
def gn1_parser(subparsers) -> None:
    """Register the `gn1` subcommand for parsing GN1 HTML result pages."""
    gn1 = subparsers.add_parser("gn1")
    gn1.add_argument(
        "inputfile", help="The HTML file to parse", action=FileCheck)
    gn1.add_argument(
        "--outputdir", help="Path to output directory", action=DirectoryCheck,
        default=False)

    def process(args):
        # Parse HTML → locate results table → extract rows → dictify → CSV.
        return thread(
            args.inputfile,
            parse_file,
            lambda tree: tree.xpath("//table"),
            results_table,
            table_contents,
            to_dicts,
            partial(write_csv, args.inputfile, args.outputdir))

    gn1.set_defaults(func=process)
+
def tablejson_script(scripts):
    """Find the `<script>` defining `var tableJson = …` and decode its JSON.

    Returns the decoded JSON value, or None when no script element in
    `scripts` assigns `tableJson`.

    Fixes over the original: the redundant `continue` after `return` is
    gone, and the JSON is sliced from where `find` actually located the
    marker rather than assuming the marker sits at position 0 (the old
    `val[len("var tableJson = "):]` produced invalid JSON whenever other
    statements preceded the assignment).
    """
    marker = "var tableJson = "
    for script in scripts:
        content = "".join(script.xpath('.//child::text()')).strip()
        index = content.find(marker)
        if index >= 0:
            return json.loads(content[index + len(marker):].strip())
    return None
+
def gn2_parser(subparsers) -> None:
    """Register the `gn2` subcommand for parsing GN2 HTML result pages."""
    gn2 = subparsers.add_parser("gn2")
    gn2.add_argument(
        "inputfile", help="The HTML file to parse", action=FileCheck)
    gn2.add_argument(
        "--outputdir", help="Path to output directory", action=DirectoryCheck,
        default=False)

    def process(args):
        # GN2 pages embed the results as JSON inside a <script> element,
        # so no HTML-table scraping is needed before writing the CSV.
        return thread(
            args.inputfile,
            parse_file,
            lambda tree: tree.xpath("//script"),
            tablejson_script,
            partial(write_csv, args.inputfile, args.outputdir))

    gn2.set_defaults(func=process)
+
def parse_cli_args():
    """Build the argument parser with its subcommands and parse sys.argv."""
    parser = ArgumentParser(
        "parse_corr_html_results_to_csv",
        description="Parse correlation results from the given HTML file.")
    subcommands = parser.add_subparsers(
        title="subcommands", description="Valid subcommands",
        help="additional help")
    for register in (gn1_parser, gn2_parser):
        register(subcommands)

    return parser, parser.parse_args()
+
def run():
    """CLI entry point: parse arguments and dispatch to the subcommand."""
    parser, args = parse_cli_args()
    try:
        args.func(args)
    except AttributeError:
        # No subcommand selected, so `args` has no `func`: show usage.
        # (NOTE: this also swallows AttributeErrors raised inside the
        # subcommand itself, matching the original behaviour.)
        parser.print_help()

if __name__ == "__main__":
    run()
diff --git a/gn2/scripts/profile_corrs.py b/gn2/scripts/profile_corrs.py
new file mode 100644
index 00000000..5f9a06da
--- /dev/null
+++ b/gn2/scripts/profile_corrs.py
@@ -0,0 +1,67 @@
+import io
+import sys
+import pstats
+import cProfile
+
+from flask import g, request
+
+from gn2.utility.startup_config import app_config
+
+from gn2.wqflask import app
+from gn2.wqflask.user_session import UserSession
+from gn2.wqflask.correlation.correlation_gn3_api import compute_correlation
+
def sample_vals():
    # Hard-coded JSON blob of strain → expression value ("x" = missing)
    # used to simulate a submitted correlation form while profiling.
    # (The literal was hard-wrapped mid-token in the diff dump; it is
    # rejoined here into a single line.)
    return '{"C57BL/6J":"10.835","DBA/2J":"11.142","B6D2F1":"11.126","D2B6F1":"11.143","BXD1":"10.811","BXD2":"11.503","BXD5":"10.766","BXD6":"10.986","BXD8":"11.050","BXD9":"10.822","BXD11":"10.670","BXD12":"10.946","BXD13":"10.890","BXD14":"x","BXD15":"10.884","BXD16":"11.222","BXD18":"x","BXD19":"10.968","BXD20":"10.962","BXD21":"10.906","BXD22":"11.080","BXD23":"11.046","BXD24":"11.146","BXD24a":"x","BXD25":"x","BXD27":"11.078","BXD28":"11.034","BXD29":"10.808","BXD30":"x","BXD31":"11.087","BXD32":"11.029","BXD33":"10.662","BXD34":"11.482","BXD35":"x","BXD36":"x","BXD37":"x","BXD38":"10.836","BXD39":"10.926","BXD40":"10.638","BXD41":"x","BXD42":"10.974","BXD43":"10.828","BXD44":"10.900","BXD45":"11.358","BXD48":"11.042","BXD48a":"10.975","BXD49":"x","BXD50":"11.228","BXD51":"11.126","BXD52":"x","BXD53":"x","BXD54":"x","BXD55":"11.580","BXD56":"x","BXD59":"x","BXD60":"10.829","BXD61":"11.152","BXD62":"11.156","BXD63":"10.942","BXD64":"10.506","BXD65":"11.126","BXD65a":"11.272","BXD65b":"11.157","BXD66":"11.071","BXD67":"11.080","BXD68":"10.997","BXD69":"11.096","BXD70":"11.152","BXD71":"x","BXD72":"x","BXD73":"11.262","BXD73a":"11.444","BXD73b":"x","BXD74":"10.974","BXD75":"11.150","BXD76":"10.920","BXD77":"10.928","BXD78":"x","BXD79":"11.371","BXD81":"x","BXD83":"10.946","BXD84":"11.181","BXD85":"10.992","BXD86":"10.770","BXD87":"11.200","BXD88":"x","BXD89":"10.930","BXD90":"11.183","BXD91":"x","BXD93":"11.056","BXD94":"10.737","BXD95":"x","BXD98":"10.986","BXD99":"10.892","BXD100":"x","BXD101":"x","BXD102":"x","BXD104":"x","BXD105":"x","BXD106":"x","BXD107":"x","BXD108":"x","BXD109":"x","BXD110":"x","BXD111":"x","BXD112":"x","BXD113":"x","BXD114":"x","BXD115":"x","BXD116":"x","BXD117":"x","BXD119":"x","BXD120":"x","BXD121":"x","BXD122":"x","BXD123":"x","BXD124":"x","BXD125":"x","BXD126":"x","BXD127":"x","BXD128":"x","BXD128a":"x","BXD130":"x","BXD131":"x","BXD132":"x","BXD133":"x","BXD134":"x","BXD135":"x","BXD136":"x","BXD137":"x","BXD138":"x","BXD139":"x","BXD141":"x","BXD142":"x","BXD144":"x","BXD145":"x","BXD146":"x","BXD147":"x","BXD148":"x","BXD149":"x","BXD150":"x","BXD151":"x","BXD152":"x","BXD153":"x","BXD154":"x","BXD155":"x","BXD156":"x","BXD157":"x","BXD160":"x","BXD161":"x","BXD162":"x","BXD165":"x","BXD168":"x","BXD169":"x","BXD170":"x","BXD171":"x","BXD172":"x","BXD173":"x","BXD174":"x","BXD175":"x","BXD176":"x","BXD177":"x","BXD178":"x","BXD180":"x","BXD181":"x","BXD183":"x","BXD184":"x","BXD186":"x","BXD187":"x","BXD188":"x","BXD189":"x","BXD190":"x","BXD191":"x","BXD192":"x","BXD193":"x","BXD194":"x","BXD195":"x","BXD196":"x","BXD197":"x","BXD198":"x","BXD199":"x","BXD200":"x","BXD201":"x","BXD202":"x","BXD203":"x","BXD204":"x","BXD205":"x","BXD206":"x","BXD207":"x","BXD208":"x","BXD209":"x","BXD210":"x","BXD211":"x","BXD212":"x","BXD213":"x","BXD214":"x","BXD215":"x","BXD216":"x","BXD217":"x","BXD218":"x","BXD219":"x","BXD220":"x"}'
+
+def simulated_form(corr_type: str = "sample"):
+ assert corr_type in ("sample", "tissue", "lit")
+ return {
+ "dataset": "HC_M2_0606_P",
+ "trait_id": "1435464_at",
+ "corr_dataset": "HC_M2_0606_P",
+ "corr_sample_method": "pearson",
+ "corr_return_results": "100",
+ "corr_samples_group": "samples_primary",
+ "sample_vals": sample_vals(),
+ "corr_type": corr_type,
+ "corr_sample_method": "pearson",
+ "location_type": "gene",
+ "corr_return_results": "20000"
+ }
+
def profile_corrs():
    """Run the correlation computation under cProfile; return the profiler.

    Reads the form from `flask.request.form`, so it must be called inside
    a request context.
    """
    profiler = cProfile.Profile()
    profiler.enable()
    # Only the side effect of running the computation matters for
    # profiling; the result itself was a dead store and is discarded.
    compute_correlation(request.form, compute_all=True)
    profiler.disable()
    return profiler
+
def dump_stats(profiler):
    """Print `profiler` stats (cumulative order) to argv[1] or stdout.

    Returns 0 in both cases, for use as a process exit code.
    """
    buffer = io.StringIO()
    stats = pstats.Stats(profiler, stream=buffer)
    stats.sort_stats(pstats.SortKey.CUMULATIVE).print_stats()

    if len(sys.argv) > 1:
        # An output path was given on the command line — write there.
        with open(sys.argv[1], "w+", encoding="utf-8") as output_file:
            print(buffer.getvalue(), file=output_file)
        return 0

    print(buffer.getvalue())
    return 0
+
+
+if __name__ == "__main__":
+ def main():
+ "Entry point for profiler script"
+ return dump_stats(profile_corrs())
+
+ app_config()
+ with app.app_context():
+ with app.test_request_context("/corr_compute", data=simulated_form()):
+ g.user_session = UserSession()
+ main()
diff --git a/gn2/scripts/run_external.py b/gn2/scripts/run_external.py
new file mode 100644
index 00000000..297d17a1
--- /dev/null
+++ b/gn2/scripts/run_external.py
@@ -0,0 +1,159 @@
+"""
+Run jobs in external processes.
+"""
+
+import os
+import sys
+import shlex
+import argparse
+import traceback
+import subprocess
+from uuid import UUID
+from time import sleep
+from datetime import datetime
+from urllib.parse import urlparse
+from tempfile import TemporaryDirectory
+
+# import psutil
+from redis import Redis
+
+import gn2.jobs.jobs as jobs
+
def print_help(args, parser):  # pylint: disable=[unused-argument]
    """Print the parser's help text (the `args` value is ignored)."""
    print(parser.format_help())
+
def UUID4(val):
    """Convert the string `val` to a `uuid.UUID` (argparse `type=` helper)."""
    parsed = UUID(val)
    return parsed
+
def redis_connection(parsed_url):
    """Build a decoded-responses Redis client from a pre-parsed URI."""
    uri = f"redis://{parsed_url.netloc}{parsed_url.path}"
    return Redis.from_url(uri, decode_responses=True)
+
def update_status(redis_conn: Redis, job_id: UUID, value: str):
    """Set the job's "status" field in Redis to `value`."""
    namespace = jobs.job_namespace(job_id)
    redis_conn.hset(namespace, key="status", value=value)
+
def __update_stdout_stderr__(
        redis_conn: Redis, job_id: UUID, bytes_read: bytes, stream: str):
    """Append `bytes_read` (decoded as UTF-8) to the job's `stream` field.

    Raises `jobs.NoSuchJob` when the job does not exist in Redis.
    """
    maybe_job = jobs.job(redis_conn, job_id)
    if maybe_job.is_nothing():
        raise jobs.NoSuchJob(job_id)

    job_data = maybe_job.maybe({}, lambda x: x)
    new_value = job_data.get(stream, "") + bytes_read.decode("utf-8")
    redis_conn.hset(jobs.job_namespace(job_id), key=stream, value=new_value)
+
def set_stdout(redis_conn: Redis, job_id: UUID, bytes_read: bytes):
    """Overwrite the job's "stdout" field with the decoded `bytes_read`.

    Unlike `update_stdout`, this replaces the stored value instead of
    appending to it.  Raises `jobs.NoSuchJob` when the job is absent.
    """
    job = jobs.job(redis_conn, job_id)
    if job.is_nothing():
        raise jobs.NoSuchJob(job_id)

    # The previous `job = job.maybe({}, lambda x: x)` was a dead store —
    # the existing value is never read when overwriting — so it is gone.
    redis_conn.hset(
        jobs.job_namespace(job_id), key="stdout",
        value=bytes_read.decode("utf-8"))
+
def update_stdout(redis_conn: Redis, job_id: UUID, bytes_read: bytes):
    """Append newly-read stdout bytes to the job's "stdout" field."""
    __update_stdout_stderr__(redis_conn, job_id, bytes_read, stream="stdout")
+
def update_stderr(redis_conn: Redis, job_id: UUID, bytes_read: bytes):
    """Append newly-read stderr bytes to the job's "stderr" field."""
    __update_stdout_stderr__(redis_conn, job_id, bytes_read, stream="stderr")
+
def set_meta(redis_conn: Redis, job_id: UUID, meta_key: str, meta_val: str):
    """Set an arbitrary metadata field on the job.

    Raises `jobs.NoSuchJob` when the job does not exist in Redis.
    """
    if jobs.job(redis_conn, job_id).is_nothing():
        raise jobs.NoSuchJob(job_id)

    redis_conn.hset(jobs.job_namespace(job_id), key=meta_key, value=meta_val)
+
def run_job(redis_conn: Redis, job_id: UUID):
    """Run the job in an external process.

    Launches the job's command as a subprocess, streaming its stdout into
    Redis while it runs, then stores the final stdout/stderr and
    completion metadata.  Returns the subprocess's return code.  Raises
    `jobs.NoSuchJob` when no job with `job_id` exists.
    """
    print(f"THE ARGUMENTS TO RUN_JOB:\n\tConnection: {redis_conn}\n\tJob ID: {job_id}\n")

    the_job = jobs.job(redis_conn, job_id)
    if the_job.is_nothing():
        raise jobs.NoSuchJob(job_id)

    with TemporaryDirectory() as tmpdir:
        stdout_file = f"{tmpdir}/{job_id}.stdout"
        stderr_file = f"{tmpdir}/{job_id}.stderr"
        with open(stdout_file, "w+b") as outfl, open(stderr_file, "w+b") as errfl:
            with subprocess.Popen(
                    jobs.command(the_job), stdout=outfl,
                    stderr=errfl) as process:
                # Poll once a second, appending whatever new stdout the
                # child has produced so far.  NOTE(review): this relies on
                # the parent reading from the same file the child's fd
                # writes to — `read1()` picks up data written after the
                # parent's current offset; confirm this is the intent.
                while process.poll() is None:
                    update_status(redis_conn, job_id, "running")
                    update_stdout(redis_conn, job_id, outfl.read1())
                    sleep(1)

        update_status(redis_conn, job_id, "completed")
        # Re-read both files from the start to store the *complete*
        # stdout (replacing the incrementally-appended copy) and stderr.
        with open(stdout_file, "rb") as outfl, open(stderr_file, "rb") as errfl:
            set_stdout(redis_conn, job_id, outfl.read())
            update_stderr(redis_conn, job_id, errfl.read())

        # Redundant with TemporaryDirectory cleanup, but harmless.
        os.remove(stdout_file)
        os.remove(stderr_file)

    returncode = process.returncode
    set_meta(redis_conn, job_id, "completion-status",
             ("success" if returncode == 0 else "error"))
    set_meta(redis_conn, job_id, "return-code", returncode)
    return process.returncode
+
def run_job_parser(parent_parser):
    """Register the `run-job` subcommand on `parent_parser`."""
    run_job_cmd = parent_parser.add_parser(
        "run-job",
        help="run job with given id")
    run_job_cmd.add_argument(
        "job_id", type=UUID4, help="A string representing a UUID4 value.")
    run_job_cmd.set_defaults(
        run=lambda conn, args, parser: run_job(conn, args.job_id))
+
def add_subparsers(parent_parser, *subparser_fns):
    """Attach each subcommand registrar in `subparser_fns` to the parser.

    Each function is called with the subparsers object and is expected to
    register one subcommand on it.  Returns `parent_parser` for chaining.
    (A dead `pass` statement inside the loop has been removed.)
    """
    sub_parsers = parent_parser.add_subparsers(
        title="subcommands", description="valid subcommands", required=True)
    for parser_fn in subparser_fns:
        parser_fn(sub_parsers)

    return parent_parser
+
def parse_cli_args():
    """Build the CLI parser (with subcommands) and parse the arguments.

    Returns a (parser, namespace) tuple; the namespace's `redis_uri` is
    already parsed with `urllib.parse.urlparse`.
    """
    parser = add_subparsers(argparse.ArgumentParser(
        description=sys.modules[__name__].__doc__.strip()), run_job_parser)
    parser.add_argument(
        "--redis-uri", required=True,
        help=(
            # Trailing space added: the implicitly-concatenated literals
            # previously rendered as "…job management db.The URI should…".
            "URI to use to connect to job management db. "
            "The URI should be of the form "
            "'<scheme>://<user>:<passwd>@<host>:<port>/<path>'"),
        type=urlparse)
    return parser, parser.parse_args()
+
def launch_manager():
    """Parse the CLI arguments and run the selected subcommand.

    On any exception, appends a timestamped traceback to the manager's
    "stderr" field in Redis instead of crashing.  Returns whatever the
    subcommand's `run` callable returns (None on the error path).
    """
    parser, args = parse_cli_args()
    # NOTE(review): relies on the Redis client being usable as a context
    # manager (closes the connection on exit) — confirm the installed
    # redis-py version supports this.
    with redis_connection(args.redis_uri) as conn:
        try:
            return args.run(conn, args, parser)
        except Exception as nsj:  # deliberately broad: record, don't crash
            # Append to any previously-recorded manager errors.
            prev_msg = (
                conn.hget(f"{jobs.JOBS_NAMESPACE}:manager", key="stderr") or "")
            if bool(prev_msg):
                prev_msg = f"{prev_msg}\n"

            notfoundmsg = (
                f"{prev_msg}"
                f"{datetime.now().isoformat()}: {type(nsj).__name__}: {traceback.format_exc()}")
            conn.hset(
                f"{jobs.JOBS_NAMESPACE}:manager",
                key="stderr",
                value=notfoundmsg)
+
+if __name__ == "__main__":
+ def run():
+ sys.exit(launch_manager())
+ run()