aboutsummaryrefslogtreecommitdiff
path: root/r_qtl/r_qtl2_qc.py
diff options
context:
space:
mode:
Diffstat (limited to 'r_qtl/r_qtl2_qc.py')
-rw-r--r--r_qtl/r_qtl2_qc.py62
1 files changed, 51 insertions, 11 deletions
diff --git a/r_qtl/r_qtl2_qc.py b/r_qtl/r_qtl2_qc.py
index be1eac4..2d9e9a8 100644
--- a/r_qtl/r_qtl2_qc.py
+++ b/r_qtl/r_qtl2_qc.py
@@ -1,12 +1,14 @@
"""Quality control checks for R/qtl2 data bundles."""
-from zipfile import ZipFile
+from pathlib import Path
from functools import reduce, partial
+from zipfile import ZipFile, is_zipfile
from typing import Union, Iterator, Optional, Callable
-from r_qtl import errors as rqe
from r_qtl import r_qtl2 as rqtl2
+from r_qtl import exceptions as rqe
from r_qtl.r_qtl2 import FILE_TYPES
from r_qtl.fileerrors import MissingFile
+from r_qtl.exceptions import InvalidFormat
from quality_control.errors import InvalidValue
from quality_control.checks import decimal_points_error
@@ -39,11 +41,10 @@ def bundle_files_list(cdata: dict) -> tuple[tuple[str, str], ...]:
return fileslist
-def missing_files(zfile: ZipFile) -> tuple[tuple[str, str], ...]:
- """
- Retrieve a list of files listed in the control file that do not exist in the
- bundle.
- """
+
+def __missing_from_zipfile__(
+ zfile: ZipFile, cdata: dict) -> tuple[tuple[str, str], ...]:
+ """Check for missing files from a still-compressed zip file."""
def __missing_p__(filedetails: tuple[str, str]):
_cfkey, thefile = filedetails
try:
@@ -52,14 +53,53 @@ def missing_files(zfile: ZipFile) -> tuple[tuple[str, str], ...]:
except KeyError:
return True
- return tuple(afile for afile in bundle_files_list(rqtl2.control_data(zfile))
+ return tuple(afile for afile in bundle_files_list(cdata)
if __missing_p__(afile))
-def validate_bundle(zfile: ZipFile):
+
+def __missing_from_dirpath__(
+ dirpath: Path, cdata: dict) -> tuple[tuple[str, str], ...]:
+ """Check for missing files from an extracted bundle."""
+ allfiles = tuple(_file.name for _file in dirpath.iterdir())
+ return tuple(afile for afile in bundle_files_list(cdata)
+ if afile[1] not in allfiles)
+
+
+def missing_files(bundlesrc: Union[Path, ZipFile]) -> tuple[tuple[str, str], ...]:
+ """
+ Retrieve a list of files listed in the control file that do not exist in the
+ bundle.
+
+ Parameters
+ ----------
+ bundlesrc: Path object of ZipFile object: This is the bundle under check.
+
+ Returns
+ -------
+ A tuple of names listed in the control file that do not exist in the bundle.
+
+ Raises
+ ------
+ r_qtl.exceptions.InvalidFormat
+ """
+ cdata = rqtl2.control_data(bundlesrc)
+ if isinstance(bundlesrc, ZipFile):
+ return __missing_from_zipfile__(bundlesrc, cdata)
+ if isinstance(bundlesrc, Path):
+ if is_zipfile(bundlesrc):
+ return __missing_from_zipfile__(ZipFile(bundlesrc), cdata)
+ if bundlesrc.is_dir():
+ return __missing_from_dirpath__(bundlesrc, cdata)
+ raise InvalidFormat(
+ "Expects either a zipfile.ZipFile object or a pathlib.Path object "
+ "pointing to a directory containing the R/qtl2 bundle.")
+
+
+def validate_bundle(zfile: Union[Path, ZipFile]):
"""Ensure the R/qtl2 bundle is valid."""
missing = missing_files(zfile)
if len(missing) > 0:
- raise rqe.MissingFileError(
+ raise rqe.MissingFileException(
"The following files do not exist in the bundle: " +
", ".join(mfile[1] for mfile in missing))
@@ -111,6 +151,6 @@ def retrieve_errors(zfile: ZipFile, filetype: str, checkers: tuple[Callable]) ->
if value is not None:
for checker in checkers:
yield checker(lineno=lineno, field=field, value=value)
- except rqe.MissingFileError:
+ except rqe.MissingFileException:
fname = cdata.get(filetype)
yield MissingFile(filetype, fname, f"Missing '{filetype}' file '{fname}'.")