"""Parsers for strain, average and standard-error tab-separated files."""
import csv
from enum import Enum

import quality_control.average as avg
import quality_control.standard_error as se
from quality_control.headers import valid_header
from quality_control.errors import (
    ParseError, DuplicateHeader, InvalidCellValue, InvalidHeaderValue)


class FileType(Enum):
    """The supported input file types."""
    AVERAGE = 1
    STANDARD_ERROR = 2


def parse_strains(filepath):
    """Yield the strains in the tab-separated file at `filepath`, mapping the
    NULL marker (\\N) to `None`."""
    with open(filepath) as strains_file:
        reader = csv.DictReader(
            strains_file,
            fieldnames=[
                header.strip() for header in
                strains_file.readline().split("\t")],
            delimiter="\t")
        for row in reader:
            yield {
                key: (value if value != "\\N" else None)
                for key, value in row.items()
            }


def __parse_header(line, strains):
    """Parse the header line and validate it against the known strains."""
    return valid_header(
        strains,
        tuple(header.strip() for header in line.split("\t")))


def __parse_average_line(line):
    """Parse a data line of averages: the first field is the identifier, the
    remaining fields are validated values."""
    return (line[0],) + tuple(avg.valid_value(field) for field in line[1:])


def __parse_standard_error_line(line):
    """Parse a data line of standard errors: the first field is the
    identifier, the remaining fields are validated values."""
    return (line[0],) + tuple(se.valid_value(field) for field in line[1:])


LINE_PARSERS = {
    FileType.AVERAGE: __parse_average_line,
    FileType.STANDARD_ERROR: __parse_standard_error_line
}


def parse_file(filepath: str, filetype: FileType, strains_filepath: str):
    """Parse the file at `filepath` according to `filetype`, validating the
    header against the strains in `strains_filepath`. Yields the parsed
    header first, then each parsed data line."""
    seek_pos = 0
    try:
        with open(filepath, encoding="utf-8") as input_file:
            for line_number, line in enumerate(input_file):
                if line_number == 0:
                    yield __parse_header(
                        line,
                        tuple(strain["Name"] for strain in
                              parse_strains(strains_filepath)))
                    seek_pos = seek_pos + len(line)
                    continue

                yield LINE_PARSERS[filetype](
                    tuple(field.strip() for field in line.split("\t")))
                seek_pos = seek_pos + len(line)
    except (DuplicateHeader, InvalidCellValue, InvalidHeaderValue) as err:
        raise ParseError({
            "filepath": filepath,
            "filetype": filetype,
            "position": seek_pos,
            "line_number": line_number
        }) from err
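

# A minimal usage sketch, not part of the module itself: it assumes two
# hypothetical tab-separated files, "samples.tsv" (an averages file) and
# "strains.tsv" (the strains list with a "Name" column), and simply prints
# each parsed tuple. It also assumes ParseError is a plain Exception
# subclass, so the dict it was raised with is available as `args[0]`.
if __name__ == "__main__":
    try:
        for parsed in parse_file(
                "samples.tsv", FileType.AVERAGE, "strains.tsv"):
            print(parsed)
    except ParseError as perr:
        # The dict carries the filepath, filetype, byte position and line
        # number at which parsing failed.
        print(f"Parsing failed: {perr.args[0]}")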