"""Parse correlation results from a saved HTML file and write them as CSV.

Two subcommands are provided:

* ``gn1``: the results are read from the HTML ``<table>`` whose first row
  starts with the headers "Index", "Record ID", "Symbol", "Description".
* ``gn2``: the results are read from the JSON value assigned to
  ``var tableJson`` inside one of the page's ``<script>`` elements.

The CSV goes to standard output unless ``--outputdir`` is given, in which
case it is written to ``<outputdir>/<input file stem>__results.csv``.
"""
import csv
import sys
import json
from pathlib import Path
from functools import reduce, partial
from typing import Any, Union, Sequence, Optional
from argparse import Action, Namespace, ArgumentError, ArgumentParser

from lxml import etree

# Leading headers of the first row that identify the results table.
_RESULTS_HEADERS = ("Index", "Record ID", "Symbol", "Description")

# Text that marks the script holding the results as a JSON value.
_TABLEJSON_MARKER = "var tableJson"


def thread(value, *functions):
    """Pass `value` through `functions`, left to right.

    Each function is called with the previous function's return value; the
    return value of the last function is returned. With no functions,
    `value` is returned unchanged.
    """
    return reduce(lambda result, func: func(result), functions, value)


def parse_file(filename: Path):
    """Read `filename` as UTF-8 text and parse it with `etree.HTML`."""
    with open(filename, encoding="utf-8") as inpfl:
        raw_html = inpfl.read()

    return etree.HTML(raw_html)


def _cell_text(cell) -> str:
    """Join the stripped text nodes found under `cell` with single spaces."""
    return " ".join(text.strip() for text in cell.xpath(".//child::text()"))


def first_row_headers(table):
    """Return the text of every `td` in the first body row of `table`."""
    return tuple(
        _cell_text(cell) for cell in table.xpath("./tbody/tr[1]/td"))


def results_table(tables):
    """Return the first table whose first row starts with the results headers.

    Raises:
        IndexError: if none of `tables` has the expected headers.
    """
    for table in tables:
        if first_row_headers(table)[0:4] == _RESULTS_HEADERS:
            return table

    raise IndexError(
        "No table whose first row starts with the headers "
        f"{_RESULTS_HEADERS} was found.")


def table_contents(table):
    """Return the body rows of `table` as a tuple of tuples of cell text."""
    return tuple(
        tuple(_cell_text(cell) for cell in row)
        for row in table.xpath("./tbody/tr"))


def to_dicts(contents):
    """Convert rows into dicts keyed by the values of the first row.

    The first row of `contents` supplies the keys and is not part of the
    result. Empty `contents` gives an empty tuple.
    """
    if not contents:
        return ()

    header, *rows = contents
    return tuple(dict(zip(header, row)) for row in rows)


def write_csv(
        input_file: Path, output_dir: Union[bool, Path],
        contents: Sequence[dict]) -> None:
    """Write `contents` as CSV, using the keys of the first row as columns.

    Args:
        input_file: The parsed HTML file; only its stem is used, to name the
            output file.
        output_dir: Directory to write `<stem>__results.csv` into. A falsy
            value sends the CSV to standard output instead.
        contents: The rows to write, one dict per row.

    Raises:
        ValueError: if `contents` is empty or `None`, i.e. no results were
            extracted from the input file.
    """
    if not contents:
        raise ValueError(
            f"No results found in '{input_file}': nothing to write.")

    def __write__(stream):
        writer = csv.DictWriter(
            stream, fieldnames=list(contents[0]),
            dialect=csv.unix_dialect)
        writer.writeheader()
        writer.writerows(contents)

    if not output_dir:
        __write__(sys.stdout)
        return

    output_file = output_dir.joinpath(f"{input_file.stem}__results.csv")
    # `newline=""` lets the csv module control line endings itself; without
    # it the platform may translate the dialect's "\n" (e.g. to "\r\n").
    with open(output_file, "w", encoding="utf-8", newline="") as out_file:
        __write__(out_file)


class FileCheck(Action):
    """Action class to check existence of a given file path."""

    def __call__(# pylint: disable=[signature-differs]
            self, parser: ArgumentParser, namespace: Namespace,
            values: Union[str, Sequence[Any], None],
            option_string: Optional[str] = "") -> None:
        """Check existence of a given file path and set it, or raise an
        exception."""
        the_path = str(values or "")
        the_file = Path(the_path).absolute().resolve()
        if not the_file.is_file():
            raise ArgumentError(
                self,
                f"The file '{values}' does not exist or is a folder/directory.")

        setattr(namespace, self.dest, the_file)


