author      Frederick Muriuki Muriithi  2022-04-21 10:24:29 +0300
committer   Frederick Muriuki Muriithi  2022-04-21 10:24:29 +0300
commit      d70dbd0addb861aa37c2f2574a537319a75411c7 (patch)
tree        fc29c01279a5a1d6485e9dcf44c6d169dddfc73a /quality_control
parent      7b3dc9d36de1db28a6f36b03de85cf7f527231cc (diff)
download    gn-uploader-d70dbd0addb861aa37c2f2574a537319a75411c7.tar.gz
Collect all the errors
Build a function to collect all the parsing errors into a "sequence" of dict objects containing the issues found.
Diffstat (limited to 'quality_control')
-rw-r--r--  quality_control/parsing.py  48
1 file changed, 48 insertions, 0 deletions
diff --git a/quality_control/parsing.py b/quality_control/parsing.py
index abed22e..ac53642 100644
--- a/quality_control/parsing.py
+++ b/quality_control/parsing.py
@@ -3,6 +3,7 @@
import csv
from enum import Enum
from functools import reduce
+from typing import Iterator, Generator

import quality_control.average as avg
import quality_control.standard_error as se
@@ -76,3 +77,50 @@ def parse_file(filepath: str, filetype: FileType, strains: list):
"line_number": line_number,
"error": err
}) from err
+
+def parse_errors(filepath: str, filetype: FileType, strains: list,
+ seek_pos: int = 0) -> Generator:
+ """Retrieve ALL the parse errors"""
+ print(f"seek_pos: {seek_pos}, {type(seek_pos)}")
+ assert seek_pos >= 0, "The seek position must be at least zero (0)"
+
+ def __error_type(error):
+ """Return a nicer string representatiton for the error type."""
+ if isinstance(error, DuplicateHeader):
+ return "Duplicated Headers"
+ if isinstance(error, InvalidCellValue):
+ return "Invalid Value"
+ if isinstance(error, InvalidHeaderValue):
+ return "Invalid Strain"
+
+ def __errors(filepath, filetype, strains, seek_pos):
+ """Return only the errors as values"""
+ with open(filepath, encoding="utf-8") as input_file:
+ ## TODO: Seek the file to the given seek position
+ for line_number, line in enumerate(input_file):
+ if seek_pos > 0:
+ input_file.seek(seek_pos, 0)
+ try:
+ if seek_pos == 0 and line_number == 0:
+ header = __parse_header(line, strains)
+ yield None
+ seek_pos = seek_pos + len(line)
+ continue
+
+ parsed_line = LINE_PARSERS[filetype](
+ tuple(field.strip() for field in line.split("\t")))
+ yield None
+ seek_pos = seek_pos + len(line)
+ except (DuplicateHeader, InvalidCellValue, InvalidHeaderValue) as err:
+ yield {
+ "filepath": filepath,
+ "filetype": filetype,
+ "position": seek_pos,
+ "line_number": line_number,
+ "error": __error_type(err),
+ "message": err.args
+ }
+
+ return (
+ error for error in __errors(filepath, filetype, strains, seek_pos)
+ if error is not None)
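
Usage note (not part of the commit): the sketch below shows one way the new parse_errors generator might be consumed. It assumes FileType is the enum defined in quality_control/parsing.py and that it has an AVERAGE member; the member name, file path and strain names here are placeholders, not taken from this diff.

from quality_control.parsing import FileType, parse_errors

# Hypothetical inputs: a tab-separated averages file and the strain names
# expected in its header row.
strains = ["BXD1", "BXD2", "BXD3"]
for error in parse_errors("averages.tsv", FileType.AVERAGE, strains):
    # Each item is a dict built in the except clause of __errors above, e.g.
    # {"filepath": ..., "filetype": ..., "position": ..., "line_number": ...,
    #  "error": "Invalid Value", "message": (...,)}
    print(error["line_number"], error["error"], error["message"])

Lines that parse cleanly make __errors yield None, and the trailing generator expression filters those out, so the caller only ever sees error dicts; since everything is evaluated lazily, the whole file is never held in memory.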