1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
"""Module handling the high-level parsing of the files"""
import csv
from enum import Enum
from functools import reduce
from typing import Iterable, Generator
import quality_control.average as avg
import quality_control.standard_error as se
from quality_control.headers import valid_header
from quality_control.errors import (
ParseError, DuplicateHeader, InvalidCellValue, InvalidHeaderValue)
class FileType(Enum):
    """Enumerate the expected file types.

    The members are used as keys of `LINE_PARSERS` to select the function
    that parses each data line of a file of that type.
    """
    AVERAGE = 1  # data lines are handled by `__parse_average_line`
    STANDARD_ERROR = 2  # data lines are handled by `__parse_standard_error_line`
def __parse_header(line, strains):
    """Split a raw header line into column names and validate them.

    The line is split on tab characters and every resulting name has its
    surrounding whitespace removed.  The names, together with the set of
    known `strains`, are handed to `valid_header`, whose result is returned
    unchanged.
    """
    known_strains = set(strains)
    column_names = tuple(name.strip() for name in line.split("\t"))
    return valid_header(known_strains, column_names)
def __parse_average_line(line):
    """Check one already-split line of an averages file.

    `line` is a sequence of fields.  The first field is kept as-is; every
    remaining field is passed through `avg.valid_value`.  The result is a
    tuple with the first field followed by the checked values.
    """
    row_label = line[0]
    checked_cells = [avg.valid_value(cell) for cell in line[1:]]
    return (row_label, *checked_cells)
def __parse_standard_error_line(line):
    """Check one already-split line of a standard-error file.

    `line` is a sequence of fields.  The first field is kept as-is; every
    remaining field is passed through `se.valid_value`.  The result is a
    tuple with the first field followed by the checked values.
    """
    row_label = line[0]
    checked_cells = [se.valid_value(cell) for cell in line[1:]]
    return (row_label, *checked_cells)
# Dispatch table: maps each `FileType` to the function used to parse one
# data (non-header) line of a file of that type.
LINE_PARSERS = {
    FileType.AVERAGE: __parse_average_line,
    FileType.STANDARD_ERROR: __parse_standard_error_line
}
def strain_names(filepath):
    """Retrieve the strain names from the given file.

    The first line of the file is treated as a header and ignored.  Every
    other non-blank line is split on whitespace; the second and third fields
    are always collected, and the sixth field (when present) is collected as
    an alias unless it is one of the placeholder values ``P`` or ``\\N``.

    Blank (whitespace-only) lines, e.g. a trailing empty line at the end of
    the file, are skipped rather than crashing the whole read.

    Args:
        filepath: Path to the UTF-8 encoded strains file.

    Returns:
        A set with all the collected names.

    Raises:
        IndexError: If a non-blank data line has fewer than three fields.
    """
    strains = set()
    with open(filepath, encoding="utf8") as strains_file:
        # Iterate lazily instead of loading the whole file with readlines().
        for idx, line in enumerate(strains_file):
            if idx == 0:
                continue  # header line
            # str.split() with no separator already drops surrounding
            # whitespace and never produces empty fields.
            parts = line.split()
            if not parts:
                continue  # blank line: nothing to collect
            strains.add(parts[1])
            strains.add(parts[2])
            if len(parts) >= 6 and parts[5] not in ("P", "\\N"):
                strains.add(parts[5])
    return strains