class DirectoryCheck(Action):
    """Action class to check the existence of a particular directory"""

    def __call__(# pylint: disable=[signature-differs]
            self, parser: ArgumentParser, namespace: Namespace,
            values: Union[str, Sequence[Any], None],
            option_string: Optional[str] = "") -> None:
        """Check existence of a given directory and set its resolved path,
        or raise an exception."""
        the_dir = Path(str(values or "")).absolute().resolve()
        if not the_dir.is_dir():
            raise ArgumentError(
                self, f"The directory '{the_dir}' does not exist!")

        setattr(namespace, self.dest, the_dir)


def _add_common_arguments(parser: ArgumentParser) -> None:
    """Add the input-file and output-directory arguments to `parser`."""
    parser.add_argument(
        "inputfile", help="The HTML file to parse", action=FileCheck)
    parser.add_argument(
        "--outputdir", help="Path to output directory",
        action=DirectoryCheck, default=False)


def gn1_parser(subparsers) -> None:
    """Register the `gn1` subcommand.

    Its handler parses the input file, picks the results table out of all
    the `<table>` elements and writes its rows as CSV.
    """
    parser = subparsers.add_parser("gn1")
    _add_common_arguments(parser)
    parser.set_defaults(
        func=lambda args: thread(
            args.inputfile,
            parse_file,
            lambda tree: tree.xpath("//table"),
            results_table,
            table_contents,
            to_dicts,
            partial(write_csv, args.inputfile, args.outputdir)))


def tablejson_script(scripts):
    """Return the JSON value assigned to `var tableJson` in `scripts`.

    The first script whose text contains `var tableJson` followed by an
    assignment is used. The value is decoded starting right after the `=`,
    so code surrounding the assignment (including a trailing `;`) is
    ignored.

    Returns:
        The decoded JSON value, or `None` if no script has the assignment.

    Raises:
        json.JSONDecodeError: if the text after the `=` is not valid JSON.
    """
    decoder = json.JSONDecoder()
    for script in scripts:
        content = "".join(script.xpath(".//child::text()")).strip()
        marker_at = content.find(_TABLEJSON_MARKER)
        if marker_at < 0:
            continue

        after_marker = content[marker_at + len(_TABLEJSON_MARKER):]
        _, assignment, value = after_marker.partition("=")
        if not assignment:
            continue

        # `raw_decode` stops at the end of the first JSON value rather than
        # requiring the whole string to be JSON; it does not skip leading
        # whitespace, hence the `lstrip`.
        table_json, _ = decoder.raw_decode(value.lstrip())
        return table_json

    return None


def gn2_parser(subparsers) -> None:
    """Register the `gn2` subcommand.

    Its handler parses the input file, reads the `var tableJson` value out
    of the `<script>` elements and writes it as CSV.
    """
    parser = subparsers.add_parser("gn2")
    _add_common_arguments(parser)
    parser.set_defaults(
        func=lambda args: thread(
            args.inputfile,
            parse_file,
            lambda tree: tree.xpath("//script"),
            tablejson_script,
            partial(write_csv, args.inputfile, args.outputdir)))


def parse_cli_args():
    """Build the command-line parser and parse the process arguments.

    Returns:
        A `(parser, args)` pair: the `ArgumentParser` and the parsed
        `Namespace`.
    """
    parser = ArgumentParser(
        "parse_corr_html_results_to_csv",
        description="Parse correlation results from the given HTML file.")
    subparsers = parser.add_subparsers(
        title="subcommands", description="Valid subcommands",
        help="additional help")
    gn1_parser(subparsers)
    gn2_parser(subparsers)

    return parser, parser.parse_args()


def run() -> None:
    """Run the selected subcommand, or print the help if none was given."""
    parser, args = parse_cli_args()
    # `func` is only set when a subcommand was chosen. Check for it up front
    # rather than catching `AttributeError` around the call, which would also
    # hide attribute errors raised while processing the file.
    func = getattr(args, "func", None)
    if func is None:
        parser.print_help()
        return

    func(args)


if __name__ == "__main__":
    run()