aboutsummaryrefslogtreecommitdiff
path: root/quality_control/parsing.py
diff options
context:
space:
mode:
Diffstat (limited to 'quality_control/parsing.py')
-rw-r--r--quality_control/parsing.py60
1 files changed, 57 insertions, 3 deletions
diff --git a/quality_control/parsing.py b/quality_control/parsing.py
index 52124f9..eda9181 100644
--- a/quality_control/parsing.py
+++ b/quality_control/parsing.py
@@ -1,12 +1,66 @@
+import csv
+
from enum import Enum
import quality_control.average as avg
import quality_control.standard_error as se
-from quality_control.errors import InvalidCellValue, InvalidHeaderValue
+from quality_control.headers import valid_header
+from quality_control.errors import (
+ ParseError, InvalidCellValue, InvalidHeaderValue)
class FileType(Enum):
AVERAGE = 1
STANDARD_ERROR = 2
-def parse_file(filepath: str, filetype: FileType):
- pass
+def parse_strains(filepath):
+ with open(filepath) as strains_file:
+ reader = csv.DictReader(
+ strains_file,
+ fieldnames=[
+ header.strip() for header
+ in strains_file.readline().split("\t")],
+ delimiter="\t")
+ for row in reader:
+ yield {
+ key: (value if value != "\\N" else None)
+ for key, value in row.items()
+ }
+
+def __parse_header(line, strains):
+ return valid_header(
+ strains,
+ tuple(header.strip() for header in line.split("\t")))
+
+def __parse_average_line(line):
+ return (line[0],) + tuple(avg.valid_value(field) for field in line[1:])
+
+def __parse_standard_error_line(line):
+ return (line[0],) + tuple(se.valid_value(field) for field in line[1:])
+
+LINE_PARSERS = {
+ FileType.AVERAGE: __parse_average_line,
+ FileType.STANDARD_ERROR: __parse_standard_error_line
+}
+
+def parse_file(filepath: str, filetype: FileType, strains_filepath: str):
+ seek_pos = 0
+ try:
+ with open(filepath, encoding="utf-8") as input_file:
+ for line_number, line in enumerate(input_file):
+ if line_number == 0:
+ yield __parse_header(
+ line,
+ tuple(strain["Name"] for strain
+ in parse_strains(strains_filepath)))
+ seek_pos = seek_pos + len(line)
+
+ yield LINE_PARSERS[filetype](
+ tuple(field.strip() for field in line.split("\t")))
+ seek_pos = seek_pos + len(line)
+ except (InvalidCellValue, InvalidHeaderValue) as err:
+ raise ParseError({
+ "filepath": filepath,
+ "filetype": filetype,
+ "position": seek_pos,
+ "line_number": line_number
+ })