diff options
Diffstat (limited to 'r_qtl/r_qtl2_qc.py')
-rw-r--r-- | r_qtl/r_qtl2_qc.py | 62 |
1 files changed, 51 insertions, 11 deletions
diff --git a/r_qtl/r_qtl2_qc.py b/r_qtl/r_qtl2_qc.py index be1eac4..2d9e9a8 100644 --- a/r_qtl/r_qtl2_qc.py +++ b/r_qtl/r_qtl2_qc.py @@ -1,12 +1,14 @@ """Quality control checks for R/qtl2 data bundles.""" -from zipfile import ZipFile +from pathlib import Path from functools import reduce, partial +from zipfile import ZipFile, is_zipfile from typing import Union, Iterator, Optional, Callable -from r_qtl import errors as rqe from r_qtl import r_qtl2 as rqtl2 +from r_qtl import exceptions as rqe from r_qtl.r_qtl2 import FILE_TYPES from r_qtl.fileerrors import MissingFile +from r_qtl.exceptions import InvalidFormat from quality_control.errors import InvalidValue from quality_control.checks import decimal_points_error @@ -39,11 +41,10 @@ def bundle_files_list(cdata: dict) -> tuple[tuple[str, str], ...]: return fileslist -def missing_files(zfile: ZipFile) -> tuple[tuple[str, str], ...]: - """ - Retrieve a list of files listed in the control file that do not exist in the - bundle. - """ + +def __missing_from_zipfile__( + zfile: ZipFile, cdata: dict) -> tuple[tuple[str, str], ...]: + """Check for missing files from a still-compressed zip file.""" def __missing_p__(filedetails: tuple[str, str]): _cfkey, thefile = filedetails try: @@ -52,14 +53,53 @@ def missing_files(zfile: ZipFile) -> tuple[tuple[str, str], ...]: except KeyError: return True - return tuple(afile for afile in bundle_files_list(rqtl2.control_data(zfile)) + return tuple(afile for afile in bundle_files_list(cdata) if __missing_p__(afile)) -def validate_bundle(zfile: ZipFile): + +def __missing_from_dirpath__( + dirpath: Path, cdata: dict) -> tuple[tuple[str, str], ...]: + """Check for missing files from an extracted bundle.""" + allfiles = tuple(_file.name for _file in dirpath.iterdir()) + return tuple(afile for afile in bundle_files_list(cdata) + if afile[1] not in allfiles) + + +def missing_files(bundlesrc: Union[Path, ZipFile]) -> tuple[tuple[str, str], ...]: + """ + Retrieve a list of files listed in the control file that do not exist in the + bundle. + + Parameters + ---------- + bundlesrc: Path object of ZipFile object: This is the bundle under check. + + Returns + ------- + A tuple of names listed in the control file that do not exist in the bundle. + + Raises + ------ + r_qtl.exceptions.InvalidFormat + """ + cdata = rqtl2.control_data(bundlesrc) + if isinstance(bundlesrc, ZipFile): + return __missing_from_zipfile__(bundlesrc, cdata) + if isinstance(bundlesrc, Path): + if is_zipfile(bundlesrc): + return __missing_from_zipfile__(ZipFile(bundlesrc), cdata) + if bundlesrc.is_dir(): + return __missing_from_dirpath__(bundlesrc, cdata) + raise InvalidFormat( + "Expects either a zipfile.ZipFile object or a pathlib.Path object " + "pointing to a directory containing the R/qtl2 bundle.") + + +def validate_bundle(zfile: Union[Path, ZipFile]): """Ensure the R/qtl2 bundle is valid.""" missing = missing_files(zfile) if len(missing) > 0: - raise rqe.MissingFileError( + raise rqe.MissingFileException( "The following files do not exist in the bundle: " + ", ".join(mfile[1] for mfile in missing)) @@ -111,6 +151,6 @@ def retrieve_errors(zfile: ZipFile, filetype: str, checkers: tuple[Callable]) -> if value is not None: for checker in checkers: yield checker(lineno=lineno, field=field, value=value) - except rqe.MissingFileError: + except rqe.MissingFileException: fname = cdata.get(filetype) yield MissingFile(filetype, fname, f"Missing '{filetype}' file '{fname}'.") |