"""Implements the command-line interface for the qc application"""
import os
import sys
import mimetypes
from typing import Union, Callable
from argparse import ArgumentParser
from gn_libs.mysqldb import database_connection
from functional_tools import take
from quality_control.utils import make_progress_calculator
from quality_control.errors import InvalidValue, DuplicateHeading
from quality_control.parsing import FileType, strain_names, collect_errors
from .cli_parser import init_cli_parser
def is_file_mime(filepath: str, mimetype: str) -> bool:
    """Tell whether `filepath` looks like `mimetype` or plain text.

    The type is obtained from `mimetypes.guess_type(filepath)`. The result is
    `True` when the guessed type equals `mimetype` or is `text/plain`, and
    `False` otherwise (including when no type could be guessed).
    """
    guessed, _encoding = mimetypes.guess_type(filepath)
    return guessed == "text/plain" or guessed == mimetype
def add_file_validation_arguments(parser: ArgumentParser) -> ArgumentParser:
    """Register the file-validation CLI arguments on `parser`.

    Adds two positionals, `filetype` (either "average" or "standard-error")
    and `filepath`, plus the options `-c/--count` (int, default 20) and
    `-v/--verbose` (flag, default False).

    Returns the same `parser` object so calls can be chained.
    """
    # argparse applies %-formatting to help strings when rendering them, so a
    # literal '%' in the working directory must be doubled to survive.
    cwd = os.getcwd().replace("%", "%%")
    parser.add_argument(
        "filetype",
        help="The type of file to check",
        choices=("average", "standard-error"))
    parser.add_argument(
        "filepath",
        help=(
            "The path to the file to be checked. "
            "If an absolute path is not provided, then the file will be "
            f"relative to '{cwd}'"))
    parser.add_argument(
        "-c", "--count", type=int,
        help=(
            "Number of errors to display. "
            "Zero or a negative number means display all errors."),
        default=20)
    parser.add_argument(
        "-v", "--verbose",
        help="Controls whether to show extra output",
        default=False, action="store_true")
    return parser
def cli_argument_parser():
    """Build the argument parser used by the `qc` command-line program.

    Starts from the parser produced by `init_cli_parser`, adds the positional
    integer `speciesid`, then attaches the file-validation arguments.
    """
    # NOTE(review): `init_cli_parser` presumably supplies the `databaseuri`
    # argument that `main` reads -- confirm in `.cli_parser`.
    qc_parser = init_cli_parser(
        "qc",
        "Command-Line Interface program for quality control of data files")
    qc_parser.add_argument("speciesid", help="ID of the species.", type=int)
    qc_parser = add_file_validation_arguments(qc_parser)
    return qc_parser
def make_progress_indicator(
        verbose: bool, progress_calc_fn: Callable) -> Union[Callable, None]:
    """Build a callback that reports parsing progress on the terminal.

    When `verbose` is falsy no callback is created and `None` is returned.
    Otherwise the returned callable takes `(linenumber, linetext)`, forwards
    them to `progress_calc_fn`, prints the resulting `currentline` and
    `percent` values, and hands back whatever `progress_calc_fn` returned.
    """
    if verbose:
        def __indicator__(linenumber, linetext):
            progress = progress_calc_fn(linenumber, linetext)
            # End with a carriage return so the next report overwrites
            # this one instead of scrolling the terminal.
            print(
                f"LINE: {progress.currentline} ({progress.percent:.2f}%)",
                end="\r")
            return progress

        return __indicator__

    return None
def print_errors(errors, verbose):
    """Write a tab-separated report of `errors` to standard output.

    A header row is always printed first. Each error then gets one row with
    its line, its column(s) and its message:

    * `InvalidValue` errors show their single `column`,
    * `DuplicateHeading` errors show their `columns` joined with ", ",
    * any other error shows "-" in the column position.

    When `verbose` is truthy every row is prefixed with a newline. If the
    iterable yields nothing, "No errors were found!" is printed.

    Returns the `errors` object that was passed in. If it is a one-shot
    iterator it will already have been consumed by the time it is returned.
    """
    prefix = "\n" if verbose else ""
    print(f"{prefix}line(s)\tcolumn(s)\terrors")

    reported = 0
    for reported, error in enumerate(errors, start=1):
        if isinstance(error, InvalidValue):
            cols = error.column
        elif isinstance(error, DuplicateHeading):
            cols = ", ".join(str(col) for col in error.columns)
        else:
            cols = "-"
        print(f"{prefix}{error.line}\t{cols}\t{error.message}")

    if reported == 0:
        print("No errors were found!")

    return errors
def check(filepath, filetype, strains, count, verbose=False):
    """Run the quality-control checks on a file and print what was found.

    The size of `filepath` (from `os.stat`) seeds the progress calculator;
    a progress callback is only built when `verbose` is truthy. Errors come
    from `collect_errors(filepath, filetype, strains, updater)`.

    When `count` is greater than zero the errors are passed through
    `take(..., count)` before printing; for zero or negative values they are
    printed as collected.

    Returns whatever `print_errors` returns.
    """
    file_size = os.stat(filepath).st_size
    updater = make_progress_indicator(
        verbose, make_progress_calculator(file_size))

    found = collect_errors(filepath, filetype, strains, updater)
    if count > 0:
        found = take(found, count)

    return print_errors(found, verbose)
def main():
    """Parse the command line, validate the target file and run the checks.

    Returns 1 when the given path does not exist and 3 when the file is not
    recognised as tab-separated (or plain text); both cases write a message
    to standard error. Otherwise the strain names for the requested species
    are loaded from the database and the result of `check` is returned.
    """
    args = cli_argument_parser().parse_args()
    given_path = args.filepath

    if not os.path.exists(given_path):
        print(f"The file '{given_path}' does not exist.", file=sys.stderr)
        return 1

    if not is_file_mime(given_path, "text/tab-separated-values"):
        print(
            f"The file '{given_path}' MUST be a tab-separated file.",
            file=sys.stderr)
        return 3

    # The connection is only needed to fetch the strain names; the file
    # check itself runs after the connection context has been left.
    with database_connection(args.databaseuri) as dbconn:
        strains = strain_names(dbconn, args.speciesid)

    filepath = os.path.realpath(given_path)
    if args.verbose:
        print(f"Checking '{filepath}' for errors")

    if args.filetype == "average":
        filetype = FileType.AVERAGE
    else:
        filetype = FileType.STANDARD_ERROR

    return check(filepath, filetype, strains, args.count, verbose=args.verbose)
if __name__ == "__main__":
    # `main` returns an integer status (1 or 3) when it rejects the input
    # file, and otherwise returns the result of `check`. Forward only the
    # integer results as the process exit status so that a rejected file is
    # visible to the calling shell; a completed check exits with 0.
    status = main()
    sys.exit(status if isinstance(status, int) else 0)