# path: root/quality_control/headers.py
# blob: a5a5065d36d2dcce71acea1a25a630ceca4694a0 (plain)
"""Validate the headers"""

from collections import Counter
from functools import reduce
from typing import Union, Tuple, Sequence

from quality_control.errors import InvalidValue, DuplicateHeading
from quality_control.errors import DuplicateHeader, InvalidHeaderValue

def valid_header(strains, headers):
    """Return `headers` unchanged if they are valid, otherwise raise.

    The checks are run in this order:

    1. The header row must have at least two columns.
    2. Every header after the first must be a member of `strains`.
    3. No header (the first column included) may appear more than once.

    Args:
        strains: Container of the acceptable strain names; only membership
            (`in`) is used.
        headers: Sequence of the header values; the first item is exempt
            from the strain check.

    Returns:
        The very same `headers` object that was passed in.

    Raises:
        InvalidHeaderValue: If there are fewer than two columns, or if any
            header after the first is not in `strains` (one message per
            offending header).
        DuplicateHeader: If any header is repeated (one message per repeated
            header, in order of first appearance).
    """
    if not headers[1:]:
        raise InvalidHeaderValue(
            "The header MUST contain at least 2 columns")
    invalid_headers = tuple(
        header for header in headers[1:] if header not in strains)
    if invalid_headers:
        raise InvalidHeaderValue(
            *(f"'{header}' not a valid strain." for header in invalid_headers))

    # A single counting pass replaces the per-header `headers.count()` calls.
    # Counter keeps first-seen order, so the messages come out in the same
    # order on every run instead of following set iteration order.
    repeated = tuple(
        (header, times) for header, times in Counter(headers).items()
        if times > 1)
    if repeated:
        raise DuplicateHeader(*(
            f"'{header}' is present in the header row {times} times."
            for header, times in repeated))

    return headers


def invalid_header(
        line_number: int, headers: Sequence[str]) -> Union[InvalidValue, None]:
    """Check that the header row has at least two columns.

    Args:
        line_number: Line number recorded in the returned error.
        headers: The values of the header row.

    Returns:
        `None` when `headers` has two or more items. Otherwise an
        `InvalidValue` built with `line_number`, column `0`, the headers
        joined with the literal text "<TAB>", and the error message.
    """
    if len(headers) >= 2:
        return None
    return InvalidValue(
        line_number, 0, "<TAB>".join(headers),
        "The header MUST contain at least 2 columns")

def invalid_headings(
        line_number: int, strains: Sequence[str],
        headings: Sequence[str]) -> Union[Tuple[InvalidValue, ...], None]:
    """Collect an error for every heading that is not a known strain.

    Args:
        line_number: Line number recorded in each returned error.
        strains: The acceptable strain names; only membership is tested.
        headings: The headings to check. Column numbers are counted from 2,
            presumably because callers pass the header row without its
            first column -- TODO confirm against the caller.

    Returns:
        A tuple with one `InvalidValue` per heading missing from `strains`,
        in the order encountered; the tuple is empty when all are valid.
    """
    errors = []
    for column, heading in enumerate(headings, start=2):
        if heading in strains:
            continue
        errors.append(InvalidValue(
            line_number, column, heading,
            f"'{heading}' not a valid strain."))
    return tuple(errors)

def duplicate_headings(
        line_number: int,
        headers: Sequence[str]) -> Tuple[DuplicateHeading, ...]:
    """Report every heading that occurs more than once in the header row.

    Args:
        line_number: Line number recorded in each returned error.
        headers: The values of the header row; columns are counted from 1.

    Returns:
        A tuple with one `DuplicateHeading` per repeated heading, ordered by
        the heading's first appearance. Each error carries the tuple of
        1-based columns where the heading occurs. The tuple is empty when
        no heading is repeated.
    """
    # Group the column numbers by heading in one pass. The dict keeps
    # insertion order, so headings stay in order of first appearance.
    columns_by_heading: dict = {}
    for column, heading in enumerate(headers, start=1):
        columns_by_heading.setdefault(heading, []).append(column)

    return tuple(
        DuplicateHeading(
            line_number, heading, tuple(columns), (
                f"Heading '{heading}', is repeated in columns "
                f"{','.join(str(i) for i in columns)}"))
        for heading, columns in columns_by_heading.items()
        if len(columns) > 1)