1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
"""Module handling the high-level parsing of the files"""
import csv
from enum import Enum
from functools import reduce
from typing import Iterator, Generator
import quality_control.average as avg
import quality_control.standard_error as se
from quality_control.headers import valid_header
from quality_control.errors import (
ParseError, DuplicateHeader, InvalidCellValue, InvalidHeaderValue)
class FileType(Enum):
    """The kinds of data file this module knows how to parse."""

    # File whose data lines are checked with the `average` value parser.
    AVERAGE = 1
    # File whose data lines are checked with the `standard_error` value parser.
    STANDARD_ERROR = 2
def parse_strains(filepath):
    """Lazily read the tab-separated strains file at `filepath`.

    The first line of the file supplies the column names (surrounding
    whitespace removed). Every following row is yielded as a dict keyed by
    those column names, with the literal cell value `\\N` replaced by `None`.
    """
    null_marker = "\\N"
    with open(filepath, encoding="utf8") as strains_file:
        # Consume the header line ourselves so that the column names can be
        # cleaned up before being handed to the CSV reader.
        header_line = strains_file.readline()
        column_names = [name.strip() for name in header_line.split("\t")]
        rows = csv.DictReader(
            strains_file, fieldnames=column_names, delimiter="\t")
        for record in rows:
            cleaned = {}
            for column, cell in record.items():
                cleaned[column] = None if cell == null_marker else cell
            yield cleaned
def __parse_header(line, strains):
    """Check the header line of a file against the given strains.

    `line` is split on tabs and each piece has its surrounding whitespace
    removed. The resulting tuple is passed, together with `strains` converted
    to a set, to `valid_header`, whose result is returned unchanged.
    """
    known_strains = set(strains)
    headers = tuple(column.strip() for column in line.split("\t"))
    return valid_header(known_strains, headers)
def __parse_average_line(line):
    """Check one already-split data line of an averages file.

    The first field is kept as it is; every remaining field is passed through
    `avg.valid_value`. The result is a tuple in the same field order.
    """
    first_field = line[0]
    checked_values = tuple(avg.valid_value(cell) for cell in line[1:])
    return (first_field,) + checked_values
def __parse_standard_error_line(line):
    """Check one already-split data line of a standard-error file.

    The first field is kept as it is; every remaining field is passed through
    `se.valid_value`. The result is a tuple in the same field order.
    """
    first_field = line[0]
    checked_values = tuple(se.valid_value(cell) for cell in line[1:])
    return (first_field,) + checked_values
# Dispatch table: for each supported file type, the function used to check a
# single data line (already split into stripped fields) of that kind of file.
LINE_PARSERS = {
    FileType.AVERAGE: __parse_average_line,
    FileType.STANDARD_ERROR: __parse_standard_error_line,
}
def strain_names(strains):
    """Retrieve a complete list of the names of the strains.

    Parameters:
        strains: an iterable of mappings, each providing the keys `Name` and
            `Name2`.

    Returns:
        A flat tuple holding, for every strain in order, its `Name` followed
        by its `Name2`, leaving out any value that is `None` or an empty
        string.

    Raises:
        KeyError: if a strain lacks the `Name` or `Name2` key.
    """
    # A single pass builds the tuple once; repeatedly concatenating tuples
    # (as a fold would) copies the accumulated names on every step.
    return tuple(
        name
        for strain in strains
        for name in (strain["Name"], strain["Name2"])
        if name is not None and name != "")
