"""Module handling the high-level parsing of the files"""
import csv
from enum import Enum
from functools import reduce
import quality_control.average as avg
import quality_control.standard_error as se
from quality_control.headers import valid_header
from quality_control.errors import (
ParseError, DuplicateHeader, InvalidCellValue, InvalidHeaderValue)
class FileType(Enum):
"""Enumerate the expected file types"""
AVERAGE = 1
STANDARD_ERROR = 2
def parse_strains(filepath):
"""Parse the strains file"""
with open(filepath, encoding="utf8") as strains_file:
reader = csv.DictReader(
strains_file,
fieldnames=[
header.strip() for header
in strains_file.readline().split("\t")],
delimiter="\t")
for row in reader:
yield {
key: (value if value != "\\N" else None)
for key, value in row.items()
}
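

# Usage sketch (illustrative only): assuming a hypothetical tab-separated
# strains file whose first line carries the column headers,
#
#     strains = tuple(parse_strains("strains.tsv"))
#
# yields one dict per data row, keyed by the header names, with literal
# "\N" cells mapped to None.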


def __parse_header(line, strains):
    return valid_header(
        set(strains),
        tuple(header.strip() for header in line.split("\t")))


def __parse_average_line(line):
    return (line[0],) + tuple(avg.valid_value(field) for field in line[1:])


def __parse_standard_error_line(line):
    return (line[0],) + tuple(se.valid_value(field) for field in line[1:])


LINE_PARSERS = {
    FileType.AVERAGE: __parse_average_line,
    FileType.STANDARD_ERROR: __parse_standard_error_line
}


def strain_names(strains):
    """Retrieve a complete list of the names of the strains"""
    def __extract_strain_names(acc, strain):
        return acc + tuple(
            item for item in (strain["Name"], strain["Name2"])
            if (item is not None and item != ""))

    return reduce(__extract_strain_names, strains, tuple())
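

# Illustrative sketch (hypothetical values): given strain records such as
# {"Name": "S1", "Name2": ""} and {"Name": "S2", "Name2": "S2-alias"},
# strain_names returns ("S1", "S2", "S2-alias"); None and empty names are
# dropped.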


def parse_file(filepath: str, filetype: FileType, strains: list):
    """Parse the given file"""
    seek_pos = 0
    try:
        with open(filepath, encoding="utf-8") as input_file:
            for line_number, line in enumerate(input_file):
                if line_number == 0:
                    # The first line is the header: validate it, then skip
                    # the data-line parsers below.
                    yield __parse_header(line, strains)
                    seek_pos = seek_pos + len(line)
                    continue

                yield LINE_PARSERS[filetype](
                    tuple(field.strip() for field in line.split("\t")))
                seek_pos = seek_pos + len(line)
    except (DuplicateHeader, InvalidCellValue, InvalidHeaderValue) as err:
        raise ParseError({
            "filepath": filepath,
            "filetype": filetype,
            "position": seek_pos,
            "line_number": line_number,
            "error": err
        }) from err
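

if __name__ == "__main__":
    # Minimal command-line sketch for manual testing; not part of the
    # module's public API. It assumes the first argument is a strains file
    # (with "Name" and "Name2" columns) and the second an averages file in
    # the format expected by FileType.AVERAGE.
    import sys

    if len(sys.argv) < 3:
        sys.exit(f"Usage: {sys.argv[0]} <strains-file> <averages-file>")

    known_strains = strain_names(tuple(parse_strains(sys.argv[1])))
    try:
        for parsed in parse_file(sys.argv[2], FileType.AVERAGE, known_strains):
            print(parsed)
    except ParseError as p_err:
        print(f"Parse failure: {p_err.args[0]}", file=sys.stderr)
        sys.exit(1)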