"""Implements the command-line interface for the qc application"""
import os
import sys
import mimetypes
from typing import Union, Callable
from argparse import ArgumentParser

from gn_libs.mysqldb import database_connection

from functional_tools import take
from quality_control.utils import make_progress_calculator
from quality_control.errors import InvalidValue, DuplicateHeading
from quality_control.parsing import FileType, strain_names, collect_errors

from .cli_parser import init_cli_parser


def is_file_mime(filepath: str, mimetype: str) -> bool:
    """Check that `filepath` has a mimetype of `mimetype` or `text/plain`.

    The type is guessed with `mimetypes.guess_type`; when no type can be
    guessed the result is `None`, so this function returns `False`.
    """
    the_type = mimetypes.guess_type(filepath)[0]
    return the_type in ("text/plain", mimetype)


def add_file_validation_arguments(parser: ArgumentParser) -> ArgumentParser:
    """Add the file-validation specific CLI arguments to `parser`.

    Registers the positional `filetype` and `filepath` arguments and the
    optional `-c/--count` and `-v/--verbose` flags, then returns the same
    parser object so calls can be chained.
    """
    # argparse applies %-formatting to help strings, so a literal '%' in the
    # working directory has to be doubled or `--help` fails.
    cwd = os.getcwd().replace("%", "%%")
    parser.add_argument(
        "filetype",
        help="The type of file to check",
        choices=("average", "standard-error"))
    parser.add_argument(
        "filepath",
        help=(
            "The path to the file to be checked. "
            "If an absolute path is not provided, then the file will be relative to"
            f"\t'{cwd}'"))
    parser.add_argument(
        "-c", "--count",
        type=int,
        help=(
            "Number of errors to display. "
            "A negative number means display all errors."),
        default=20)
    parser.add_argument(
        "-v", "--verbose",
        help="Controls whether to show extra output",
        default=False,
        action="store_true")
    return parser


def cli_argument_parser():
    """Create the parser for the CLI arguments.

    Starts from the parser built by `init_cli_parser`, adds the positional
    `speciesid` argument and then the file-validation arguments.
    """
    theparser = init_cli_parser(
        "qc",
        "Command-Line Interface program for quality control of data files")
    theparser.add_argument("speciesid", type=int, help="ID of the species.")
    return add_file_validation_arguments(theparser)


def make_progress_indicator(
        verbose: bool, progress_calc_fn: Callable) -> Union[Callable, None]:
    """Build a callback that displays the progress, or `None` if not verbose.

    The returned callback takes `(linenumber, linetext)`, forwards them to
    `progress_calc_fn`, prints the current line and percentage from the
    resulting message, and returns that message.
    """
    if not verbose:
        return None

    def __indicator__(linenumber, linetext):
        msg = progress_calc_fn(linenumber, linetext)
        # "\r" returns to the line start so each update overwrites the last.
        print(f"LINE: {msg.currentline} ({msg.percent:.2f}%)", end="\r")
        return msg

    return __indicator__


def print_errors(errors, verbose):
    """Print out the errors as a tab-separated table.

    A header row is always printed. For each error the column cell is
    `error.column` for `InvalidValue`, the comma-joined `error.columns` for
    `DuplicateHeading`, and "-" for anything else. When `errors` yields
    nothing, a "no errors" message is printed.

    Returns the `errors` object that was passed in; if it is a one-shot
    iterator it has been consumed by the time this function returns.
    """
    errors_exist = False
    # In verbose mode the progress indicator leaves the cursor on its own
    # line, so every row starts on a fresh line.
    starter = "\n" if verbose else ""
    print(f"{starter}line(s)\tcolumn(s)\terrors")
    for error in errors:
        if isinstance(error, InvalidValue):
            cols = error.column
        elif isinstance(error, DuplicateHeading):
            cols = ", ".join(str(col) for col in error.columns)
        else:
            cols = "-"
        errors_exist = True
        print(f"{starter}{error.line}\t{cols}\t{error.message}")

    if not errors_exist:
        print("No errors were found!")

    return errors


def check(filepath, filetype, strains, count, verbose=False):
    """Check the file and print out results.

    Collects errors for `filepath` with `collect_errors`, limited to the
    first `count` errors when `count` is greater than zero, and passes them
    to `print_errors`. Returns whatever `print_errors` returns.
    """
    updater = make_progress_indicator(
        verbose, make_progress_calculator(os.stat(filepath).st_size))
    errors = collect_errors(filepath, filetype, strains, updater)
    if count > 0:
        errors = take(errors, count)
    return print_errors(errors, verbose)


def main():
    """Entry point function.

    Returns 1 when the file does not exist and 3 when it is not recognised
    as a tab-separated (or plain text) file. Otherwise returns the result of
    `check`.
    """
    argparser = cli_argument_parser()
    args = argparser.parse_args()
    if not os.path.exists(args.filepath):
        print(f"The file '{args.filepath}' does not exist.", file=sys.stderr)
        return 1

    if not is_file_mime(args.filepath, "text/tab-separated-values"):
        print(
            f"The file '{args.filepath}' MUST be a tab-separated file.",
            file=sys.stderr)
        return 3

    with database_connection(args.databaseuri) as dbconn:
        strains = strain_names(dbconn, args.speciesid)
        # NOTE(review): the check runs while the connection is still open in
        # case `strain_names` yields lazily -- confirm, and narrow the `with`
        # scope if it returns a materialised collection.
        filepath = os.path.realpath(args.filepath)
        if args.verbose:
            print(f"Checking '{filepath}' for errors")

        return check(
            filepath,
            (FileType.AVERAGE if args.filetype == "average"
             else FileType.STANDARD_ERROR),
            strains,
            args.count,
            verbose=args.verbose)


if __name__ == "__main__":
    # main() returns an integer status on early failure and the collected
    # errors otherwise; only the integers are meaningful as exit statuses.
    _status = main()
    sys.exit(_status if isinstance(_status, int) else 0)