From 47f032329f967440f44889ee856f1a8510fb773c Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Mon, 5 Dec 2022 12:17:59 +0300 Subject: scripts: add GN2 results parsing to parser script * wqflask/scripts/parse_corr_gn1_results_to_csv.py: Rename script * wqflask/scripts/parse_corr_html_results_to_csv.py: * Use argparse to parse CLI arguments * Add parsing for GN2 results --- wqflask/scripts/parse_corr_gn1_results_to_csv.py | 86 ----------- wqflask/scripts/parse_corr_html_results_to_csv.py | 172 ++++++++++++++++++++++ 2 files changed, 172 insertions(+), 86 deletions(-) delete mode 100644 wqflask/scripts/parse_corr_gn1_results_to_csv.py create mode 100644 wqflask/scripts/parse_corr_html_results_to_csv.py (limited to 'wqflask') diff --git a/wqflask/scripts/parse_corr_gn1_results_to_csv.py b/wqflask/scripts/parse_corr_gn1_results_to_csv.py deleted file mode 100644 index b04a0378..00000000 --- a/wqflask/scripts/parse_corr_gn1_results_to_csv.py +++ /dev/null @@ -1,86 +0,0 @@ -import csv -import sys -from functools import reduce, partial -import pathlib - -from lxml import etree - -def thread(value, *functions): - return reduce(lambda result, func: func(result), functions, value) - -def parse_file(filename: pathlib.Path): - with open(filename, encoding="utf-8") as inpfl: - raw_html = inpfl.read() - - return etree.HTML(raw_html) - -def first_row_headers(table): - return tuple( - " ".join(text.strip() for text in cell.xpath(".//child::text()")) - for cell in table.xpath("./tbody/tr[1]/td")) - - -def results_table(tables): - found = tuple(filter( - lambda table: ( - first_row_headers(table)[0:4] == - ("Index", "Record ID", "Symbol", "Description")), - tables)) - # print(f"Found {len(found)} with the expected first row") - return found[0] - -def table_contents(table): - return tuple( - tuple(" ".join(text.strip() for text in cell.xpath(".//child::text()")) - for cell in row) - for row in table.xpath("./tbody/tr")) - - -def to_dicts(contents): - frow = contents[0] - return tuple(dict(zip(frow, row)) for row in contents[1:]) - -def write_csv(input_file, to_output_file, contents): - def __write__(stream): - writer = csv.DictWriter( - stream, fieldnames=list(contents[0].keys()), - dialect=csv.unix_dialect) - writer.writeheader() - writer.writerows(contents) - - if not to_output_file: - return __write__(sys.stdout) - - output_file = input_file.parent.joinpath( - f"{input_file.stem}__results.csv") - with open(output_file, "w", encoding="utf-8") as out_file: - return __write__(out_file) - -def output_stream(): - if not to_output_file: - return sys.stdout - - output_file = input_file.parent.joinpath( - f"{input_file.stem}.csv") - with open(output_file) as out_file: - yield out_file - -def run(): - if len(sys.argv) != 3: - print("Usage: python3 test.py ") - sys.exit(1) - - _this_file, input_file, to_output_file = sys.argv - input_file = pathlib.Path(input_file).absolute() - thread( - input_file, - parse_file, - lambda tree: tree.xpath("//table"), - results_table, - table_contents, - to_dicts, - partial(write_csv, input_file, to_output_file == "Y") - ) - -if __name__ == "__main__": - run() diff --git a/wqflask/scripts/parse_corr_html_results_to_csv.py b/wqflask/scripts/parse_corr_html_results_to_csv.py new file mode 100644 index 00000000..c54d99ca --- /dev/null +++ b/wqflask/scripts/parse_corr_html_results_to_csv.py @@ -0,0 +1,172 @@ +import csv +import sys +import json +from pathlib import Path +from functools import reduce, partial +from typing import Any, Union, Sequence, Optional +from argparse import Action, Namespace, ArgumentError, ArgumentParser + +from lxml import etree + +def thread(value, *functions): + return reduce(lambda result, func: func(result), functions, value) + +def parse_file(filename: Path): + with open(filename, encoding="utf-8") as inpfl: + raw_html = inpfl.read() + + return etree.HTML(raw_html) + +def first_row_headers(table): + return tuple( + " ".join(text.strip() for text in cell.xpath(".//child::text()")) + for cell in table.xpath("./tbody/tr[1]/td")) + +def results_table(tables): + found = tuple(filter( + lambda table: ( + first_row_headers(table)[0:4] == + ("Index", "Record ID", "Symbol", "Description")), + tables)) + return found[0] + +def table_contents(table): + return tuple( + tuple(" ".join(text.strip() for text in cell.xpath(".//child::text()")) + for cell in row) + for row in table.xpath("./tbody/tr")) + + +def to_dicts(contents): + frow = contents[0] + return tuple(dict(zip(frow, row)) for row in contents[1:]) + +def write_csv( + input_file: Path, output_dir: Union[bool, Path], + contents: Sequence[dict]) -> Sequence[Sequence[str]]: + def __write__(stream): + writer = csv.DictWriter( + stream, fieldnames=list(contents[0].keys()), + dialect=csv.unix_dialect) + writer.writeheader() + writer.writerows(contents) + + if not bool(output_dir): + return __write__(sys.stdout) + + output_file = output_dir.joinpath( + f"{input_file.stem}__results.csv") + with open(output_file, "w", encoding="utf-8") as out_file: + return __write__(out_file) + +def output_stream(): + if not to_output_file: + return sys.stdout + + output_file = input_file.parent.joinpath( + f"{input_file.stem}.csv") + with open(output_file) as out_file: + yield out_file + +class FileCheck(Action): + """Action class to check existence of a given file path.""" + + def __init__(self, option_strings, dest, **kwargs): + "Initialise the FileCheck action class" + super().__init__(option_strings, dest, **kwargs) + + def __call__(# pylint: disable=[signature-differs] + self, parser: ArgumentParser, namespace: Namespace, + values: Union[str, Sequence[Any], None], + option_string: Optional[str] = "") -> None: + """Check existence of a given file path and set it, or raise an + exception.""" + the_path = str(values or "") + the_file = Path(the_path).absolute().resolve() + if not the_file.is_file(): + raise ArgumentError( + self, + f"The file '{values}' does not exist or is a folder/directory.") + + setattr(namespace, self.dest, the_file) + +class DirectoryCheck(Action): + """Action class to check the existence of a particular directory""" + def __init__(self, option_strings, dest, **kwargs): + """Init `DirectoryCheck` action object.""" + super().__init__(option_strings, dest, **kwargs) + + def __call__( + self, parser: ArgumentParser, namespace: Namespace, + values: Union[str, Sequence[Any], None], + option_string: Optional[str] = "") -> None: + the_dir = Path(str(values or "")).absolute().resolve() + if not the_dir.is_dir(): + raise ArgumentError( + self, f"The directory '{the_dir}' does not exist!") + + setattr(namespace, self.dest, the_dir) + +def gn1_parser(subparsers) -> None: + parser = subparsers.add_parser("gn1") + parser.add_argument( + "inputfile", help="The HTML file to parse", action=FileCheck) + parser.add_argument( + "--outputdir", help="Path to output directory", action=DirectoryCheck, + default=False) + parser.set_defaults( + func=lambda args: thread( + args.inputfile, + parse_file, + lambda tree: tree.xpath("//table"), + results_table, + table_contents, + to_dicts, + partial(write_csv, args.inputfile, args.outputdir))) + +def tablejson_script(scripts): + for script in scripts: + script_content = thread( + script.xpath('.//child::text()'), + lambda val: "".join(val).strip()) + if script_content.find("var tableJson") >= 0: + return json.loads(thread( + script_content, + lambda val: val[len("var tableJson = "):].strip())) + continue + return None + +def gn2_parser(subparsers) -> None: + parser = subparsers.add_parser("gn2") + parser.add_argument( + "inputfile", help="The HTML file to parse", action=FileCheck) + parser.add_argument( + "--outputdir", help="Path to output directory", action=DirectoryCheck, + default=False) + parser.set_defaults( + func=lambda args: thread( + args.inputfile, + parse_file, + lambda tree: tree.xpath("//script"), + tablejson_script, + partial(write_csv, args.inputfile, args.outputdir) + )) + +def parse_cli_args(): + parser = ArgumentParser( + "parse_corr_html_results_to_csv", + description = "Parse correlation results from the given HTML file.") + subparsers = parser.add_subparsers( + title="subcommands", description="Valid subcommands", + help="additional help") + gn1_parser(subparsers) + gn2_parser(subparsers) + + return parser.parse_args() + +def run(): + args = parse_cli_args() + args.func(args) + +if __name__ == "__main__": + run() -- cgit v1.2.3