def parse_file(filepath: str, filetype: FileType, strains: list):
    """Lazily parse the given file, one line at a time.

    The first line is parsed as the header with `__parse_header`; every
    later line is split on tabs, its fields stripped, and handed to the
    parser registered for `filetype` in `LINE_PARSERS`.

    Yields:
        ``(parsed_line, position)`` pairs, where ``position`` is the running
        total of line lengths (in characters) up to and including that line.

    Raises:
        ParseError: When a line fails with `DuplicateHeader`,
            `InvalidCellValue` or `InvalidHeaderValue`.  The error carries a
            dict with the file path, file type, the position at which the
            failing line starts, its zero-based line number and the original
            error.
    """
    seek_pos = 0
    try:
        with open(filepath, encoding="utf-8") as input_file:
            for line_number, line in enumerate(input_file):
                if line_number == 0:
                    parsed = __parse_header(line, strains)
                else:
                    parse_line = LINE_PARSERS[filetype]
                    parsed = parse_line(
                        tuple(field.strip() for field in line.split("\t")))
                line_end = seek_pos + len(line)
                yield parsed, line_end
                # Only advance once the consumer asks for the next line.
                seek_pos = line_end
    except (DuplicateHeader, InvalidCellValue, InvalidHeaderValue) as err:
        details = {
            "filepath": filepath,
            "filetype": filetype,
            "position": seek_pos,
            "line_number": line_number,
            "error": err
        }
        raise ParseError(details) from err
def parse_errors(filepath: str, filetype: FileType, strains: list,
                 seek_pos: int = 0) -> Generator:
    """Retrieve ALL the parse errors found at or after `seek_pos`.

    Unlike `parse_file`, parsing does not stop at the first bad line: every
    line is checked and one error record is produced per failing line.

    Positions are character offsets computed the same way `parse_file`
    computes them (running total of ``len(line)``).  Such offsets are not
    valid arguments for a text-mode ``seek()``, so instead of seeking, the
    file is read from the start and lines that begin before `seek_pos` are
    skipped without being parsed.  This also keeps `line_number` equal to
    the real zero-based line index in the file, and guarantees that only
    the true first line is ever treated as the header.

    Args:
        filepath: Path to the UTF-8 encoded file to check.
        filetype: Selects the line parser from `LINE_PARSERS`.
        strains: The known strain names used to validate the header.
        seek_pos: Character offset from which to start reporting errors.
            Lines starting before this offset are ignored.

    Returns:
        A generator of dicts with the keys ``filepath``, ``filetype``,
        ``position`` (offset at which the failing line starts),
        ``line_number``, ``error`` (human-readable error kind) and
        ``message`` (the ``args`` of the underlying exception).
    """
    assert seek_pos >= 0, "The seek position must be at least zero (0)"

    def __error_type(error):
        """Return a nicer string representation for the error type."""
        if isinstance(error, DuplicateHeader):
            return "Duplicated Headers"
        if isinstance(error, InvalidCellValue):
            return "Invalid Value"
        if isinstance(error, InvalidHeaderValue):
            return "Invalid Strain"
        return None

    def __errors(start_pos):
        """Yield one error record for each failing line."""
        position = 0
        with open(filepath, encoding="utf-8") as input_file:
            for line_number, line in enumerate(input_file):
                line_start = position
                position = position + len(line)
                if line_start < start_pos:
                    # Line begins before the requested offset: skip it.
                    continue
                try:
                    if line_number == 0:
                        __parse_header(line, strains)
                    else:
                        LINE_PARSERS[filetype](
                            tuple(field.strip()
                                  for field in line.split("\t")))
                except (DuplicateHeader, InvalidCellValue,
                        InvalidHeaderValue) as err:
                    yield {
                        "filepath": filepath,
                        "filetype": filetype,
                        "position": line_start,
                        "line_number": line_number,
                        "error": __error_type(err),
                        "message": err.args
                    }

    return __errors(seek_pos)
def take(iterable: Iterable, num: int) -> list:
    """Return a list with the first `num` items of `iterable`.

    Fewer items are returned when `iterable` runs out early, and an empty
    list when `num` is zero or negative.  No more than `num` items are
    consumed from the underlying iterator.
    """
    iterator = iter(iterable)
    # `range` comes first so that zip stops before pulling an extra item
    # from `iterator` once `num` items have been collected.
    return [item for _, item in zip(range(0, num), iterator)]
|