"""Parsers for strain listings and tab-separated average/standard-error files."""
import csv
from enum import Enum
from functools import reduce

import quality_control.average as avg
import quality_control.standard_error as se
from quality_control.headers import valid_header
from quality_control.errors import (
    ParseError, DuplicateHeader, InvalidCellValue, InvalidHeaderValue)


class FileType(Enum):
    """Kinds of data file that `parse_file` knows how to parse."""
    AVERAGE = 1
    STANDARD_ERROR = 2


def parse_strains(filepath):
    """Lazily yield one dict per data row of a tab-separated strains file.

    The first line of the file supplies the column names (each stripped of
    surrounding whitespace). Every following row is yielded as a mapping of
    column name to cell value, with the NULL marker (a backslash followed by
    an upper-case "N") replaced by `None`.
    """
    with open(filepath) as strains_file:
        # Consume the header line ourselves so the names can be stripped
        # before being handed to DictReader as explicit fieldnames.
        fieldnames = [
            header.strip()
            for header in strains_file.readline().split("\t")]
        reader = csv.DictReader(
            strains_file, fieldnames=fieldnames, delimiter="\t")
        for row in reader:
            yield {
                # "\\N" is the two-character sequence backslash + N;
                # presumably a NULL marker from a database export -- confirm.
                key: (value if value != "\\N" else None)
                for key, value in row.items()
            }


def __parse_header(line, strains):
    """Split a raw header line on tabs and validate it with `valid_header`.

    `strains` is converted to a set and passed together with the tuple of
    whitespace-stripped header fields. The return value (and any exception)
    is whatever `valid_header` produces.
    """
    return valid_header(
        set(strains),
        tuple(header.strip() for header in line.split("\t")))


def __parse_average_line(line):
    """Validate a row of an averages file.

    `line` is a sequence of already-split fields. The first field is kept
    untouched; every remaining field goes through `avg.valid_value`.
    """
    return (line[0],) + tuple(avg.valid_value(field) for field in line[1:])


def __parse_standard_error_line(line):
    """Validate a row of a standard-error file.

    `line` is a sequence of already-split fields. The first field is kept
    untouched; every remaining field goes through `se.valid_value`.
    """
    return (line[0],) + tuple(se.valid_value(field) for field in line[1:])


# Dispatch table: which row parser to use for each supported file type.
LINE_PARSERS = {
    FileType.AVERAGE: __parse_average_line,
    FileType.STANDARD_ERROR: __parse_standard_error_line
}


def strain_names(strains):
    """Collect every usable strain name from `strains` into a flat tuple.

    For each strain mapping, the values under "Name" and "Name2" are taken
    (in that order); values that are `None` or the empty string are dropped.
    """
    def __extract_strain_names(acc, strain):
        return acc + tuple(
            item for item in (strain["Name"], strain["Name2"])
            if (item is not None and item != ""))

    return reduce(__extract_strain_names, strains, tuple())


def parse_file(filepath: str, filetype: FileType, strains: list):
    """Lazily parse a tab-separated data file, yielding validated rows.

    The first item yielded is the result of validating the header line
    against `strains`. Each subsequent item is the tuple produced by the
    row parser registered for `filetype` in `LINE_PARSERS`.

    Raises:
        ParseError: when the header or a row fails validation, i.e. one of
            `DuplicateHeader`, `InvalidCellValue` or `InvalidHeaderValue`
            is raised while parsing. The error payload is a dict with the
            file path, the file type, the number of characters consumed by
            the lines successfully parsed before the failure ("position"),
            and the zero-based index of the offending line ("line_number").
            The original exception is attached as the cause.
    """
    # NOTE(review): the file is read in text mode, so this counts decoded
    # characters, not bytes; it only equals a byte offset for ASCII content.
    seek_pos = 0
    try:
        with open(filepath, encoding="utf-8") as input_file:
            for line_number, line in enumerate(input_file):
                if line_number == 0:
                    yield __parse_header(line, strains)
                    seek_pos = seek_pos + len(line)
                    # The header has been handled; do not also feed it to
                    # the data-row parser or count its length twice.
                    continue

                yield LINE_PARSERS[filetype](
                    tuple(field.strip() for field in line.split("\t")))
                seek_pos = seek_pos + len(line)
    except (DuplicateHeader, InvalidCellValue, InvalidHeaderValue) as err:
        # Chain the original exception so the specific cause is not lost.
        raise ParseError({
            "filepath": filepath,
            "filetype": filetype,
            "position": seek_pos,
            "line_number": line_number
        }) from err