1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
|
"""Implements the command-line interface for the qc application"""
import os
import sys
import argparse
import magic
from quality_control.errors import ParseError
from quality_control.parsing import (
FileType,
parse_file,
strain_names,
parse_errors,
parse_strains)
def is_file_mime(filepath, mimetype):
    """Tell whether the file at `filepath` is reported as `mimetype`.

    The mimetype detected by `magic.from_file` is compared against both the
    requested `mimetype` and the generic `text/plain`; a match on either one
    yields `True`, anything else yields `False`.
    """
    accepted_mimetypes = ("text/plain", mimetype)
    detected_mimetype = magic.from_file(filepath, mime=True)
    return detected_mimetype in accepted_mimetypes
def cli_argument_parser():
    """Create the parser for the CLI arguments.

    Positional arguments:
        filetype: one of "average" or "standard-error".
        filepath: the path to the file to be checked.

    Options:
        -s/--strainsfile: path to the strains file. Defaults to
            `etc/strains.csv` under the directory one level above the
            directory that contains this module.
        -v/--verbose: flag; `False` unless given on the command line.

    Returns:
        The configured `argparse.ArgumentParser` (program name "qc").
    """
    def _literal(text):
        """Escape `%` so argparse's %-formatting of help text keeps it as-is."""
        return text.replace("%", "%%")

    parser = argparse.ArgumentParser(
        prog="qc",
        description=(
            "Command-Line Interface program for quality control of data files"))

    parser.add_argument(
        "filetype",
        help="The type of file to check",
        choices=("average", "standard-error"))

    # The sentences are joined by implicit string concatenation, so each
    # fragment must carry its own trailing space. The working directory is
    # escaped because a `%` in it would otherwise break `--help`.
    parser.add_argument(
        "filepath",
        help=(
            "The path to the file to be checked. "
            "If an absolute path is not provided, then the file will be "
            f"relative to '{_literal(os.getcwd())}'"))

    default_strains_file = os.path.join(
        os.path.dirname(os.path.dirname(__file__)), "etc/strains.csv")
    parser.add_argument(
        "-s", "--strainsfile",
        help=(
            "Path to the file containing allowable strains/samples. "
            f"[default '{_literal(default_strains_file)}']"),
        default=default_strains_file)

    parser.add_argument(
        "-v", "--verbose",
        help="Controls whether to show extra output",
        default=False, action="store_true")

    return parser
def check(filepath, filetype, strains, verbose=False):
    """Run the checks on a file and print the outcome to standard output.

    Every item produced by `parse_file(filepath, filetype, strains)` is
    consumed in turn; with `verbose` set, a "Checked line: N" message is
    printed for each one (N counts from 1). If the iteration finishes without
    a `ParseError`, a success message is printed.

    When a `ParseError` is raised, its first argument is expected to be a
    mapping with a "line_number" entry. A "line<TAB>errors" header is printed,
    followed by one row for each item produced by `parse_errors`, which is
    given that line number as its last argument. Rows are numbered starting
    from "line_number" + 1.
    """
    try:
        parsed = parse_file(filepath, filetype, strains)
        for line_num, _ in enumerate(parsed, start=1):
            if verbose:
                print(f"Checked line: {line_num}")
    except ParseError as parse_error:
        print("line\terrors")
        # NOTE(review): presumably the line on which parsing stopped --
        # confirm against `quality_control.parsing`.
        failed_at = parse_error.args[0]["line_number"]
        errors = parse_errors(filepath, filetype, strains, failed_at)
        first_row = parse_error.args[0]["line_number"] + 1
        for line_num, error in enumerate(errors, start=first_row):
            messages = " ".join(error["message"])
            print(f"{line_num}\t{messages}")
    else:
        print("Successfully checked the file. No errors found.")
def main():
    """Entry point function.

    Parses the command-line arguments, validates both input files, loads the
    strain names and then runs `check` on the file.

    Returns:
        An integer suitable for use as the process exit status:
        0 - the checks were run (`check` prints its own findings);
        1 - the file to check does not exist;
        2 - the strains file does not exist;
        3 - the file to check is neither tab-separated nor plain text;
        4 - the strains file is neither CSV nor plain text.
    """
    args = cli_argument_parser().parse_args()

    if not os.path.exists(args.filepath):
        print(f"The file '{args.filepath}' does not exist.", file=sys.stderr)
        return 1

    if not os.path.exists(args.strainsfile):
        print(f"The file '{args.strainsfile}' does not exist.", file=sys.stderr)
        return 2

    if not is_file_mime(args.filepath, "text/tab-separated-values"):
        print(
            f"The file '{args.filepath}' MUST be a tab-separated file.",
            file=sys.stderr)
        return 3

    # The strains file is validated as CSV, so the message must say so.
    if not is_file_mime(args.strainsfile, "text/csv"):
        print(
            f"The file '{args.strainsfile}' MUST be a comma-separated (CSV) "
            "file.",
            file=sys.stderr)
        return 4

    if args.verbose:
        print(f"Parsing the strain names from '{args.strainsfile}'")

    strains = strain_names(parse_strains(os.path.realpath(args.strainsfile)))

    filepath = os.path.realpath(args.filepath)
    if args.verbose:
        print(f"Checking '{filepath}' for errors")

    # The argument parser only admits "average" and "standard-error".
    filetype = (
        FileType.AVERAGE if args.filetype == "average"
        else FileType.STANDARD_ERROR)

    # Forward the verbosity flag so `check` can print its per-line progress.
    check(filepath, filetype, strains, verbose=args.verbose)
    return 0
if __name__ == "__main__":
    # Hand main()'s return value to the interpreter as the exit status so
    # that its error codes are visible to the calling shell; `None` maps to 0.
    sys.exit(main())
|