diff options
-rw-r--r-- | quality_control/average.py | 12 | ||||
-rw-r--r-- | quality_control/errors.py | 23 | ||||
-rw-r--r-- | quality_control/headers.py | 33 | ||||
-rw-r--r-- | quality_control/parsing.py | 141 | ||||
-rw-r--r-- | quality_control/standard_error.py | 15 | ||||
-rw-r--r-- | tests/qc/test_cells.py | 56 | ||||
-rw-r--r-- | tests/qc/test_error_collection.py | 42 | ||||
-rw-r--r-- | tests/qc/test_header.py | 45 | ||||
-rw-r--r-- | tests/qc/test_parsing.py | 53 |
9 files changed, 64 insertions, 356 deletions
diff --git a/quality_control/average.py b/quality_control/average.py index 9ca16a9..47a04d9 100644 --- a/quality_control/average.py +++ b/quality_control/average.py @@ -3,19 +3,11 @@ import re from typing import Union from .errors import InvalidValue -from .errors import InvalidCellValue - -def valid_value(val): - """Checks whether `val` is a valid value for averages""" - if re.search(r"^[0-9]+\.[0-9]{3}$", val): - return float(val) - raise InvalidCellValue( - f"Invalid value '{val}'. " - "Expected string representing a number with exactly three decimal " - "places.") def invalid_value(line_number: int, column_number: int, val: str) -> Union[ InvalidValue, None]: + """Return an `InvalidValue` object if `val` is not a valid "averages" + value.""" if re.search(r"^[0-9]+\.[0-9]{3}$", val): return None return InvalidValue( diff --git a/quality_control/errors.py b/quality_control/errors.py index 4206b5b..678fe09 100644 --- a/quality_control/errors.py +++ b/quality_control/errors.py @@ -2,29 +2,6 @@ from collections import namedtuple -class InvalidCellValue(Exception): - """Raised when a function encounters an invalid value""" - - def __init__(self, *args): - super().__init__(*args) - -class InvalidHeaderValue(Exception): - """Raised when a header contains values not in the reference file.""" - - def __init__(self, *args): - super().__init__(*args) - -class DuplicateHeader(Exception): - """Raised when a header contains 2 similar headers.""" - - def __init__(self, *args): - super().__init__(*args) - -class ParseError(Exception): - """Raised if any of the above exceptions are raised""" - def __init__(self, *args): - super().__init__(*args) - InvalidValue = namedtuple( "InvalidValue", ("line", "column", "value", "message")) diff --git a/quality_control/headers.py b/quality_control/headers.py index 3b1e0e6..79d7e43 100644 --- a/quality_control/headers.py +++ b/quality_control/headers.py @@ -4,41 +4,22 @@ from functools import reduce from typing import Union, Tuple, Sequence from quality_control.errors import InvalidValue, DuplicateHeading -from quality_control.errors import DuplicateHeader, InvalidHeaderValue - -def valid_header(strains, headers): - "Return the valid headers with reference to strains or throw an error" - if not bool(headers[1:]): - raise InvalidHeaderValue( - "The header MUST contain at least 2 columns") - invalid_headers = tuple( - header for header in headers[1:] if header not in strains) - if invalid_headers: - raise InvalidHeaderValue( - *(f"'{header}' not a valid strain." for header in invalid_headers)) - - unique_headers = set(headers) - if len(unique_headers) != len(headers): - repeated = ( - (header, headers.count(header)) - for header in unique_headers if headers.count(header) > 1) - raise DuplicateHeader(*( - f"'{header}' is present in the header row {times} times." - for header, times in repeated)) - - return headers - def invalid_header( line_number: int, headers: Sequence[str]) -> Union[InvalidValue, None]: + """Return an `InvalidValue` object if the header row has less than 2 + items.""" if len(headers) < 2: return InvalidValue( line_number, 0, "<TAB>".join(headers), "The header MUST contain at least 2 columns") + return None def invalid_headings( line_number: int, strains: Sequence[str], headings: Sequence[str]) -> Union[Tuple[InvalidValue, ...], None]: + """Return tuple of `InvalidValue` objects for each error found for every + column heading.""" return tuple( InvalidValue( line_number, col, header, f"'{header}' not a valid strain.") @@ -47,13 +28,15 @@ def invalid_headings( def duplicate_headings( line_number: int, headers: Sequence[str]) -> Union[InvalidValue, None]: + """Return a tuple of `DuplicateHeading` objects for each column heading that + is a duplicate of another column heading.""" def __update_columns__(acc, item): if item[1] in acc.keys(): return {**acc, item[1]: acc[item[1]] + (item[0],)} return {**acc, item[1]: (item[0],)} repeated = { heading: columns for heading, columns in - reduce(__update_columns__, enumerate(headers, start=1), dict()).items() + reduce(__update_columns__, enumerate(headers, start=1), {}).items() if len(columns) > 1 } return tuple( diff --git a/quality_control/parsing.py b/quality_control/parsing.py index 70a85ed..655b98a 100644 --- a/quality_control/parsing.py +++ b/quality_control/parsing.py @@ -1,40 +1,22 @@ """Module handling the high-level parsing of the files""" -import csv +import os import collections from enum import Enum -from functools import reduce, partial -from typing import Iterable, Generator +from functools import partial +from typing import Union, Iterable, Generator, Callable import quality_control.average as avg import quality_control.standard_error as se -from quality_control.headers import valid_header +from quality_control.errors import InvalidValue from quality_control.headers import ( invalid_header, invalid_headings, duplicate_headings) -from quality_control.errors import ( - ParseError, DuplicateHeader, InvalidCellValue, InvalidHeaderValue) class FileType(Enum): """Enumerate the expected file types""" AVERAGE = 1 STANDARD_ERROR = 2 -def __parse_header(line, strains): - return valid_header( - set(strains), - tuple(header.strip() for header in line.split("\t"))) - -def __parse_average_line(line): - return (line[0],) + tuple(avg.valid_value(field) for field in line[1:]) - -def __parse_standard_error_line(line): - return (line[0],) + tuple(se.valid_value(field) for field in line[1:]) - -LINE_PARSERS = { - FileType.AVERAGE: __parse_average_line, - FileType.STANDARD_ERROR: __parse_standard_error_line -} - def strain_names(filepath): """Retrieve the strains names from given file""" strains = set() @@ -51,90 +33,22 @@ def strain_names(filepath): return strains -def parse_file(filepath: str, filetype: FileType, strains: list): - """Parse the given file""" - seek_pos = 0 - try: - with open(filepath, encoding="utf-8") as input_file: - for line_number, line in enumerate(input_file): - if line_number == 0: - yield __parse_header(line, strains), seek_pos + len(line) - seek_pos = seek_pos + len(line) - continue - - yield ( - LINE_PARSERS[filetype]( - tuple(field.strip() for field in line.split("\t"))), - seek_pos + len(line)) - seek_pos = seek_pos + len(line) - except (DuplicateHeader, InvalidCellValue, InvalidHeaderValue) as err: - raise ParseError({ - "filepath": filepath, - "filetype": filetype, - "position": seek_pos, - "line_number": line_number, - "error": err - }) from err - -def parse_errors(filepath: str, filetype: FileType, strains: list, - seek_pos: int = 0) -> Generator: - """Retrieve ALL the parse errors""" - assert seek_pos >= 0, "The seek position must be at least zero (0)" - - def __error_type(error): - """Return a nicer string representatiton for the error type.""" - if isinstance(error, DuplicateHeader): - return "Duplicated Headers" - if isinstance(error, InvalidCellValue): - return "Invalid Value" - if isinstance(error, InvalidHeaderValue): - return "Invalid Strain" - - def __errors(filepath, filetype, strains, seek_pos): - """Return only the errors as values""" - with open(filepath, encoding="utf-8") as input_file: - ## TODO: Seek the file to the given seek position - for line_number, line in enumerate(input_file): - if seek_pos > 0: - input_file.seek(seek_pos, 0) - try: - if seek_pos == 0 and line_number == 0: - header = __parse_header(line, strains) - yield None - seek_pos = seek_pos + len(line) - continue - - parsed_line = LINE_PARSERS[filetype]( - tuple(field.strip() for field in line.split("\t"))) - yield None - seek_pos = seek_pos + len(line) - except (DuplicateHeader, InvalidCellValue, InvalidHeaderValue) as err: - yield { - "filepath": filepath, - "filetype": filetype, - "position": seek_pos, - "line_number": line_number, - "error": __error_type(err), - "message": err.args - } - seek_pos = seek_pos + len(line) - - return ( - error for error in __errors(filepath, filetype, strains, seek_pos) - if error is not None) - def header_errors(line_number, fields, strains): + """Gather all header row errors.""" return ( (invalid_header(line_number, fields),) + invalid_headings(line_number, strains, fields[1:]) + duplicate_headings(line_number, fields)) def empty_value(line_number, column_number, value): + """Check for empty field values.""" if value == "": return InvalidValue( line_number, column_number, value, "Empty value for column") + return None def average_errors(line_number, fields): + """Gather all errors for a line in a averages file.""" return ( (empty_value(line_number, 1, fields[0]),) + tuple( @@ -142,6 +56,7 @@ def average_errors(line_number, fields): for field in enumerate(fields[1:], start=2))) def se_errors(line_number, fields): + """Gather all errors for a line in a standard-errors file.""" return ( (empty_value(line_number, 1, fields[0]),) + tuple( @@ -149,7 +64,8 @@ def se_errors(line_number, fields): for field in enumerate(fields[1:], start=2))) def collect_errors( - filepath: str, filetype: FileType, strains: list, count: int = 10) -> Generator: + filepath: str, filetype: FileType, strains: list, + updater: Union[Callable, None] = None) -> Generator: """Run checks against file and collect all the errors""" errors = tuple() def __process_errors__(line_number, line, error_checker_fn, errors = tuple()): @@ -162,30 +78,37 @@ def collect_errors( return errors + tuple(error for error in errs if error is not None) return errors + (errs,) + filesize = os.stat(filepath).st_size + processed_size = 0 with open(filepath, encoding="utf-8") as input_file: for line_number, line in enumerate(input_file, start=1): if line_number == 1: - errors = __process_errors__( - line_number, line, partial(header_errors, strains=strains), - errors) - if line_number != 1: - errors = __process_errors__( - line_number, line, ( - average_errors if filetype == FileType.AVERAGE - else se_errors), - errors) + for error in __process_errors__( + line_number, line, partial(header_errors, strains=strains), + errors): + yield error - if count > 0 and len(errors) >= count: - break - - return errors[0:count] + if line_number != 1: + for error in __process_errors__( + line_number, line, ( + average_errors if filetype == FileType.AVERAGE + else se_errors), + errors): + yield error + + processed_size = processed_size + len(line) + if updater: + updater({ + "line_number": line_number, + "percent": (processed_size/filesize) * 100 + }) def take(iterable: Iterable, num: int) -> list: """Take at most `num` items from `iterable`.""" iterator = iter(iterable) items = [] try: - for i in range(0, num): + for i in range(0, num): # pylint: disable=[unused-variable] items.append(next(iterator)) return items diff --git a/quality_control/standard_error.py b/quality_control/standard_error.py index 022cc9b..c866993 100644 --- a/quality_control/standard_error.py +++ b/quality_control/standard_error.py @@ -3,19 +3,10 @@ import re from typing import Union from .errors import InvalidValue -from .errors import InvalidCellValue -def valid_value(val): - """Checks whether `val` is a valid value for standard errors""" - if re.search(r"^[0-9]+\.[0-9]{6,}$", val): - return float(val) - raise InvalidCellValue( - f"Invalid value '{val}'. " - "Expected string representing a number with at least six decimal " - "places.") - -def invalid_value(line_number: int, column_number: int, val: str) -> Union[ - InvalidValue, None]: +def invalid_value( + line_number: int, column_number: int, val: str) -> Union[ + InvalidValue, None]: """ Returns a `quality_control.errors.InvalidValue` object in the case where `val` is not a valid input for standard error files, otherwise, it returns diff --git a/tests/qc/test_cells.py b/tests/qc/test_cells.py index 46aeb64..a38be30 100644 --- a/tests/qc/test_cells.py +++ b/tests/qc/test_cells.py @@ -1,64 +1,12 @@ """Test that values in cells within a line fulfill the required criteria""" -import pytest from random import randint from hypothesis import given from hypothesis import strategies as st from quality_control.errors import InvalidValue -from quality_control.errors import InvalidCellValue -from quality_control.average import ( - valid_value as avg_valid_value, - invalid_value as avg_invalid_value) -from quality_control.standard_error import ( - valid_value as se_valid_value, - invalid_value as se_invalid_value) - -@given(num_str=st.from_regex( - r"^(?!([0-9]+\.([0-9]{3}|[0-9]{6,}))).*", fullmatch=True)) -def test_cell_value_errors_with_invalid_inputs(num_str): - """Check that an error is raised for a cell with an invalid value.""" - with pytest.raises(InvalidCellValue): - avg_valid_value(num_str) - with pytest.raises(InvalidCellValue): - se_valid_value(num_str) - -@given(num_str=st.from_regex( - r"^[0-9]+\.([0-9]{1,2}|[0-9]{4,}$)", fullmatch=True)) -def test_cell_average_value_errors_if_not_three_decimal_places(num_str): - """Check that an error is raised if the average value does not have 3 decimal places""" - with pytest.raises(InvalidCellValue): - avg_valid_value(num_str) - -@given(num_str=st.from_regex(r"^[0-9]+\.[0-9]{3}$", fullmatch=True)) -def test_cell_average_value_pass_if_three_decimal_places(num_str): - """Check that there is no error if the average value has 3 decimal places.""" - processed = avg_valid_value(num_str) - assert ( - isinstance(processed, float) and - processed == float(num_str)) - -@given(num_str=st.from_regex(r"^[0-9]+\.([0-9]{0,5}$)", fullmatch=True)) -def test_cell_standard_error_value_errors_if_less_than_six_decimal_places(num_str): - """ - Check that an error is raised if the standard error value does not have 6 - decimal places - """ - with pytest.raises(InvalidCellValue): - se_valid_value(num_str) - -@given(num_str=st.from_regex(r"^[0-9]+\.[0-9]{6,}$", fullmatch=True)) -def test_cell_standard_error_value_pass_if_six_or_more_decimal_places(num_str): - """ - Check that there is no error if the standard error value has 3 decimal - places. - """ - processed = se_valid_value(num_str) - assert ( - isinstance(processed, float) and - processed == float(num_str)) - -## ================================================================================ +from quality_control.average import invalid_value as avg_invalid_value +from quality_control.standard_error import invalid_value as se_invalid_value @given(num_str=st.from_regex( r"^(?!([0-9]+\.([0-9]{3}|[0-9]{6,}))).*", fullmatch=True)) diff --git a/tests/qc/test_error_collection.py b/tests/qc/test_error_collection.py index 466f455..fe85bb1 100644 --- a/tests/qc/test_error_collection.py +++ b/tests/qc/test_error_collection.py @@ -1,33 +1,9 @@ +"""Check that error collection works as expected""" + import pytest -from quality_control.parsing import take, FileType, parse_errors -from quality_control.parsing import collect_errors - -@pytest.mark.slow -@pytest.mark.parametrize( - "filepath,filetype,seek_pos", - (("tests/test_data/average_crlf.tsv", FileType.AVERAGE, 0), - ("tests/test_data/average_error_at_end_200MB.tsv", FileType.AVERAGE, - 205500004 # Skip first 500K lines - ), - ("tests/test_data/average.tsv", FileType.AVERAGE, 0), - ("tests/test_data/standarderror_1_error_at_end.tsv", - FileType.STANDARD_ERROR, 0), - ("tests/test_data/standarderror.tsv", FileType.STANDARD_ERROR, 0), - ("tests/test_data/duplicated_headers_no_data_errors.tsv", - FileType.AVERAGE, 0))) -def test_parse_errors(filepath, filetype, strains, seek_pos): - """ - Check that only errors are returned, and that certain properties hold for - said errors. - """ - for error in parse_errors(filepath, filetype, strains, seek_pos): - assert isinstance(error, dict) - assert "filepath" in error - assert "filetype" in error - assert "position" in error - assert "error" in error and isinstance(error["error"], str) - assert "message" in error +from quality_control.errors import InvalidValue, DuplicateHeading +from quality_control.parsing import take, FileType, collect_errors @pytest.mark.parametrize( "sample,num,expected", @@ -35,13 +11,11 @@ def test_parse_errors(filepath, filetype, strains, seek_pos): ([0, 1, 2, 3], 200, [0, 1, 2, 3]), (("he", "is", "a", "lovely", "boy"), 3, ["he", "is", "a"]))) def test_take(sample, num, expected): + """Check that `take` works correctly.""" taken = take(sample, num) assert len(taken) <= num assert taken == expected - -## ================================================== - @pytest.mark.slow @pytest.mark.parametrize( "filepath,filetype,count", @@ -55,4 +29,8 @@ def test_take(sample, num, expected): ("tests/test_data/duplicated_headers_no_data_errors.tsv", FileType.AVERAGE, 10))) def test_collect_errors(filepath, filetype, strains, count): - assert len(collect_errors(filepath, filetype, strains, count)) <= count + """Check that `collect_errors` works as expected.""" + results = take(collect_errors(filepath, filetype, strains), count) + def __valid_instance(item): + return isinstance(item, (InvalidValue, DuplicateHeading)) + assert all(__valid_instance(error) for error in results) diff --git a/tests/qc/test_header.py b/tests/qc/test_header.py index f860a71..2557e85 100644 --- a/tests/qc/test_header.py +++ b/tests/qc/test_header.py @@ -3,42 +3,13 @@ import pytest from hypothesis import given from hypothesis import strategies as st -from quality_control.headers import valid_header from quality_control.errors import InvalidValue, DuplicateHeading -from quality_control.errors import DuplicateHeader, InvalidHeaderValue from quality_control.headers import ( invalid_header, invalid_headings, duplicate_headings) -@given(headers=st.lists(st.text(max_size=10))) -def test_valid_header_errors_with_invalid_headers(headers): - "Verify that the check for header validity works" - with pytest.raises(InvalidHeaderValue): - valid_header(("BXD1", "BXD2", "BXD3"), headers) - -@pytest.mark.parametrize( - "strains,headers", [ - (("BXD1", "BXD2", "BXD3"), ("ProbeSet", "BXD3", "BXD1")), - (("AStrain", "AnotherStrain", "YetAnotherStrain"), - ("Individual", "AStrain", "AnotherStrain", "YetAnotherStrain"))]) -def test_valid_header_strains_passes_with_valid_headers(strains, headers): - "Verify that the check for header validity works" - assert valid_header(strains, headers) - -@pytest.mark.parametrize( - "strains,headers", [ - (("BXD1", "BXD2", "BXD3"), ("ProbeSet", "BXD3", "BXD1", "BXD1")), - (("AStrain", "AnotherStrain", "YetAnotherStrain"), - ("Individual", "AStrain", "AnotherStrain", "YetAnotherStrain", - "AStrain"))]) -def test_valid_header_fails_with_duplicate_headers(strains, headers): - """Check that parsing fails if any header is duplicated""" - with pytest.raises(DuplicateHeader): - valid_header(strains, headers) - -## ============================================================ - @given(headers=st.lists(st.text(max_size=10), max_size=1)) def test_invalid_header_with_list_of_one_value(headers): + """Test `invalid_header` with invalid header row""" assert invalid_header(0, headers) == InvalidValue( 0, 0, "<TAB>".join(headers), "The header MUST contain at least 2 columns") @@ -51,13 +22,12 @@ def test_invalid_headings_with_invalid_inputs(headings): for col, heading in enumerate(headings, start=2)) @pytest.mark.parametrize( - "strains,headers", [ - (("BXD1", "BXD2", "BXD3"), ("ProbeSet", "BXD3", "BXD1")), - (("AStrain", "AnotherStrain", "YetAnotherStrain"), - ("Individual", "AStrain", "AnotherStrain", "YetAnotherStrain"))]) -def test_invalid_header_with_valid_headers(strains, headers): + "headers", [ + (("ProbeSet", "BXD3", "BXD1")), + (("Individual", "AStrain", "AnotherStrain", "YetAnotherStrain"))]) +def test_invalid_header_with_valid_headers(headers): "Verify that the check for header validity works" - assert invalid_header(0, headers) == None + assert invalid_header(0, headers) is None @pytest.mark.parametrize( "strains,headings", [ @@ -76,7 +46,7 @@ def test_invalid_headings_with_valid_headings(strains, headings): def test_duplicate_headers_with_repeated_column_headings(headers, repeated): """Check that parsing fails if any header is duplicated""" assert duplicate_headings(0, headers) == tuple( - DuplicateHeading(0, head, cols, ( + DuplicateHeading(0, cols, head, ( f"Heading '{head}', is repeated in columns " f"{','.join(str(i) for i in cols)}")) for head, cols in repeated.items()) @@ -88,4 +58,3 @@ def test_duplicate_headers_with_repeated_column_headings(headers, repeated): def test_duplicate_headers_with_unique_column_headings(headers): """Check that parsing fails if any header is duplicated""" assert duplicate_headings(0, headers) == tuple() - diff --git a/tests/qc/test_parsing.py b/tests/qc/test_parsing.py deleted file mode 100644 index 41739ad..0000000 --- a/tests/qc/test_parsing.py +++ /dev/null @@ -1,53 +0,0 @@ -"""Test the parsing of the files""" -import pytest - -from quality_control.errors import ParseError -from quality_control.parsing import FileType, parse_file - -@pytest.mark.parametrize( - "filepath,filetype", - (("tests/test_data/average_crlf.tsv", FileType.STANDARD_ERROR), - ("tests/test_data/average_error_at_end_200MB.tsv", - FileType.STANDARD_ERROR), - ("tests/test_data/average.tsv", FileType.STANDARD_ERROR), - ("tests/test_data/standarderror_1_error_at_end.tsv", FileType.AVERAGE), - ("tests/test_data/standarderror.tsv", FileType.AVERAGE), - ("tests/test_data/duplicated_headers_no_data_errors.tsv", - FileType.STANDARD_ERROR),)) -def test_parse_file_fails_with_wrong_filetype_declaration(filepath, filetype, strains): - """Check that parsing fails if the wrong file type is declared""" - with pytest.raises(ParseError): - for line in parse_file(filepath, filetype, strains): # pylint: disable=[unused-variable] - pass - -@pytest.mark.parametrize( - "filepath,filetype", - (("tests/test_data/no_data_errors.tsv", FileType.AVERAGE),)) -def test_parse_file_passes_with_valid_files(filepath, filetype, strains): - """Check that parsing succeeds with valid files""" - for line in parse_file(filepath, filetype, strains): - assert bool(line) - -@pytest.mark.slow -@pytest.mark.parametrize( - "filepath,filetype", - (("tests/test_data/average_large_no_errors.tsv", FileType.AVERAGE), - # ("tests/test_data/average_no_errors.tsv", FileType.AVERAGE), - # ("tests/test_data/standarderror_no_errors.tsv", FileType.STANDARD_ERROR), - )) -def test_parse_file_works_with_large_files(filepath, filetype, strains): - """Check that parsing succeeds even with large files.""" - for line in parse_file(filepath, filetype, strains): - assert bool(line) - -@pytest.mark.slow -@pytest.mark.parametrize( - "filepath,filetype", - (("tests/test_data/average_error_at_end_200MB.tsv", FileType.AVERAGE), - ("tests/test_data/standarderror_1_error_at_end.tsv", FileType.STANDARD_ERROR), - ("tests/test_data/duplicated_headers_no_data_errors.tsv", FileType.AVERAGE))) -def test_parse_file_raises_exception_on_error_in_file(filepath, filetype, strains): - "Check that parsing fails if any error is found in a file" - with pytest.raises(ParseError): - for line in parse_file(filepath, filetype, strains): # pylint: disable=[unused-variable] - pass |