1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
"""Validate the headers"""
from functools import reduce
from typing import Union, Tuple, Sequence
from quality_control.errors import InvalidValue, DuplicateHeading
from quality_control.errors import DuplicateHeader, InvalidHeaderValue
def valid_header(
        strains: Sequence[str], headers: Sequence[str]) -> Sequence[str]:
    """Return the valid headers with reference to strains or throw an error.

    The first column of `headers` is never compared against `strains`; every
    column after it must be a member of `strains`. No header, the first one
    included, may occur more than once.

    Args:
        strains: The acceptable strain names.
        headers: The full header row, first column included.

    Returns:
        `headers` itself, unchanged, when every check passes.

    Raises:
        InvalidHeaderValue: If the row has fewer than 2 columns, or if any
            column after the first is not found in `strains`. In the latter
            case there is one message per offending header.
        DuplicateHeader: If any header occurs more than once, with one
            message per repeated header.
    """
    # Imported here so this function carries its own dependency.
    from collections import Counter

    if not headers[1:]:
        raise InvalidHeaderValue(
            "The header MUST contain at least 2 columns")

    invalid_headers = tuple(
        header for header in headers[1:] if header not in strains)
    if invalid_headers:
        raise InvalidHeaderValue(
            *(f"'{header}' not a valid strain." for header in invalid_headers))

    # A single counting pass replaces the repeated `headers.count()` scans.
    # Counter remembers first-seen order, so the messages come out in column
    # order instead of the arbitrary order of iterating a set.
    occurrences = Counter(headers)
    if len(occurrences) != len(headers):
        raise DuplicateHeader(*(
            f"'{header}' is present in the header row {times} times."
            for header, times in occurrences.items() if times > 1))

    return headers
def invalid_header(
        line_number: int, headers: Sequence[str]) -> Union[InvalidValue, None]:
    """Check that the header row has at least two columns.

    Args:
        line_number: Passed through as the first argument of the error.
        headers: The header row.

    Returns:
        `None` when `headers` has two or more items. Otherwise an
        `InvalidValue` built from `line_number`, column `0`, the headers
        joined with the literal text "<TAB>", and an explanatory message.
    """
    if len(headers) >= 2:
        return None

    joined_row = "<TAB>".join(headers)
    message = "The header MUST contain at least 2 columns"
    return InvalidValue(line_number, 0, joined_row, message)
def invalid_headings(
        line_number: int, strains: Sequence[str],
        headings: Sequence[str]) -> Union[Tuple[InvalidValue, ...], None]:
    """Collect an `InvalidValue` for each heading that is not in `strains`.

    Args:
        line_number: Passed through as the first argument of each error.
        strains: The acceptable strain names.
        headings: The headings to check. Column numbers reported in the
            errors start at 2 for the first item of this sequence.

    Returns:
        A tuple of `InvalidValue` objects, one per heading missing from
        `strains`, in the order the headings appear. The tuple is empty
        when every heading is found.
    """
    problems = []
    for col, header in enumerate(headings, start=2):
        if header in strains:
            continue
        problems.append(InvalidValue(
            line_number, col, header, f"'{header}' not a valid strain."))
    return tuple(problems)
def duplicate_headings(
        line_number: int,
        headers: Sequence[str]) -> Tuple[DuplicateHeading, ...]:
    """Report every heading that appears in more than one column.

    Args:
        line_number: Passed through as the first argument of each error.
        headers: The header row. Columns are numbered from 1.

    Returns:
        A tuple of `DuplicateHeading` objects, one per repeated heading,
        ordered by the heading's first occurrence. Each is built from
        `line_number`, the heading, a tuple of the 1-based columns where it
        occurs, and a message listing those columns. The tuple is empty
        when no heading repeats.
    """
    # Group column numbers by heading in one linear pass. A plain dict keeps
    # first-seen order, matching the order the previous fold produced.
    columns_by_heading = {}
    for column, heading in enumerate(headers, start=1):
        columns_by_heading.setdefault(heading, []).append(column)

    return tuple(
        DuplicateHeading(
            line_number, heading, tuple(columns), (
                f"Heading '{heading}', is repeated in columns "
                f"{','.join(str(i) for i in columns)}"))
        for heading, columns in columns_by_heading.items()
        if len(columns) > 1)
|