From 582686e030b660f218cb7091aaab3cafa103465d Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Wed, 18 May 2022 10:36:10 +0300 Subject: Return errors when found or None otherwise This commit adds a number of functions that return the error object when an error is found, or `None` otherwise. It avoids the use of exceptions as control flow constructs. --- quality_control/average.py | 12 ++++++ quality_control/errors.py | 8 ++++ quality_control/headers.py | 39 ++++++++++++++++++ quality_control/standard_error.py | 17 ++++++++ tests/qc/test_cells.py | 85 ++++++++++++++++++++++++++++++++++++++- tests/qc/test_header.py | 58 ++++++++++++++++++++++++++ 6 files changed, 217 insertions(+), 2 deletions(-) diff --git a/quality_control/average.py b/quality_control/average.py index 2907b9c..9ca16a9 100644 --- a/quality_control/average.py +++ b/quality_control/average.py @@ -1,6 +1,8 @@ """Contain logic for checking average files""" import re +from typing import Union +from .errors import InvalidValue from .errors import InvalidCellValue def valid_value(val): @@ -11,3 +13,13 @@ def valid_value(val): f"Invalid value '{val}'. " "Expected string representing a number with exactly three decimal " "places.") + +def invalid_value(line_number: int, column_number: int, val: str) -> Union[ + InvalidValue, None]: + if re.search(r"^[0-9]+\.[0-9]{3}$", val): + return None + return InvalidValue( + line_number, column_number, val, ( + f"Invalid value '{val}'. " + "Expected string representing a number with exactly three decimal " + "places.")) diff --git a/quality_control/errors.py b/quality_control/errors.py index 29a38f9..1eda646 100644 --- a/quality_control/errors.py +++ b/quality_control/errors.py @@ -1,5 +1,7 @@ """Hold exceptions for QC package""" +from collections import namedtuple + class InvalidCellValue(Exception): """Raised when a function encounters an invalid value""" @@ -22,3 +24,9 @@ class ParseError(Exception): """Raised if any of the above exceptions are raised""" def __init__(self, *args): super().__init__(*args) + +InvalidValue = namedtuple( + "InvalidValue", ("line_number", "column_number", "value", "message")) + +DuplicateHeading = namedtuple( + "InvalidValue", ("line_number", "heading", "columns","message")) diff --git a/quality_control/headers.py b/quality_control/headers.py index b7bc01e..a5a5065 100644 --- a/quality_control/headers.py +++ b/quality_control/headers.py @@ -1,5 +1,9 @@ """Validate the headers""" +from functools import reduce +from typing import Union, Tuple, Sequence + +from quality_control.errors import InvalidValue, DuplicateHeading from quality_control.errors import DuplicateHeader, InvalidHeaderValue def valid_header(strains, headers): @@ -23,3 +27,38 @@ def valid_header(strains, headers): for header, times in repeated)) return headers + + +def invalid_header( + line_number: int, headers: Sequence[str]) -> Union[InvalidValue, None]: + if len(headers) < 2: + return InvalidValue( + line_number, 0, "".join(headers), + "The header MUST contain at least 2 columns") + +def invalid_headings( + line_number: int, strains: Sequence[str], + headings: Sequence[str]) -> Union[Tuple[InvalidValue, ...], None]: + return tuple( + InvalidValue( + line_number, col, header, f"'{header}' not a valid strain.") + for col, header in + enumerate(headings, start=2) if header not in strains) + +def duplicate_headings( + line_number: int, headers: Sequence[str]) -> Union[InvalidValue, None]: + def __update_columns__(acc, item): + if item[1] in acc.keys(): + return {**acc, item[1]: acc[item[1]] + (item[0],)} + return {**acc, item[1]: (item[0],)} + repeated = { + heading: columns for heading, columns in + reduce(__update_columns__, enumerate(headers, start=1), dict()).items() + if len(columns) > 1 + } + return tuple( + DuplicateHeading( + line_number, heading, columns, ( + f"Heading '{heading}', is repeated in columns " + f"{','.join(str(i) for i in columns)}")) + for heading, columns in repeated.items()) diff --git a/quality_control/standard_error.py b/quality_control/standard_error.py index f1e33c4..022cc9b 100644 --- a/quality_control/standard_error.py +++ b/quality_control/standard_error.py @@ -1,6 +1,8 @@ """Contain logic for checking standard error files""" import re +from typing import Union +from .errors import InvalidValue from .errors import InvalidCellValue def valid_value(val): @@ -11,3 +13,18 @@ def valid_value(val): f"Invalid value '{val}'. " "Expected string representing a number with at least six decimal " "places.") + +def invalid_value(line_number: int, column_number: int, val: str) -> Union[ + InvalidValue, None]: + """ + Returns a `quality_control.errors.InvalidValue` object in the case where + `val` is not a valid input for standard error files, otherwise, it returns + `None`. + """ + if re.search(r"^[0-9]+\.[0-9]{6,}$", val): + return None + return InvalidValue( + line_number, column_number, val, ( + f"Invalid value '{val}'. " + "Expected string representing a number with at least six decimal " + "places.")) diff --git a/tests/qc/test_cells.py b/tests/qc/test_cells.py index d4ef911..46aeb64 100644 --- a/tests/qc/test_cells.py +++ b/tests/qc/test_cells.py @@ -1,12 +1,18 @@ """Test that values in cells within a line fulfill the required criteria""" import pytest +from random import randint from hypothesis import given from hypothesis import strategies as st +from quality_control.errors import InvalidValue from quality_control.errors import InvalidCellValue -from quality_control.average import valid_value as avg_valid_value -from quality_control.standard_error import valid_value as se_valid_value +from quality_control.average import ( + valid_value as avg_valid_value, + invalid_value as avg_invalid_value) +from quality_control.standard_error import ( + valid_value as se_valid_value, + invalid_value as se_invalid_value) @given(num_str=st.from_regex( r"^(?!([0-9]+\.([0-9]{3}|[0-9]{6,}))).*", fullmatch=True)) @@ -51,3 +57,78 @@ def test_cell_standard_error_value_pass_if_six_or_more_decimal_places(num_str): assert ( isinstance(processed, float) and processed == float(num_str)) + +## ================================================================================ + +@given(num_str=st.from_regex( + r"^(?!([0-9]+\.([0-9]{3}|[0-9]{6,}))).*", fullmatch=True)) +def test_cell_value_errors_with_invalid_inputs2(num_str): + """ + GIVEN: `num_str` is an arbitrary string that is an invalid input, + WHEN: `num_str` is provided as an argument to `*_invalid_value` functions, + THEN: The `*_invalid_value` functions return a + `quality_control.errors.InvalidValue` object which holds the error + information. + """ + assert avg_invalid_value(0, 0, num_str) == InvalidValue( + 0, 0, num_str, ( + f"Invalid value '{num_str}'. Expected string representing a number " + "with exactly three decimal places.")) + assert se_invalid_value(0, 0, num_str) == InvalidValue( + 0, 0, num_str, ( + f"Invalid value '{num_str}'. Expected string representing a number " + "with at least six decimal places.")) + +@given(num_str=st.from_regex( + r"^[0-9]+\.([0-9]{1,2}|[0-9]{4,}$)", fullmatch=True)) +def test_cell_average_value_errors_if_not_three_decimal_places2(num_str): + """ + GIVEN: `num_str` is a string representing a number with less than or more + than three decimal places, e.g. 2.92, 39.483732 + WHEN: `num_str` is provided as an argument to `avg_invalid_value` function, + THEN: `avg_invalid_value` returns a `quality_control.errors.InvalidValue` + object with the information about the placement of the invalid value. + """ + line, col = randint(0, 100), randint(0, 20) + assert avg_invalid_value(line, col, num_str) == InvalidValue( + line, col, num_str, ( + f"Invalid value '{num_str}'. Expected string representing a number " + "with exactly three decimal places.")) + +@given(num_str=st.from_regex(r"^[0-9]+\.[0-9]{3}$", fullmatch=True)) +def test_cell_average_value_pass_if_three_decimal_places(num_str): + """ + GIVEN: `num_str` is a string representing a number with exactly three + decimal places, e.g. 2.924, 39.483 + WHEN: `num_str` is provided as an argument to `avg_invalid_value` function, + THEN: `avg_invalid_value` returns `None` + """ + line, col = randint(0, 100), randint(0, 20) + assert avg_invalid_value(line, col, num_str) is None + +@given(num_str=st.from_regex(r"^[0-9]+\.([0-9]{0,5}$)", fullmatch=True)) +def test_cell_standard_error_value_errors_if_less_than_six_decimal_places2(num_str): + """ + GIVEN: `num_str` is a string representing a number with less than six + decimal places, e.g. 2.9, 39.4837 + WHEN: `num_str` is provided as an argument to `se_invalid_value` function, + THEN: `se_invalid_value` returns a `quality_control.errors.InvalidValue` + object with the information about the placement of the invalid value. + """ + line, col = randint(0, 100), randint(0, 20) + assert se_invalid_value(line, col, num_str) == InvalidValue( + line, col, num_str, ( + f"Invalid value '{num_str}'. Expected string representing a number " + "with at least six decimal places.")) + + +@given(num_str=st.from_regex(r"^[0-9]+\.[0-9]{6,}$", fullmatch=True)) +def test_cell_standard_error_value_pass_if_six_or_more_decimal_places(num_str): + """ + GIVEN: `num_str` is a string representing a number with six or more + decimal places, e.g. 2.938434, 39.4837343 + WHEN: `num_str` is provided as an argument to `se_invalid_value` function, + THEN: `se_invalid_value` returns `None` + """ + line, col = randint(0, 100), randint(0, 20) + assert se_invalid_value(line, col, num_str) is None diff --git a/tests/qc/test_header.py b/tests/qc/test_header.py index 6ca9376..f860a71 100644 --- a/tests/qc/test_header.py +++ b/tests/qc/test_header.py @@ -4,7 +4,10 @@ from hypothesis import given from hypothesis import strategies as st from quality_control.headers import valid_header +from quality_control.errors import InvalidValue, DuplicateHeading from quality_control.errors import DuplicateHeader, InvalidHeaderValue +from quality_control.headers import ( + invalid_header, invalid_headings, duplicate_headings) @given(headers=st.lists(st.text(max_size=10))) def test_valid_header_errors_with_invalid_headers(headers): @@ -31,3 +34,58 @@ def test_valid_header_fails_with_duplicate_headers(strains, headers): """Check that parsing fails if any header is duplicated""" with pytest.raises(DuplicateHeader): valid_header(strains, headers) + +## ============================================================ + +@given(headers=st.lists(st.text(max_size=10), max_size=1)) +def test_invalid_header_with_list_of_one_value(headers): + assert invalid_header(0, headers) == InvalidValue( + 0, 0, "".join(headers), + "The header MUST contain at least 2 columns") + +@given(headings=st.lists(st.text(min_size=2, max_size=10), min_size=2)) +def test_invalid_headings_with_invalid_inputs(headings): + "Verify that the check for header validity works" + assert invalid_headings(0, ("BXD1", "BXD2", "BXD3"), headings) == tuple( + InvalidValue(0, col, heading, f"'{heading}' not a valid strain.") + for col, heading in enumerate(headings, start=2)) + +@pytest.mark.parametrize( + "strains,headers", [ + (("BXD1", "BXD2", "BXD3"), ("ProbeSet", "BXD3", "BXD1")), + (("AStrain", "AnotherStrain", "YetAnotherStrain"), + ("Individual", "AStrain", "AnotherStrain", "YetAnotherStrain"))]) +def test_invalid_header_with_valid_headers(strains, headers): + "Verify that the check for header validity works" + assert invalid_header(0, headers) == None + +@pytest.mark.parametrize( + "strains,headings", [ + (("BXD1", "BXD2", "BXD3"), ("BXD3", "BXD1")), + (("AStrain", "AnotherStrain", "YetAnotherStrain"), + ("AStrain", "AnotherStrain", "YetAnotherStrain"))]) +def test_invalid_headings_with_valid_headings(strains, headings): + "Verify that the check for header validity works" + assert invalid_headings(0, strains, headings) == tuple() + +@pytest.mark.parametrize( + "headers,repeated", [ + (("ProbeSet", "BXD3", "BXD1", "BXD1"), {"BXD1": (3, 4)}), + (("Individual", "AStrain", "AnotherStrain", "YetAnotherStrain", + "AStrain"), {"AStrain": (2, 5)})]) +def test_duplicate_headers_with_repeated_column_headings(headers, repeated): + """Check that parsing fails if any header is duplicated""" + assert duplicate_headings(0, headers) == tuple( + DuplicateHeading(0, head, cols, ( + f"Heading '{head}', is repeated in columns " + f"{','.join(str(i) for i in cols)}")) + for head, cols in repeated.items()) + +@pytest.mark.parametrize( + "headers", [ + (("ProbeSet", "BXD3", "BXD1")), + (("Individual", "AStrain", "AnotherStrain", "YetAnotherStrain",))]) +def test_duplicate_headers_with_unique_column_headings(headers): + """Check that parsing fails if any header is duplicated""" + assert duplicate_headings(0, headers) == tuple() + -- cgit v1.2.3