aboutsummaryrefslogtreecommitdiff
path: root/quality_control/parsing.py
blob: 6e5bb8f6b8e23d781d0b50d49dfc5cc4b7366015 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
"""Module handling the high-level parsing of the files"""

import csv
from enum import Enum
from functools import reduce

import quality_control.average as avg
import quality_control.standard_error as se
from quality_control.headers import valid_header
from quality_control.errors import (
    ParseError, DuplicateHeader, InvalidCellValue, InvalidHeaderValue)

class FileType(Enum):
    """Enumerate the expected file types"""
    # Selects the averages line parser in the LINE_PARSERS table.
    AVERAGE = 1
    # Selects the standard-error line parser in the LINE_PARSERS table.
    STANDARD_ERROR = 2

def parse_strains(filepath):
    """Lazily read the tab-separated strains file, one record at a time.

    The first line of the file supplies the column names (surrounding
    whitespace is stripped from each). Every following row is yielded as a
    dict keyed by those column names. A cell whose text is exactly a
    backslash followed by a capital N is replaced with ``None``; every other
    cell is passed through unchanged.
    """
    with open(filepath, encoding="utf8") as strains_file:
        # Consume the header line ourselves so the names can be stripped
        # before the csv reader sees the remaining rows.
        column_names = [
            name.strip() for name in strains_file.readline().split("\t")]
        records = csv.DictReader(
            strains_file, fieldnames=column_names, delimiter="\t")
        for record in records:
            cleaned = {}
            for column, cell in record.items():
                cleaned[column] = None if cell == "\\N" else cell
            yield cleaned

def __parse_header(line, strains):
    """Split a raw header line on tabs and hand it to ``valid_header``.

    ``strains`` is converted to a set and each header field is stripped of
    surrounding whitespace before the check. Whatever ``valid_header``
    returns is returned unchanged.
    """
    # NOTE(review): presumably `valid_header` raises on duplicate or unknown
    # headers -- confirm against quality_control.headers.
    known_strains = set(strains)
    column_titles = tuple(title.strip() for title in line.split("\t"))
    return valid_header(known_strains, column_titles)

def __parse_average_line(line):
    """Check one already-split row of an averages file.

    The first field is kept as-is; every remaining field is passed through
    ``avg.valid_value``. Returns a tuple of the first field followed by the
    checked values.
    """
    row_label = line[0]
    checked_cells = [avg.valid_value(cell) for cell in line[1:]]
    return (row_label, *checked_cells)

def __parse_standard_error_line(line):
    """Check one already-split row of a standard-error file.

    The first field is kept as-is; every remaining field is passed through
    ``se.valid_value``. Returns a tuple of the first field followed by the
    checked values.
    """
    row_label = line[0]
    checked_cells = [se.valid_value(cell) for cell in line[1:]]
    return (row_label, *checked_cells)

# Dispatch table: maps each FileType member to the function used to parse a
# single, already-split data row of a file of that type.
LINE_PARSERS = {
    FileType.AVERAGE: __parse_average_line,
    FileType.STANDARD_ERROR: __parse_standard_error_line
}

def strain_names(strains):
    """Retrieve a complete list of the names of the strains.

    ``strains`` is any iterable of mappings that each have ``"Name"`` and
    ``"Name2"`` keys. For every strain, in order, its ``"Name"`` and then its
    ``"Name2"`` value is collected, skipping values that are ``None`` or the
    empty string.

    Returns a tuple of the collected names (empty when ``strains`` is empty).
    Raises ``KeyError`` if a strain lacks either key.
    """
    # Build the tuple in a single pass; repeatedly concatenating tuples
    # copies the accumulated result on every step.
    return tuple(
        name
        for strain in strains
        for name in (strain["Name"], strain["Name2"])
        if name is not None and name != "")

def parse_file(filepath: str, filetype: FileType, strains: list):
    """Parse the given file, lazily yielding one parsed item per line.

    The first line is treated as the header: it is passed, together with
    ``strains``, to the header parser and the result is yielded. Every
    subsequent line is split on tabs, each field is stripped of surrounding
    whitespace, and the resulting tuple is run through the line parser
    registered for ``filetype`` in ``LINE_PARSERS``.

    Raises ``ParseError`` (chained from the original exception) when a
    ``DuplicateHeader``, ``InvalidCellValue`` or ``InvalidHeaderValue`` is
    raised while parsing. The error carries a dict with the file path, the
    file type, the zero-based ``line_number`` of the offending line, the
    original ``error``, and ``position``: the total length, in decoded
    characters, of the lines that were handled before the offending one.
    """
    seek_pos = 0
    # Bound up-front so the error handler never sees an undefined name.
    line_number = 0
    try:
        with open(filepath, encoding="utf-8") as input_file:
            for line_number, line in enumerate(input_file):
                if line_number == 0:
                    yield __parse_header(line, strains)
                    seek_pos = seek_pos + len(line)
                    # The header must not also be parsed as a data row (and
                    # must not have its length counted twice).
                    continue

                yield LINE_PARSERS[filetype](
                    tuple(field.strip() for field in line.split("\t")))
                # Only advance once the line has parsed cleanly, so that on
                # failure `seek_pos` still points at the start of that line.
                seek_pos = seek_pos + len(line)
    except (DuplicateHeader, InvalidCellValue, InvalidHeaderValue) as err:
        raise ParseError({
            "filepath": filepath,
            "filetype": filetype,
            "position": seek_pos,
            "line_number": line_number,
            "error": err
        }) from err