"""Implements the command-line interface for the qc application."""
import os
import sys
import mimetypes
from typing import Union, Callable
from argparse import ArgumentParser

from functional_tools import take

from quality_control.utils import make_progress_calculator
from quality_control.errors import InvalidValue, DuplicateHeading
from quality_control.parsing import FileType, strain_names, collect_errors

from qc_app.db_utils import database_connection

from .cli_parser import init_cli_parser


def is_file_mime(filepath:str, mimetype:str) -> bool:
    """Tell whether `filepath` looks like `mimetype` or plain text.

    The decision is based only on what `mimetypes.guess_type` reports for
    the path; a path with no recognisable type gives `False`.
    """
    guessed, _encoding = mimetypes.guess_type(filepath)
    return guessed == "text/plain" or guessed == mimetype

def add_file_validation_arguments(parser: ArgumentParser) -> ArgumentParser:
    """Register the file-validation CLI arguments on `parser`.

    Adds the positional `filetype` (one of "average" or "standard-error") and
    `filepath` arguments, plus the optional `-c/--count` (integer, default 20)
    and `-v/--verbose` (flag, default off) options.

    Returns the same `parser` object so calls can be chained.
    """
    # argparse %-formats every help string, so a literal '%' in the working
    # directory's path must be doubled or rendering the help would fail.
    cwd = os.getcwd().replace("%", "%%")

    parser.add_argument(
        "filetype",
        help="The type of file to check",
        choices=("average", "standard-error"))
    parser.add_argument(
        "filepath",
        help=(
            # Adjacent literals are concatenated as-is: the trailing space
            # keeps the two sentences from running together.
            "The path to the file to be checked. "
            "If an absolute path is not provided, then the file will be relative to"
            f"\t'{cwd}'"))

    parser.add_argument(
        "-c", "--count", type=int,
        help=(
            "Number of errors to display. "
            "A negative number means display all errors."),
        default=20)

    parser.add_argument(
        "-v", "--verbose",
        help="Controls whether to show extra output",
        default=False, action="store_true")
    return parser

def cli_argument_parser():
    """Build the argument parser used by the `qc` command.

    Starts from the parser produced by `init_cli_parser`, adds the positional
    `speciesid` argument, then attaches the file-validation arguments.
    """
    description = (
        "Command-Line Interface program for quality control of data files")
    parser = init_cli_parser("qc", description)
    parser.add_argument("speciesid", type=int, help="ID of the species.")
    parser = add_file_validation_arguments(parser)
    return parser

def make_progress_indicator(
        verbose: bool, progress_calc_fn: Callable) -> Union[Callable, None]:
    """Build a callback that prints progress, or `None` when not verbose.

    The callback passes its `(linenumber, linetext)` arguments to
    `progress_calc_fn`, prints the resulting object's `currentline` and
    `percent` on one line ending in a carriage return, and returns that
    object.
    """
    def __show_progress__(linenumber, linetext):
        progress = progress_calc_fn(linenumber, linetext)
        # End with "\r" rather than a newline so the next update is written
        # over this one.
        print(
            f"LINE: {progress.currentline} ({progress.percent:.2f}%)",
            end="\r")
        return progress

    return __show_progress__ if verbose else None

def print_errors(errors, verbose):
    """Write a tab-separated report of `errors` to standard output.

    A header row is always printed. Each error then gets one row with its
    line, its column(s) and its message: `InvalidValue` errors show their
    single `column`, `DuplicateHeading` errors show their `columns` joined
    with ", ", and any other error shows "-". When `verbose` is true every
    row is prefixed with a newline. If `errors` yields nothing, a
    "No errors were found!" line is printed.

    Returns the `errors` object that was passed in (already iterated).
    """
    prefix = "\n" if verbose else ""
    print(f"{prefix}line(s)\tcolumn(s)\terrors")

    seen = 0
    for seen, err in enumerate(errors, start=1):
        if isinstance(err, InvalidValue):
            columns = err.column
        elif isinstance(err, DuplicateHeading):
            columns = ", ".join(str(col) for col in err.columns)
        else:
            columns = "-"
        print(f"{prefix}{err.line}\t{columns}\t{err.message}")

    if seen == 0:
        print("No errors were found!")

    return errors

def check(filepath, filetype, strains, count, verbose=False):
    """Run the checks on `filepath` and print whatever errors turn up.

    The file's size feeds the progress calculator; progress is only shown
    when `verbose` is true. When `count` is greater than zero at most `count`
    errors are reported, otherwise every error is reported.

    Returns whatever `print_errors` returns.
    """
    file_size = os.stat(filepath).st_size
    updater = make_progress_indicator(
        verbose, make_progress_calculator(file_size))

    limit_output = count > 0
    errors = collect_errors(filepath, filetype, strains, updater)
    if limit_output:
        errors = take(errors, count)
    return print_errors(errors, verbose)

def main():
    """Parse the command line, validate the input file and run the checks.

    Returns 1 when the file does not exist, 3 when it is not recognised as a
    tab-separated (or plain-text) file, and otherwise the value returned by
    `check`.
    """
    args = cli_argument_parser().parse_args()

    # Bail out early, before touching the database, on an unusable file.
    if not os.path.exists(args.filepath):
        print(f"The file '{args.filepath}' does not exist.", file=sys.stderr)
        return 1

    if not is_file_mime(args.filepath, "text/tab-separated-values"):
        print(
            f"The file '{args.filepath}' MUST be a tab-separated file.",
            file=sys.stderr)
        return 3

    # The connection is only needed to fetch the strain names.
    with database_connection(args.databaseuri) as dbconn:
        strains = strain_names(dbconn, args.speciesid)

    filepath = os.path.realpath(args.filepath)
    if args.verbose:
        print(f"Checking '{filepath}' for errors")

    if args.filetype == "average":
        filetype = FileType.AVERAGE
    else:
        filetype = FileType.STANDARD_ERROR

    return check(filepath, filetype, strains, args.count, verbose=args.verbose)

if __name__ == "__main__":
    # main() returns an integer status when it bails out early (1: missing
    # file, 3: not a tab-separated file) and the value from check()
    # otherwise. Pass the integer codes on to the shell instead of discarding
    # them; any other return value means the run completed, so exit with 0.
    _status = main()
    sys.exit(_status if isinstance(_status, int) else 0)