aboutsummaryrefslogtreecommitdiff
path: root/r_qtl
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2024-08-12 10:46:01 -0500
committerFrederick Muriuki Muriithi2024-08-12 16:15:13 -0500
commit42d6423578fe61857d1afbfc3be37d052b4bb39f (patch)
tree4600769617a5596a55c9cb074ec0b0fffcd0f059 /r_qtl
parente3e3fb29b5e609723ece8b709fdebda6822eb891 (diff)
downloadgn-uploader-42d6423578fe61857d1afbfc3be37d052b4bb39f.tar.gz
Update check for missing files: Check from directory.
Enable the check for missing files to act upon a directory where the R/qtl2 bundle has been extracted into.
Diffstat (limited to 'r_qtl')
-rw-r--r--r_qtl/r_qtl2_qc.py53
1 files changed, 46 insertions, 7 deletions
diff --git a/r_qtl/r_qtl2_qc.py b/r_qtl/r_qtl2_qc.py
index 7e93d23..6f7b374 100644
--- a/r_qtl/r_qtl2_qc.py
+++ b/r_qtl/r_qtl2_qc.py
@@ -1,6 +1,7 @@
"""Quality control checks for R/qtl2 data bundles."""
-from zipfile import ZipFile
+from pathlib import Path
from functools import reduce, partial
+from zipfile import ZipFile, is_zipfile
from typing import Union, Iterator, Optional, Callable
from r_qtl import r_qtl2 as rqtl2
@@ -39,11 +40,10 @@ def bundle_files_list(cdata: dict) -> tuple[tuple[str, str], ...]:
return fileslist
-def missing_files(zfile: ZipFile) -> tuple[tuple[str, str], ...]:
- """
- Retrieve a list of files listed in the control file that do not exist in the
- bundle.
- """
+
+def __missing_from_zipfile__(
+ zfile: ZipFile, cdata: dict) -> tuple[tuple[str, str], ...]:
+ """Check for missing files from a still-compressed zip file."""
def __missing_p__(filedetails: tuple[str, str]):
_cfkey, thefile = filedetails
try:
@@ -52,9 +52,48 @@ def missing_files(zfile: ZipFile) -> tuple[tuple[str, str], ...]:
except KeyError:
return True
- return tuple(afile for afile in bundle_files_list(rqtl2.control_data(zfile))
+ return tuple(afile for afile in bundle_files_list(cdata)
if __missing_p__(afile))
+
+def __missing_from_dirpath__(
+ dirpath: Path, cdata: dict) -> tuple[tuple[str, str], ...]:
+ """Check for missing files from an extracted bundle."""
+ allfiles = tuple(_file.name for _file in dirpath.iterdir())
+ return tuple(afile for afile in bundle_files_list(cdata)
+ if afile[1] not in allfiles)
+
+
+def missing_files(bundlesrc: Union[Path, ZipFile]) -> tuple[tuple[str, str], ...]:
+ """
+ Retrieve a list of files listed in the control file that do not exist in the
+ bundle.
+
+ Parameters
+ ----------
+ bundlesrc: Path object of ZipFile object: This is the bundle under check.
+
+ Returns
+ -------
+ A tuple of names listed in the control file that do not exist in the bundle.
+
+ Raises
+ ------
+ r_qtl.exceptions.InvalidFormat
+ """
+ cdata = rqtl2.control_data(bundlesrc)
+ if isinstance(bundlesrc, ZipFile):
+ return __missing_from_zipfile__(bundlesrc, cdata)
+ if isinstance(bundlesrc, Path):
+ if is_zipfile(bundlesrc):
+ return __missing_from_zipfile__(ZipFile(bundlesrc, cdata))
+ if bundlesrc.is_dir():
+ return __missing_from_dirpath__(bundlesrc, cdata)
+ raise InvalidFormat(
+ "Expects either a zipfile.ZipFile object or a pathlib.Path object "
+ "pointing to a directory containing the R/qtl2 bundle.")
+
+
def validate_bundle(zfile: ZipFile):
"""Ensure the R/qtl2 bundle is valid."""
missing = missing_files(zfile)