aboutsummaryrefslogtreecommitdiff
path: root/quality_control
diff options
context:
space:
mode:
Diffstat (limited to 'quality_control')
-rw-r--r--quality_control/parsing.py62
1 files changed, 61 insertions, 1 deletions
diff --git a/quality_control/parsing.py b/quality_control/parsing.py
index 436c90c..70a85ed 100644
--- a/quality_control/parsing.py
+++ b/quality_control/parsing.py
@@ -1,13 +1,16 @@
"""Module handling the high-level parsing of the files"""
import csv
+import collections
from enum import Enum
-from functools import reduce
+from functools import reduce, partial
from typing import Iterable, Generator
import quality_control.average as avg
import quality_control.standard_error as se
from quality_control.headers import valid_header
+from quality_control.headers import (
+ invalid_header, invalid_headings, duplicate_headings)
from quality_control.errors import (
ParseError, DuplicateHeader, InvalidCellValue, InvalidHeaderValue)
@@ -120,6 +123,63 @@ def parse_errors(filepath: str, filetype: FileType, strains: list,
error for error in __errors(filepath, filetype, strains, seek_pos)
if error is not None)
+def header_errors(line_number, fields, strains):
+ return (
+ (invalid_header(line_number, fields),) +
+ invalid_headings(line_number, strains, fields[1:]) +
+ duplicate_headings(line_number, fields))
+
+def empty_value(line_number, column_number, value):
+ if value == "":
+ return InvalidValue(
+ line_number, column_number, value, "Empty value for column")
+
+def average_errors(line_number, fields):
+ return (
+ (empty_value(line_number, 1, fields[0]),) +
+ tuple(
+ avg.invalid_value(line_number, *field)
+ for field in enumerate(fields[1:], start=2)))
+
+def se_errors(line_number, fields):
+ return (
+ (empty_value(line_number, 1, fields[0]),) +
+ tuple(
+ se.invalid_value(line_number, *field)
+ for field in enumerate(fields[1:], start=2)))
+
+def collect_errors(
+ filepath: str, filetype: FileType, strains: list, count: int = 10) -> Generator:
+ """Run checks against file and collect all the errors"""
+ errors = tuple()
+ def __process_errors__(line_number, line, error_checker_fn, errors = tuple()):
+ errs = error_checker_fn(
+ line_number,
+ tuple(field.strip() for field in line.split("\t")))
+ if errs is None:
+ return errors
+ if isinstance(errs, collections.Sequence):
+ return errors + tuple(error for error in errs if error is not None)
+ return errors + (errs,)
+
+ with open(filepath, encoding="utf-8") as input_file:
+ for line_number, line in enumerate(input_file, start=1):
+ if line_number == 1:
+ errors = __process_errors__(
+ line_number, line, partial(header_errors, strains=strains),
+ errors)
+ if line_number != 1:
+ errors = __process_errors__(
+ line_number, line, (
+ average_errors if filetype == FileType.AVERAGE
+ else se_errors),
+ errors)
+
+ if count > 0 and len(errors) >= count:
+ break
+
+ return errors[0:count]
+
def take(iterable: Iterable, num: int) -> list:
"""Take at most `num` items from `iterable`."""
iterator = iter(iterable)