aboutsummaryrefslogtreecommitdiff
path: root/quality_control/parsing.py
blob: eda918127b99afac0eef1a8fec37299016dddca9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import csv

from enum import Enum

import quality_control.average as avg
import quality_control.standard_error as se
from quality_control.headers import valid_header
from quality_control.errors import (
    ParseError, InvalidCellValue, InvalidHeaderValue)

class FileType(Enum):
    AVERAGE = 1
    STANDARD_ERROR = 2

def parse_strains(filepath):
    with open(filepath) as strains_file:
        reader = csv.DictReader(
            strains_file,
            fieldnames=[
                header.strip() for header
                in strains_file.readline().split("\t")],
            delimiter="\t")
        for row in reader:
            yield {
                key: (value if value != "\\N" else None)
                for key, value in row.items()
            }

def __parse_header(line, strains):
    return valid_header(
        strains,
        tuple(header.strip() for header in line.split("\t")))

def __parse_average_line(line):
    return (line[0],) + tuple(avg.valid_value(field) for field in line[1:])

def __parse_standard_error_line(line):
    return (line[0],) + tuple(se.valid_value(field) for field in line[1:])

LINE_PARSERS = {
    FileType.AVERAGE: __parse_average_line,
    FileType.STANDARD_ERROR: __parse_standard_error_line
}

def parse_file(filepath: str, filetype: FileType, strains_filepath: str):
    seek_pos = 0
    try:
        with open(filepath, encoding="utf-8") as input_file:
            for line_number, line in enumerate(input_file):
                if line_number == 0:
                    yield __parse_header(
                        line,
                        tuple(strain["Name"] for strain
                              in parse_strains(strains_filepath)))
                    seek_pos = seek_pos + len(line)

                yield LINE_PARSERS[filetype](
                    tuple(field.strip() for field in line.split("\t")))
                seek_pos = seek_pos + len(line)
    except (InvalidCellValue, InvalidHeaderValue) as err:
        raise ParseError({
            "filepath": filepath,
            "filetype": filetype,
            "position": seek_pos,
            "line_number": line_number
        })