def parse_file(filepath: str, filetype: FileType, strains: list):
    """Parse the given file, lazily, one line at a time.

    Parameters:
        filepath: path of the tab-separated file to parse.
        filetype: key into `LINE_PARSERS` selecting the data-line parser.
        strains: strain names forwarded to the header check.

    Yields:
        First the result of checking the header (first) line, then the result
        of the selected line parser for each following line.

    Raises:
        ParseError: when a `DuplicateHeader`, `InvalidCellValue` or
            `InvalidHeaderValue` is raised while parsing. Its argument is a
            dict with the keys `filepath`, `filetype`, `position` (byte offset
            of the start of the offending line), `line_number` (zero-based)
            and `error` (the original exception).
    """
    seek_pos = 0
    # Bound up front so the error handler can never meet an unset name.
    line_number = 0
    try:
        # newline="" keeps the original line terminators, so the encoded
        # length of each line is the number of bytes it occupies in the file.
        # Counting decoded characters instead would drift on CRLF line
        # endings and on multi-byte characters.
        with open(filepath, encoding="utf-8", newline="") as input_file:
            for line_number, line in enumerate(input_file):
                if line_number == 0:
                    yield __parse_header(line, strains)
                else:
                    # Stripping each field also discards the line terminator.
                    yield LINE_PARSERS[filetype](
                        tuple(field.strip() for field in line.split("\t")))
                seek_pos += len(line.encode("utf-8"))
    except (DuplicateHeader, InvalidCellValue, InvalidHeaderValue) as err:
        raise ParseError({
            "filepath": filepath,
            "filetype": filetype,
            "position": seek_pos,
            "line_number": line_number,
            "error": err
        }) from err
def parse_errors(filepath: str, filetype: FileType, strains: list,
                 seek_pos: int = 0) -> Generator:
    """Retrieve ALL the parse errors in the file, lazily.

    Unlike a parse that stops at the first problem, this keeps going after a
    bad line and reports every line that fails its check.

    Parameters:
        filepath: path of the tab-separated file to check.
        filetype: key into `LINE_PARSERS` selecting the data-line parser.
        strains: strain names forwarded to the header check.
        seek_pos: byte offset at which to start reading. With the default of
            zero the first line is checked as the header; with any other
            value every line read is checked as a data line.

    Returns:
        A generator of dicts, one per failing line, with the keys `filepath`,
        `filetype`, `position` (byte offset of the start of the line),
        `line_number` (zero-based, counted from where reading started),
        `error` (a short label for the kind of error) and `message` (the
        `args` of the original exception).

    Raises:
        AssertionError: if `seek_pos` is negative.
    """
    # Function-scope import: only this function needs the module.
    import io

    # Raised explicitly rather than through `assert` so that the check is not
    # stripped under `python -O`; the exception type is unchanged.
    if seek_pos < 0:
        raise AssertionError("The seek position must be at least zero (0)")

    def __error_type(error):
        """Return a nicer string representation for the error type."""
        if isinstance(error, DuplicateHeader):
            return "Duplicated Headers"
        if isinstance(error, InvalidCellValue):
            return "Invalid Value"
        if isinstance(error, InvalidHeaderValue):
            return "Invalid Strain"
        return None

    def __errors():
        """Yield one error dict for every line that fails its check."""
        starts_at_beginning = seek_pos == 0
        position = seek_pos
        # Position the underlying binary stream exactly once, before any
        # reading happens, then decode on top of it. Seeking while iterating
        # would re-read or skip data.
        with open(filepath, "rb") as raw_file:
            raw_file.seek(seek_pos)
            # newline="" keeps the original line terminators so that the
            # encoded length of a line is the number of bytes it occupies.
            with io.TextIOWrapper(
                    raw_file, encoding="utf-8", newline="") as input_file:
                for line_number, line in enumerate(input_file):
                    try:
                        if starts_at_beginning and line_number == 0:
                            __parse_header(line, strains)
                        else:
                            # Stripping each field also discards the line
                            # terminator.
                            LINE_PARSERS[filetype](
                                tuple(field.strip()
                                      for field in line.split("\t")))
                    except (DuplicateHeader, InvalidCellValue,
                            InvalidHeaderValue) as err:
                        yield {
                            "filepath": filepath,
                            "filetype": filetype,
                            "position": position,
                            "line_number": line_number,
                            "error": __error_type(err),
                            "message": err.args
                        }
                    # Advance past the line whether or not it was valid, so
                    # later errors report the right offset.
                    position += len(line.encode("utf-8"))

    return __errors()
|