diff options
author | Frederick Muriuki Muriithi | 2022-04-21 10:24:29 +0300 |
---|---|---|
committer | Frederick Muriuki Muriithi | 2022-04-21 10:24:29 +0300 |
commit | d70dbd0addb861aa37c2f2574a537319a75411c7 (patch) | |
tree | fc29c01279a5a1d6485e9dcf44c6d169dddfc73a /quality_control | |
parent | 7b3dc9d36de1db28a6f36b03de85cf7f527231cc (diff) | |
download | gn-uploader-d70dbd0addb861aa37c2f2574a537319a75411c7.tar.gz |
Collect all the errors
Build a function to collect all the parsing errors into a "sequence"
of dict objects containing the issues found.
Diffstat (limited to 'quality_control')
-rw-r--r-- | quality_control/parsing.py | 48 |
1 files changed, 48 insertions, 0 deletions
# quality_control/parsing.py -- code added by this commit (abed22e..ac53642):
# one new import line and the `parse_errors` function.
from typing import Iterator, Generator


def parse_errors(filepath: str, filetype: FileType, strains: list,
                 seek_pos: int = 0) -> Generator:
    """Retrieve ALL the parse errors found in a file, lazily.

    Unlike a parser that stops at the first problem, this keeps reading and
    yields one dict per offending line.

    Parameters:
        filepath: Path of the file to check; it is opened as UTF-8 text.
        filetype: Key into `LINE_PARSERS` selecting the parser applied to
            every non-header line.
        strains: Passed through to `__parse_header` for the header line.
        seek_pos: Where to start reading.  `0` (the default) starts at the
            beginning and treats the first line as the header.  A positive
            value should be a "position" previously reported by this
            function (i.e. a value obtained from `tell()` on the file);
            reading then starts there and every line is treated as data.

    Returns:
        A generator of dicts with the keys "filepath", "filetype",
        "position" (the `tell()` offset of the start of the offending line,
        suitable for passing back in as `seek_pos`), "line_number"
        (zero-based, counted from where reading started), "error" (a short
        human-readable label) and "message" (the exception's `args`).

    Raises:
        AssertionError: if `seek_pos` is negative (checked at call time,
            before the generator is consumed).
    """
    assert seek_pos >= 0, "The seek position must be at least zero (0)"

    # Checked in order, so the first matching class wins.
    error_labels = (
        (DuplicateHeader, "Duplicated Headers"),
        (InvalidCellValue, "Invalid Value"),
        (InvalidHeaderValue, "Invalid Strain"))

    def _error_type(error):
        """Return a nicer string representation for the error type."""
        return next(
            (label for kind, label in error_labels
             if isinstance(error, kind)),
            None)

    def _errors():
        """Yield an error dict for every line that fails to parse."""
        with open(filepath, encoding="utf-8") as input_file:
            # Seek exactly once, before any reading happens.
            if seek_pos > 0:
                input_file.seek(seek_pos)

            line_number = 0
            while True:
                # `readline()` is used instead of `for line in input_file`
                # because iterating a text file disables `tell()`.  Taking
                # the offset before each read keeps "position" correct even
                # after earlier lines have failed to parse.
                position = input_file.tell()
                line = input_file.readline()
                if not line:  # "" means end of file; blank lines are "\n"
                    break

                try:
                    if seek_pos == 0 and line_number == 0:
                        # NOTE(review): the header is handed over as the raw
                        # line while data lines are split on tabs -- confirm
                        # that `__parse_header` expects a string.
                        __parse_header(line, strains)
                    else:
                        LINE_PARSERS[filetype](
                            tuple(field.strip()
                                  for field in line.split("\t")))
                except (DuplicateHeader, InvalidCellValue,
                        InvalidHeaderValue) as err:
                    yield {
                        "filepath": filepath,
                        "filetype": filetype,
                        "position": position,
                        "line_number": line_number,
                        "error": _error_type(err),
                        "message": err.args
                    }

                line_number += 1

    return _errors()