about summary refs log tree commit diff
path: root/quality_control
diff options
context:
space:
mode:
Diffstat (limited to 'quality_control')
-rw-r--r--quality_control/parsing.py48
1 files changed, 48 insertions, 0 deletions
diff --git a/quality_control/parsing.py b/quality_control/parsing.py
index abed22e..ac53642 100644
--- a/quality_control/parsing.py
+++ b/quality_control/parsing.py
@@ -3,6 +3,7 @@
 import csv
 from enum import Enum
 from functools import reduce
+from typing import Iterator, Generator
 
 import quality_control.average as avg
 import quality_control.standard_error as se
@@ -76,3 +77,50 @@ def parse_file(filepath: str, filetype: FileType, strains: list):
             "line_number": line_number,
             "error": err
         }) from err
+
+def parse_errors(filepath: str, filetype: FileType, strains: list,
+                 seek_pos: int = 0) -> Generator:
+    """Retrieve ALL the parse errors"""
+    print(f"seek_pos: {seek_pos}, {type(seek_pos)}")
+    assert seek_pos >= 0, "The seek position must be at least zero (0)"
+
+    def __error_type(error):
+        """Return a nicer string representatiton for the error type."""
+        if isinstance(error, DuplicateHeader):
+            return "Duplicated Headers"
+        if isinstance(error, InvalidCellValue):
+            return "Invalid Value"
+        if isinstance(error, InvalidHeaderValue):
+            return "Invalid Strain"
+
+    def __errors(filepath, filetype, strains, seek_pos):
+        """Return only the errors as values"""
+        with open(filepath, encoding="utf-8") as input_file:
+            ## TODO: Seek the file to the given seek position
+            for line_number, line in enumerate(input_file):
+                if seek_pos > 0:
+                    input_file.seek(seek_pos, 0)
+                try:
+                    if seek_pos == 0 and line_number == 0:
+                        header = __parse_header(line, strains)
+                        yield None
+                        seek_pos = seek_pos + len(line)
+                        continue
+
+                    parsed_line = LINE_PARSERS[filetype](
+                        tuple(field.strip() for field in line.split("\t")))
+                    yield None
+                    seek_pos = seek_pos + len(line)
+                except (DuplicateHeader, InvalidCellValue, InvalidHeaderValue) as err:
+                    yield {
+                        "filepath": filepath,
+                        "filetype": filetype,
+                        "position": seek_pos,
+                        "line_number": line_number,
+                        "error": __error_type(err),
+                        "message": err.args
+                    }
+
+    return (
+        error for error in __errors(filepath, filetype, strains, seek_pos)
+        if error is not None)