"""Module handling the high-level parsing of the files""" import csv from enum import Enum from functools import reduce from typing import Iterable, Generator import quality_control.average as avg import quality_control.standard_error as se from quality_control.headers import valid_header from quality_control.errors import ( ParseError, DuplicateHeader, InvalidCellValue, InvalidHeaderValue) class FileType(Enum): """Enumerate the expected file types""" AVERAGE = 1 STANDARD_ERROR = 2 def parse_strains(filepath): """Parse the strains file""" with open(filepath, encoding="utf8") as strains_file: reader = csv.DictReader( strains_file, fieldnames=[ header.strip() for header in strains_file.readline().split("\t")], delimiter="\t") for row in reader: yield { key: (value if value != "\\N" else None) for key, value in row.items() } def __parse_header(line, strains): return valid_header( set(strains), tuple(header.strip() for header in line.split("\t"))) def __parse_average_line(line): return (line[0],) + tuple(avg.valid_value(field) for field in line[1:]) def __parse_standard_error_line(line): return (line[0],) + tuple(se.valid_value(field) for field in line[1:]) LINE_PARSERS = { FileType.AVERAGE: __parse_average_line, FileType.STANDARD_ERROR: __parse_standard_error_line } def strain_names(strains): """Retrieve a complete list of the names of the strains""" def __extract_strain_names(acc, strain): return acc + tuple( item for item in (strain["Name"], strain["Name2"]) if (item is not None and item != "")) return reduce(__extract_strain_names, strains, tuple()) def parse_file(filepath: str, filetype: FileType, strains: list): """Parse the given file""" seek_pos = 0 try: with open(filepath, encoding="utf-8") as input_file: for line_number, line in enumerate(input_file): if line_number == 0: yield __parse_header(line, strains) seek_pos = seek_pos + len(line) continue yield LINE_PARSERS[filetype]( tuple(field.strip() for field in line.split("\t"))) seek_pos = seek_pos + len(line) except (DuplicateHeader, InvalidCellValue, InvalidHeaderValue) as err: raise ParseError({ "filepath": filepath, "filetype": filetype, "position": seek_pos, "line_number": line_number, "error": err }) from err def parse_errors(filepath: str, filetype: FileType, strains: list, seek_pos: int = 0) -> Generator: """Retrieve ALL the parse errors""" print(f"seek_pos: {seek_pos}, {type(seek_pos)}") assert seek_pos >= 0, "The seek position must be at least zero (0)" def __error_type(error): """Return a nicer string representatiton for the error type.""" if isinstance(error, DuplicateHeader): return "Duplicated Headers" if isinstance(error, InvalidCellValue): return "Invalid Value" if isinstance(error, InvalidHeaderValue): return "Invalid Strain" def __errors(filepath, filetype, strains, seek_pos): """Return only the errors as values""" with open(filepath, encoding="utf-8") as input_file: ## TODO: Seek the file to the given seek position for line_number, line in enumerate(input_file): if seek_pos > 0: input_file.seek(seek_pos, 0) try: if seek_pos == 0 and line_number == 0: header = __parse_header(line, strains) yield None seek_pos = seek_pos + len(line) continue parsed_line = LINE_PARSERS[filetype]( tuple(field.strip() for field in line.split("\t"))) yield None seek_pos = seek_pos + len(line) except (DuplicateHeader, InvalidCellValue, InvalidHeaderValue) as err: yield { "filepath": filepath, "filetype": filetype, "position": seek_pos, "line_number": line_number, "error": __error_type(err), "message": err.args } seek_pos = seek_pos + len(line) return ( error for error in __errors(filepath, filetype, strains, seek_pos) if error is not None) def take(iterable: Iterable, num: int) -> list: """Take at most `num` items from `iterable`.""" iterator = iter(iterable) items = [] try: for i in range(0, num): items.append(next(iterator)) return items except StopIteration: return items