about summary refs log tree commit diff
path: root/quality_control/headers.py
diff options
context:
space:
mode:
author Frederick Muriuki Muriithi 2022-05-18 10:36:10 +0300
committer Frederick Muriuki Muriithi 2022-05-18 10:36:10 +0300
commit 582686e030b660f218cb7091aaab3cafa103465d (patch)
tree e035d570c0a755031758770f4fcd3b240638e891 /quality_control/headers.py
parent 4be0ad66b86e238dd92da191061ffc63bee3d09f (diff)
download gn-uploader-582686e030b660f218cb7091aaab3cafa103465d.tar.gz
Return errors when found or None otherwise
This commit adds a number of functions that return the error object
when an error is found, or `None` otherwise. It avoids the use of
exceptions as control flow constructs.
Diffstat (limited to 'quality_control/headers.py')
-rw-r--r-- quality_control/headers.py 39
1 files changed, 39 insertions, 0 deletions
diff --git a/quality_control/headers.py b/quality_control/headers.py
index b7bc01e..a5a5065 100644
--- a/quality_control/headers.py
+++ b/quality_control/headers.py
@@ -1,5 +1,9 @@
 """Validate the headers"""
 
+from functools import reduce
+from typing import Union, Tuple, Sequence
+
+from quality_control.errors import InvalidValue, DuplicateHeading
 from quality_control.errors import DuplicateHeader, InvalidHeaderValue
 
 def valid_header(strains, headers):
@@ -23,3 +27,38 @@ def valid_header(strains, headers):
             for header, times in repeated))
 
     return headers
+
+
+def invalid_header(
+        line_number: int, headers: Sequence[str]) -> Union[InvalidValue, None]:
+    if len(headers) < 2:
+        return InvalidValue(
+            line_number, 0, "<TAB>".join(headers),
+            "The header MUST contain at least 2 columns")
+
+def invalid_headings(
+        line_number: int, strains: Sequence[str],
+        headings: Sequence[str]) -> Union[Tuple[InvalidValue, ...], None]:
+    return tuple(
+        InvalidValue(
+            line_number, col, header, f"'{header}' not a valid strain.")
+        for col, header in
+        enumerate(headings, start=2) if header not in strains)
+
+def duplicate_headings(
+        line_number: int, headers: Sequence[str]) -> Union[InvalidValue, None]:
+    def __update_columns__(acc, item):
+        if item[1] in acc.keys():
+            return {**acc, item[1]: acc[item[1]] + (item[0],)}
+        return {**acc, item[1]: (item[0],)}
+    repeated = {
+        heading: columns for heading, columns in
+        reduce(__update_columns__, enumerate(headers, start=1), dict()).items()
+        if len(columns) > 1
+    }
+    return tuple(
+        DuplicateHeading(
+            line_number, heading, columns, (
+                f"Heading '{heading}', is repeated in columns "
+                f"{','.join(str(i) for i in columns)}"))
+        for heading, columns in repeated.items())