author     Frederick Muriuki Muriithi  2022-12-05 12:17:59 +0300
committer  Frederick Muriuki Muriithi  2022-12-05 12:17:59 +0300
commit     47f032329f967440f44889ee856f1a8510fb773c (patch)
tree       212686d805cb4a3cd4b1fd767f5af92cb38aae46
parent     1a651da72faf987f7e3d0c7ba4c60cafe850c7a0 (diff)
download   genenetwork2-47f032329f967440f44889ee856f1a8510fb773c.tar.gz
scripts: add GN2 results parsing to parser script
* wqflask/scripts/parse_corr_gn1_results_to_csv.py: Rename script
* wqflask/scripts/parse_corr_html_results_to_csv.py:
  * Use argparse to parse CLI arguments
  * Add parsing for GN2 results
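
With argparse in place, the script is now driven by subcommands, one per
source of HTML results. A sketch of how it might be invoked (file and
directory names below are placeholders; omitting --outputdir writes the CSV
to standard output):

    python3 wqflask/scripts/parse_corr_html_results_to_csv.py gn1 gn1_results.html --outputdir /tmp/correlation-csvs
    python3 wqflask/scripts/parse_corr_html_results_to_csv.py gn2 gn2_results.html --outputdir /tmp/correlation-csvs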
-rw-r--r--  wqflask/scripts/parse_corr_gn1_results_to_csv.py    86
-rw-r--r--  wqflask/scripts/parse_corr_html_results_to_csv.py  172
2 files changed, 172 insertions, 86 deletions
diff --git a/wqflask/scripts/parse_corr_gn1_results_to_csv.py b/wqflask/scripts/parse_corr_gn1_results_to_csv.py
deleted file mode 100644
index b04a0378..00000000
--- a/wqflask/scripts/parse_corr_gn1_results_to_csv.py
+++ /dev/null
@@ -1,86 +0,0 @@
-import csv
-import sys
-from functools import reduce, partial
-import pathlib
-
-from lxml import etree
-
-def thread(value, *functions):
-    return reduce(lambda result, func: func(result), functions, value)
-
-def parse_file(filename: pathlib.Path):
-    with open(filename, encoding="utf-8") as inpfl:
-        raw_html = inpfl.read()
-
-    return etree.HTML(raw_html)
-
-def first_row_headers(table):
-    return tuple(
-        " ".join(text.strip() for text in cell.xpath(".//child::text()"))
-        for cell in table.xpath("./tbody/tr[1]/td"))
-    
-
-def results_table(tables):
-    found = tuple(filter(
-        lambda table: (
-            first_row_headers(table)[0:4] ==
-            ("Index", "Record ID", "Symbol", "Description")),
-        tables))
-    # print(f"Found {len(found)} with the expected first row")
-    return found[0]
-
-def table_contents(table):
-    return tuple(
-        tuple(" ".join(text.strip() for text in cell.xpath(".//child::text()"))
-            for cell in row)
-        for row in table.xpath("./tbody/tr"))
-
-
-def to_dicts(contents):
-    frow = contents[0]
-    return tuple(dict(zip(frow, row)) for row in contents[1:])
-
-def write_csv(input_file, to_output_file, contents):
-    def __write__(stream):
-        writer = csv.DictWriter(
-            stream, fieldnames=list(contents[0].keys()),
-            dialect=csv.unix_dialect)
-        writer.writeheader()
-        writer.writerows(contents)
-
-    if not to_output_file:
-        return __write__(sys.stdout)
-
-    output_file = input_file.parent.joinpath(
-        f"{input_file.stem}__results.csv")
-    with open(output_file, "w", encoding="utf-8") as out_file:
-        return __write__(out_file)
-
-def output_stream():
-    if not to_output_file:
-        return sys.stdout
-
-    output_file = input_file.parent.joinpath(
-        f"{input_file.stem}.csv")
-    with open(output_file) as out_file:
-        yield out_file
-
-def run():
-    if len(sys.argv) != 3:
-        print("Usage: python3 test.py <input-file> <to-ouput-file: [Y/n]>")
-        sys.exit(1)
-
-    _this_file, input_file, to_output_file = sys.argv
-    input_file = pathlib.Path(input_file).absolute()
-    thread(
-        input_file,
-        parse_file,
-        lambda tree: tree.xpath("//table"),
-        results_table,
-        table_contents,
-        to_dicts,
-        partial(write_csv, input_file, to_output_file == "Y")
-    )
-
-if __name__ == "__main__":
-    run()
diff --git a/wqflask/scripts/parse_corr_html_results_to_csv.py b/wqflask/scripts/parse_corr_html_results_to_csv.py
new file mode 100644
index 00000000..c54d99ca
--- /dev/null
+++ b/wqflask/scripts/parse_corr_html_results_to_csv.py
@@ -0,0 +1,172 @@
+import csv
+import sys
+import json
+from pathlib import Path
+from functools import reduce, partial
+from typing import Any, Union, Sequence, Optional
+from argparse import Action, Namespace, ArgumentError, ArgumentParser
+
+from lxml import etree
+
+def thread(value, *functions):
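+    """Pass `value` through each function in `functions`, left to right."""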
+    return reduce(lambda result, func: func(result), functions, value)
+
+def parse_file(filename: Path):
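+    """Read the HTML file at `filename` and return its parsed element tree."""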
+    with open(filename, encoding="utf-8") as inpfl:
+        raw_html = inpfl.read()
+
+    return etree.HTML(raw_html)
+
+def first_row_headers(table):
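+    """Return the text of each cell in the first row of `table`."""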
+    return tuple(
+        " ".join(text.strip() for text in cell.xpath(".//child::text()"))
+        for cell in table.xpath("./tbody/tr[1]/td"))
+
+def results_table(tables):
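+    """Return the first table whose opening row begins with the expected
+    column headers."""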
+    found = tuple(filter(
+        lambda table: (
+            first_row_headers(table)[0:4] ==
+            ("Index", "Record ID", "Symbol", "Description")),
+        tables))
+    return found[0]
+
+def table_contents(table):
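+    """Return the text of every cell in `table`, row by row."""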
+    return tuple(
+        tuple(" ".join(text.strip() for text in cell.xpath(".//child::text()"))
+            for cell in row)
+        for row in table.xpath("./tbody/tr"))
+
+
+def to_dicts(contents):
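+    """Convert table rows into dicts keyed by the header (first) row."""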
+    frow = contents[0]
+    return tuple(dict(zip(frow, row)) for row in contents[1:])
+
+def write_csv(
+        input_file: Path, output_dir: Union[bool, Path],
+        contents: Sequence[dict]) -> None:
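+    """Write `contents` as CSV to stdout, or to a file named after
+    `input_file` under `output_dir`."""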
+    def __write__(stream):
+        writer = csv.DictWriter(
+            stream, fieldnames=list(contents[0].keys()),
+            dialect=csv.unix_dialect)
+        writer.writeheader()
+        writer.writerows(contents)
+
+    if not bool(output_dir):
+        return __write__(sys.stdout)
+
+    output_file = output_dir.joinpath(
+        f"{input_file.stem}__results.csv")
+    with open(output_file, "w", encoding="utf-8") as out_file:
+        return __write__(out_file)
+
+def output_stream(input_file: Path, to_output_file: bool):
+    """Yield the stream to write to: stdout, or a CSV file named after
+    `input_file`. Currently unused; carried over from the GN1-only script."""
+    if not to_output_file:
+        yield sys.stdout
+        return
+
+    output_file = input_file.parent.joinpath(
+        f"{input_file.stem}.csv")
+    with open(output_file, "w", encoding="utf-8") as out_file:
+        yield out_file
+
+class FileCheck(Action):
+    """Action class to check existence of a given file path."""
+
+    def __init__(self, option_strings, dest, **kwargs):
+        "Initialise the FileCheck action class"
+        super().__init__(option_strings, dest, **kwargs)
+
+    def __call__(# pylint: disable=[signature-differs]
+            self, parser: ArgumentParser, namespace: Namespace,
+            values: Union[str, Sequence[Any], None],
+            option_string: Optional[str] = "") -> None:
+        """Check existence of a given file path and set it, or raise an
+        exception."""
+        the_path = str(values or "")
+        the_file = Path(the_path).absolute().resolve()
+        if not the_file.is_file():
+            raise ArgumentError(
+                self,
+                f"The file '{values}' does not exist or is a folder/directory.")
+
+        setattr(namespace, self.dest, the_file)
+
+class DirectoryCheck(Action):
+    """Action class to check the existence of a particular directory"""
+    def __init__(self, option_strings, dest, **kwargs):
+        """Init `DirectoryCheck` action object."""
+        super().__init__(option_strings, dest, **kwargs)
+
+    def __call__(
+            self, parser: ArgumentParser, namespace: Namespace,
+            values: Union[str, Sequence[Any], None],
+            option_string: Optional[str] = "") -> None:
+        the_dir = Path(str(values or "")).absolute().resolve()
+        if not the_dir.is_dir():
+            raise ArgumentError(
+                self, f"The directory '{the_dir}' does not exist!")
+
+        setattr(namespace, self.dest, the_dir)
+
+def gn1_parser(subparsers) -> None:
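+    """Configure the `gn1` subcommand for parsing GN1 correlation results."""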
+    parser = subparsers.add_parser("gn1")
+    parser.add_argument(
+        "inputfile", help="The HTML file to parse", action=FileCheck)
+    parser.add_argument(
+        "--outputdir", help="Path to output directory", action=DirectoryCheck,
+        default=False)
+    parser.set_defaults(
+        func=lambda args: thread(
+            args.inputfile,
+            parse_file,
+            lambda tree: tree.xpath("//table"),
+            results_table,
+            table_contents,
+            to_dicts,
+            partial(write_csv, args.inputfile, args.outputdir)))
+
+def tablejson_script(scripts):
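+    """Return the parsed contents of the `tableJson` variable assigned in one
+    of the page's inline scripts, or None if no such script is found."""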
+    for script in scripts:
+        script_content = thread(
+            script.xpath('.//child::text()'),
+            lambda val: "".join(val).strip())
+        if script_content.find("var tableJson") >= 0:
+            return json.loads(thread(
+                script_content,
+                lambda val: val[len("var tableJson = "):].strip()))
+    return None
+
+def gn2_parser(subparsers) -> None:
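+    """Configure the `gn2` subcommand for parsing GN2 correlation results."""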
+    parser = subparsers.add_parser("gn2")
+    parser.add_argument(
+        "inputfile", help="The HTML file to parse", action=FileCheck)
+    parser.add_argument(
+        "--outputdir", help="Path to output directory", action=DirectoryCheck,
+        default=False)
+    parser.set_defaults(
+        func=lambda args: thread(
+            args.inputfile,
+            parse_file,
+            lambda tree: tree.xpath("//script"),
+            tablejson_script,
+            partial(write_csv, args.inputfile, args.outputdir)
+        ))
+
+def parse_cli_args():
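+    """Set up the top-level parser, register the subcommands and parse the
+    CLI arguments."""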
+    parser = ArgumentParser(
+        "parse_corr_html_results_to_csv",
+        description="Parse correlation results from the given HTML file.")
+    subparsers = parser.add_subparsers(
+        title="subcommands", description="Valid subcommands",
+        help="additional help")
+    gn1_parser(subparsers)
+    gn2_parser(subparsers)
+
+    return parser.parse_args()
+
+def run():
+    args = parse_cli_args()
+    args.func(args)
+
+if __name__ == "__main__":
+    run()