about | summary | refs | log | tree | commit | diff
path: root/quality_control
diff options
context:
space:
mode:
Diffstat (limited to 'quality_control')
-rw-r--r-- quality_control/average.py 12
-rw-r--r-- quality_control/errors.py 23
-rw-r--r-- quality_control/headers.py 33
-rw-r--r-- quality_control/parsing.py 141
-rw-r--r-- quality_control/standard_error.py 15
5 files changed, 45 insertions, 179 deletions
diff --git a/quality_control/average.py b/quality_control/average.py
index 9ca16a9..47a04d9 100644
--- a/quality_control/average.py
+++ b/quality_control/average.py
@@ -3,19 +3,11 @@ import re
from typing import Union
from .errors import InvalidValue
-from .errors import InvalidCellValue
-
-def valid_value(val):
- """Checks whether `val` is a valid value for averages"""
- if re.search(r"^[0-9]+\.[0-9]{3}$", val):
- return float(val)
- raise InvalidCellValue(
- f"Invalid value '{val}'. "
- "Expected string representing a number with exactly three decimal "
- "places.")
def invalid_value(line_number: int, column_number: int, val: str) -> Union[
InvalidValue, None]:
+ """Return an `InvalidValue` object if `val` is not a valid "averages"
+ value."""
if re.search(r"^[0-9]+\.[0-9]{3}$", val):
return None
return InvalidValue(
diff --git a/quality_control/errors.py b/quality_control/errors.py
index 4206b5b..678fe09 100644
--- a/quality_control/errors.py
+++ b/quality_control/errors.py
@@ -2,29 +2,6 @@
from collections import namedtuple
-class InvalidCellValue(Exception):
- """Raised when a function encounters an invalid value"""
-
- def __init__(self, *args):
- super().__init__(*args)
-
-class InvalidHeaderValue(Exception):
- """Raised when a header contains values not in the reference file."""
-
- def __init__(self, *args):
- super().__init__(*args)
-
-class DuplicateHeader(Exception):
- """Raised when a header contains 2 similar headers."""
-
- def __init__(self, *args):
- super().__init__(*args)
-
-class ParseError(Exception):
- """Raised if any of the above exceptions are raised"""
- def __init__(self, *args):
- super().__init__(*args)
-
InvalidValue = namedtuple(
"InvalidValue", ("line", "column", "value", "message"))
diff --git a/quality_control/headers.py b/quality_control/headers.py
index 3b1e0e6..79d7e43 100644
--- a/quality_control/headers.py
+++ b/quality_control/headers.py
@@ -4,41 +4,22 @@ from functools import reduce
from typing import Union, Tuple, Sequence
from quality_control.errors import InvalidValue, DuplicateHeading
-from quality_control.errors import DuplicateHeader, InvalidHeaderValue
-
-def valid_header(strains, headers):
- "Return the valid headers with reference to strains or throw an error"
- if not bool(headers[1:]):
- raise InvalidHeaderValue(
- "The header MUST contain at least 2 columns")
- invalid_headers = tuple(
- header for header in headers[1:] if header not in strains)
- if invalid_headers:
- raise InvalidHeaderValue(
- *(f"'{header}' not a valid strain." for header in invalid_headers))
-
- unique_headers = set(headers)
- if len(unique_headers) != len(headers):
- repeated = (
- (header, headers.count(header))
- for header in unique_headers if headers.count(header) > 1)
- raise DuplicateHeader(*(
- f"'{header}' is present in the header row {times} times."
- for header, times in repeated))
-
- return headers
-
def invalid_header(
line_number: int, headers: Sequence[str]) -> Union[InvalidValue, None]:
+ """Return an `InvalidValue` object if the header row has less than 2
+ items."""
if len(headers) < 2:
return InvalidValue(
line_number, 0, "<TAB>".join(headers),
"The header MUST contain at least 2 columns")
+ return None
def invalid_headings(
line_number: int, strains: Sequence[str],
headings: Sequence[str]) -> Union[Tuple[InvalidValue, ...], None]:
+ """Return tuple of `InvalidValue` objects for each error found for every
+ column heading."""
return tuple(
InvalidValue(
line_number, col, header, f"'{header}' not a valid strain.")
@@ -47,13 +28,15 @@ def invalid_headings(
def duplicate_headings(
line_number: int, headers: Sequence[str]) -> Union[InvalidValue, None]:
+ """Return a tuple of `DuplicateHeading` objects for each column heading that
+ is a duplicate of another column heading."""
def __update_columns__(acc, item):
if item[1] in acc.keys():
return {**acc, item[1]: acc[item[1]] + (item[0],)}
return {**acc, item[1]: (item[0],)}
repeated = {
heading: columns for heading, columns in
- reduce(__update_columns__, enumerate(headers, start=1), dict()).items()
+ reduce(__update_columns__, enumerate(headers, start=1), {}).items()
if len(columns) > 1
}
return tuple(
diff --git a/quality_control/parsing.py b/quality_control/parsing.py
index 70a85ed..655b98a 100644
--- a/quality_control/parsing.py
+++ b/quality_control/parsing.py
@@ -1,40 +1,22 @@
"""Module handling the high-level parsing of the files"""
-import csv
+import os
import collections
from enum import Enum
-from functools import reduce, partial
-from typing import Iterable, Generator
+from functools import partial
+from typing import Union, Iterable, Generator, Callable
import quality_control.average as avg
import quality_control.standard_error as se
-from quality_control.headers import valid_header
+from quality_control.errors import InvalidValue
from quality_control.headers import (
invalid_header, invalid_headings, duplicate_headings)
-from quality_control.errors import (
- ParseError, DuplicateHeader, InvalidCellValue, InvalidHeaderValue)
class FileType(Enum):
"""Enumerate the expected file types"""
AVERAGE = 1
STANDARD_ERROR = 2
-def __parse_header(line, strains):
- return valid_header(
- set(strains),
- tuple(header.strip() for header in line.split("\t")))
-
-def __parse_average_line(line):
- return (line[0],) + tuple(avg.valid_value(field) for field in line[1:])
-
-def __parse_standard_error_line(line):
- return (line[0],) + tuple(se.valid_value(field) for field in line[1:])
-
-LINE_PARSERS = {
- FileType.AVERAGE: __parse_average_line,
- FileType.STANDARD_ERROR: __parse_standard_error_line
-}
-
def strain_names(filepath):
"""Retrieve the strains names from given file"""
strains = set()
@@ -51,90 +33,22 @@ def strain_names(filepath):
return strains
-def parse_file(filepath: str, filetype: FileType, strains: list):
- """Parse the given file"""
- seek_pos = 0
- try:
- with open(filepath, encoding="utf-8") as input_file:
- for line_number, line in enumerate(input_file):
- if line_number == 0:
- yield __parse_header(line, strains), seek_pos + len(line)
- seek_pos = seek_pos + len(line)
- continue
-
- yield (
- LINE_PARSERS[filetype](
- tuple(field.strip() for field in line.split("\t"))),
- seek_pos + len(line))
- seek_pos = seek_pos + len(line)
- except (DuplicateHeader, InvalidCellValue, InvalidHeaderValue) as err:
- raise ParseError({
- "filepath": filepath,
- "filetype": filetype,
- "position": seek_pos,
- "line_number": line_number,
- "error": err
- }) from err
-
-def parse_errors(filepath: str, filetype: FileType, strains: list,
- seek_pos: int = 0) -> Generator:
- """Retrieve ALL the parse errors"""
- assert seek_pos >= 0, "The seek position must be at least zero (0)"
-
- def __error_type(error):
- """Return a nicer string representatiton for the error type."""
- if isinstance(error, DuplicateHeader):
- return "Duplicated Headers"
- if isinstance(error, InvalidCellValue):
- return "Invalid Value"
- if isinstance(error, InvalidHeaderValue):
- return "Invalid Strain"
-
- def __errors(filepath, filetype, strains, seek_pos):
- """Return only the errors as values"""
- with open(filepath, encoding="utf-8") as input_file:
- ## TODO: Seek the file to the given seek position
- for line_number, line in enumerate(input_file):
- if seek_pos > 0:
- input_file.seek(seek_pos, 0)
- try:
- if seek_pos == 0 and line_number == 0:
- header = __parse_header(line, strains)
- yield None
- seek_pos = seek_pos + len(line)
- continue
-
- parsed_line = LINE_PARSERS[filetype](
- tuple(field.strip() for field in line.split("\t")))
- yield None
- seek_pos = seek_pos + len(line)
- except (DuplicateHeader, InvalidCellValue, InvalidHeaderValue) as err:
- yield {
- "filepath": filepath,
- "filetype": filetype,
- "position": seek_pos,
- "line_number": line_number,
- "error": __error_type(err),
- "message": err.args
- }
- seek_pos = seek_pos + len(line)
-
- return (
- error for error in __errors(filepath, filetype, strains, seek_pos)
- if error is not None)
-
def header_errors(line_number, fields, strains):
+ """Gather all header row errors."""
return (
(invalid_header(line_number, fields),) +
invalid_headings(line_number, strains, fields[1:]) +
duplicate_headings(line_number, fields))
def empty_value(line_number, column_number, value):
+ """Check for empty field values."""
if value == "":
return InvalidValue(
line_number, column_number, value, "Empty value for column")
+ return None
def average_errors(line_number, fields):
+ """Gather all errors for a line in a averages file."""
return (
(empty_value(line_number, 1, fields[0]),) +
tuple(
@@ -142,6 +56,7 @@ def average_errors(line_number, fields):
for field in enumerate(fields[1:], start=2)))
def se_errors(line_number, fields):
+ """Gather all errors for a line in a standard-errors file."""
return (
(empty_value(line_number, 1, fields[0]),) +
tuple(
@@ -149,7 +64,8 @@ def se_errors(line_number, fields):
for field in enumerate(fields[1:], start=2)))
def collect_errors(
- filepath: str, filetype: FileType, strains: list, count: int = 10) -> Generator:
+ filepath: str, filetype: FileType, strains: list,
+ updater: Union[Callable, None] = None) -> Generator:
"""Run checks against file and collect all the errors"""
errors = tuple()
def __process_errors__(line_number, line, error_checker_fn, errors = tuple()):
@@ -162,30 +78,37 @@ def collect_errors(
return errors + tuple(error for error in errs if error is not None)
return errors + (errs,)
+ filesize = os.stat(filepath).st_size
+ processed_size = 0
with open(filepath, encoding="utf-8") as input_file:
for line_number, line in enumerate(input_file, start=1):
if line_number == 1:
- errors = __process_errors__(
- line_number, line, partial(header_errors, strains=strains),
- errors)
- if line_number != 1:
- errors = __process_errors__(
- line_number, line, (
- average_errors if filetype == FileType.AVERAGE
- else se_errors),
- errors)
+ for error in __process_errors__(
+ line_number, line, partial(header_errors, strains=strains),
+ errors):
+ yield error
- if count > 0 and len(errors) >= count:
- break
-
- return errors[0:count]
+ if line_number != 1:
+ for error in __process_errors__(
+ line_number, line, (
+ average_errors if filetype == FileType.AVERAGE
+ else se_errors),
+ errors):
+ yield error
+
+ processed_size = processed_size + len(line)
+ if updater:
+ updater({
+ "line_number": line_number,
+ "percent": (processed_size/filesize) * 100
+ })
def take(iterable: Iterable, num: int) -> list:
"""Take at most `num` items from `iterable`."""
iterator = iter(iterable)
items = []
try:
- for i in range(0, num):
+ for i in range(0, num): # pylint: disable=[unused-variable]
items.append(next(iterator))
return items
diff --git a/quality_control/standard_error.py b/quality_control/standard_error.py
index 022cc9b..c866993 100644
--- a/quality_control/standard_error.py
+++ b/quality_control/standard_error.py
@@ -3,19 +3,10 @@ import re
from typing import Union
from .errors import InvalidValue
-from .errors import InvalidCellValue
-def valid_value(val):
- """Checks whether `val` is a valid value for standard errors"""
- if re.search(r"^[0-9]+\.[0-9]{6,}$", val):
- return float(val)
- raise InvalidCellValue(
- f"Invalid value '{val}'. "
- "Expected string representing a number with at least six decimal "
- "places.")
-
-def invalid_value(line_number: int, column_number: int, val: str) -> Union[
- InvalidValue, None]:
+def invalid_value(
+ line_number: int, column_number: int, val: str) -> Union[
+ InvalidValue, None]:
"""
Returns a `quality_control.errors.InvalidValue` object in the case where
`val` is not a valid input for standard error files, otherwise, it returns