diff options
Diffstat (limited to 'quality_control')
-rw-r--r-- | quality_control/average.py | 7 | ||||
-rw-r--r-- | quality_control/checks.py | 5 | ||||
-rw-r--r-- | quality_control/errors.py | 6 | ||||
-rw-r--r-- | quality_control/headers.py | 24 | ||||
-rw-r--r-- | quality_control/parsing.py | 44 | ||||
-rw-r--r-- | quality_control/standard_error.py | 11 |
6 files changed, 55 insertions, 42 deletions
diff --git a/quality_control/average.py b/quality_control/average.py index ad732d0..bf288de 100644 --- a/quality_control/average.py +++ b/quality_control/average.py @@ -4,12 +4,13 @@ from typing import Union from .utils import cell_error from .errors import InvalidValue -def invalid_value(line_number: int, column_number: int, val: str) -> Union[ - InvalidValue, None]: +def invalid_value( + filename: str, line_number: int, column_number: int, val: str) -> Union[ + InvalidValue, None]: """Return an `InvalidValue` object if `val` is not a valid "averages" value.""" return cell_error( - r"^([0-9]+\.[0-9]{3}|[0-9]+\.?0*)$", val, line=line_number, + r"^([0-9]+\.[0-9]{3}|[0-9]+\.?0*)$", val, filename=filename, line=line_number, column=column_number, value=val, message=( f"Invalid value '{val}'. " "Expected string representing a number with exactly three " diff --git a/quality_control/checks.py b/quality_control/checks.py index 475eb9e..bdfd12b 100644 --- a/quality_control/checks.py +++ b/quality_control/checks.py @@ -52,7 +52,8 @@ def decimal_places_pattern(mini: int, maxi: Optional[int] = None) -> re.Pattern: + r")$" ) -def decimal_points_error(lineno: int, +def decimal_points_error(filename: str,# pylint: disable=[too-many-arguments] + lineno: int, field: str, value: str, mini: int, @@ -61,7 +62,7 @@ def decimal_points_error(lineno: int, Check that 'value' in a decimal number with the appropriate decimal places. """ if not bool(decimal_places_pattern(mini, maxi).match(value)): - return InvalidValue(lineno, field, value, ( + return InvalidValue(filename, lineno, field, value, ( f"Invalid value '{value}'. Expected numerical value " + f"with at least {mini} decimal places" + (f" and at most {maxi} decimal places" if maxi is not None else "") diff --git a/quality_control/errors.py b/quality_control/errors.py index fff6c7c..01afa81 100644 --- a/quality_control/errors.py +++ b/quality_control/errors.py @@ -3,10 +3,10 @@ from collections import namedtuple InvalidValue = namedtuple( - "InvalidValue", ("line", "column", "value", "message")) + "InvalidValue", ("filename", "line", "column", "value", "message")) DuplicateHeading = namedtuple( - "DuplicateHeading", ("line", "columns", "heading", "message")) + "DuplicateHeading", ("filename", "line", "columns", "heading", "message")) InconsistentColumns = namedtuple( - "InconsistentColumns", ("line", "header_count", "contents_count", "message")) + "InconsistentColumns", ("filename", "line", "header_count", "contents_count", "message")) diff --git a/quality_control/headers.py b/quality_control/headers.py index f4f4dad..436ea5a 100644 --- a/quality_control/headers.py +++ b/quality_control/headers.py @@ -5,30 +5,34 @@ from typing import Union, Tuple, Sequence from quality_control.errors import InvalidValue, DuplicateHeading -def invalid_header( - line_number: int, headers: Sequence[str]) -> Union[InvalidValue, None]: +def invalid_header(filename: str, + line_number: int, + headers: Sequence[str]) -> Union[InvalidValue, None]: """Return an `InvalidValue` object if the header row has less than 2 items.""" if len(headers) < 2: return InvalidValue( - line_number, 0, "<TAB>".join(headers), + filename, line_number, 0, "<TAB>".join(headers), "The header MUST contain at least 2 columns") return None def invalid_headings( - line_number: int, strains: Sequence[str], + filename: str, line_number: int, strains: Sequence[str], headings: Sequence[str]) -> Union[Tuple[InvalidValue, ...], None]: """Return tuple of `InvalidValue` objects for each error found for every column heading.""" return tuple( - InvalidValue( - line_number, col, header, f"'{header}' not a valid strain.") + InvalidValue(filename, + line_number, + col, + header, + f"'{header}' not a valid strain.") for col, header in enumerate(headings, start=2) if header not in strains) -def duplicate_headings( - line_number: int, - headers: Sequence[str]) -> Tuple[DuplicateHeading, ...]: +def duplicate_headings(filename: str, + line_number: int, + headers: Sequence[str]) -> Tuple[DuplicateHeading, ...]: """Return a tuple of `DuplicateHeading` objects for each column heading that is a duplicate of another column heading.""" def __update_columns__(acc, item): @@ -42,7 +46,7 @@ def duplicate_headings( } return tuple( DuplicateHeading( - line_number, columns, heading, ( + filename, line_number, columns, heading, ( f"Heading '{heading}', is repeated in columns " f"{','.join(str(i) for i in columns)}")) for heading, columns in repeated.items()) diff --git a/quality_control/parsing.py b/quality_control/parsing.py index f7a664f..f1d21fc 100644 --- a/quality_control/parsing.py +++ b/quality_control/parsing.py @@ -1,6 +1,7 @@ """Module handling the high-level parsing of the files""" import collections from enum import Enum +from pathlib import Path from functools import partial from typing import Tuple, Union, Generator, Callable, Optional @@ -30,44 +31,44 @@ def strain_names(dbconn: mdb.Connection, speciesid: int) -> tuple[str, ...]: lambda item: bool(item.strip() if item is not None else item), (name for names in samplenames for name in names)))) -def header_errors(line_number, fields, strains): +def header_errors(filename, line_number, fields, strains): """Gather all header row errors.""" return ( - (invalid_header(line_number, fields),) + - invalid_headings(line_number, strains, fields[1:]) + - duplicate_headings(line_number, fields)) + (invalid_header(filename, line_number, fields),) + + invalid_headings(filename, line_number, strains, fields[1:]) + + duplicate_headings(filename, line_number, fields)) -def empty_value(line_number, column_number, value): +def empty_value(filename, line_number, column_number, value): """Check for empty field values.""" if value == "": - return InvalidValue( - line_number, column_number, value, "Empty value for column") + return InvalidValue(filename, line_number, column_number, value, + "Empty value for column") return None -def average_errors(line_number, fields): +def average_errors(filename, line_number, fields): """Gather all errors for a line in a averages file.""" return ( - (empty_value(line_number, 1, fields[0]),) + + (empty_value(filename, line_number, 1, fields[0]),) + tuple( - avg.invalid_value(line_number, *field) + avg.invalid_value(filename, line_number, *field) for field in enumerate(fields[1:], start=2))) -def se_errors(line_number, fields): +def se_errors(filename, line_number, fields): """Gather all errors for a line in a standard-errors file.""" return ( - (empty_value(line_number, 1, fields[0]),) + + (empty_value(filename, line_number, 1, fields[0]),) + tuple( - se.invalid_value(line_number, *field) + se.invalid_value(filename, line_number, *field) for field in enumerate(fields[1:], start=2))) -def make_column_consistency_checker(header_row): +def make_column_consistency_checker(filename, header_row): """Build function to check for column consistency""" headers = tuple(field.strip() for field in header_row.split("\t")) def __checker__(line_number, contents_row): contents = tuple(field.strip() for field in contents_row.split("\t")) if len(contents) != len(headers): return InconsistentColumns( - line_number, len(headers), len(contents), + filename, line_number, len(headers), len(contents), (f"Header row has {len(headers)} columns while row " f"{line_number} has {len(contents)} columns")) return None @@ -79,8 +80,10 @@ def collect_errors( user_aborted: Callable = lambda: False) -> Generator: """Run checks against file and collect all the errors""" errors:Tuple[Union[InvalidValue, DuplicateHeading], ...] = tuple() - def __process_errors__(line_number, line, error_checker_fn, errors = tuple()): + def __process_errors__( + filename, line_number, line, error_checker_fn, errors = tuple()): errs = error_checker_fn( + filename, line_number, tuple(field.strip() for field in line.split("\t"))) if errs is None: @@ -90,6 +93,7 @@ def collect_errors( return errors + (errs,) with open_file(filepath) as input_file: + filename = Path(filepath).name for line_number, line in enumerate(input_file, start=1): if user_aborted(): break @@ -98,9 +102,11 @@ def collect_errors( line = line.decode("utf-8") if line_number == 1: - consistent_columns_checker = make_column_consistency_checker(line) + consistent_columns_checker = make_column_consistency_checker( + filename, line) for error in __process_errors__( - line_number, line, partial(header_errors, strains=strains), + filename, line_number, line, + partial(header_errors, strains=strains), errors): yield error @@ -110,7 +116,7 @@ def collect_errors( yield col_consistency_error for error in __process_errors__( - line_number, line, ( + filename, line_number, line, ( average_errors if filetype == FileType.AVERAGE else se_errors), errors): diff --git a/quality_control/standard_error.py b/quality_control/standard_error.py index 90beb8a..00b1ac6 100644 --- a/quality_control/standard_error.py +++ b/quality_control/standard_error.py @@ -4,17 +4,18 @@ from typing import Union from .utils import cell_error from .errors import InvalidValue -def invalid_value( - line_number: int, column_number: int, val: str) -> Union[ - InvalidValue, None]: +def invalid_value(filename: str, + line_number: int, + column_number: int, + val: str) -> Union[InvalidValue, None]: """ Returns a `quality_control.errors.InvalidValue` object in the case where `val` is not a valid input for standard error files, otherwise, it returns `None`. """ return cell_error( - r"^([0-9]+\.[0-9]{6,}|[0-9]+\.?0*)$", val, line=line_number, - column=column_number, value=val, message=( + r"^([0-9]+\.[0-9]{6,}|[0-9]+\.?0*)$", val, filename=filename, + line=line_number, column=column_number, value=val, message=( f"Invalid value '{val}'. " "Expected string representing a number with at least six " "decimal places.")) |