aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2024-08-09 12:09:49 -0500
committerFrederick Muriuki Muriithi2024-08-09 12:09:49 -0500
commitd29a205d83a3eeba49483cc2f3fd372e461b5a9c (patch)
tree26af0b314d40fb1200490652e606c8a84ea227ce
parent4468d5b2ba238975c67e71f41dc23f96f8811d00 (diff)
downloadgn-uploader-d29a205d83a3eeba49483cc2f3fd372e461b5a9c.tar.gz
Read R/qtl2 control data from a directory with extracted files.
-rw-r--r--r_qtl/r_qtl2.py68
1 files changed, 66 insertions, 2 deletions
diff --git a/r_qtl/r_qtl2.py b/r_qtl/r_qtl2.py
index 23c016d..c2e1148 100644
--- a/r_qtl/r_qtl2.py
+++ b/r_qtl/r_qtl2.py
@@ -3,8 +3,8 @@ import io
import csv
import json
from pathlib import Path
-from zipfile import ZipFile
from functools import reduce, partial
+from zipfile import ZipFile, is_zipfile
from typing import Union, Iterator, Iterable, Callable, Optional
import yaml
@@ -81,7 +81,7 @@ def transpose_csv(
outfile.write(line)
-def control_data(zfile: ZipFile) -> dict:
+def __control_data_from_zipfile__(zfile: ZipFile) -> dict:
"""Retrieve the control file from the zip file info."""
files = tuple(filename
for filename in zfile.namelist()
@@ -107,6 +107,70 @@ def control_data(zfile: ZipFile) -> dict:
else yaml.safe_load(zfile.read(files[0])))
}
+
def __control_data_from_dirpath__(dirpath: Path) -> dict:
    """Load control data from a given directory path.

    Parameters
    ----------
    dirpath: Directory expected to hold exactly one control file, i.e. a
        regular file with a `.yaml` or `.json` suffix whose name is not
        flagged by `__special_file__` (helper defined elsewhere in this
        module).

    Returns
    -------
    A dict of the default settings (`na.strings`, `comment.char`, `sep` and
    a `<key>_transposed` flag set to False for every key in `FILE_TYPES`)
    overridden by whatever the control file declares.

    Raises
    ------
    InvalidFormat
        If no candidate control file is found, if more than one is found, or
        if the control file does not contain a mapping at its top level.
    """
    # Only regular files are candidates: a sub-directory that happens to be
    # named `something.yaml` would otherwise be picked and fail on `open`.
    files = tuple(path for path in dirpath.iterdir()
                  if (path.is_file()
                      and not __special_file__(path.name)
                      and path.suffix in (".yaml", ".json")))
    num_files = len(files)
    if num_files == 0:
        raise InvalidFormat("Expected a json or yaml control file.")

    if num_files > 1:
        raise InvalidFormat("Found more than one possible control file.")

    control_file = files[0]
    with open(control_file, "r", encoding="utf8") as infile:
        loaded = (json.load(infile)
                  if control_file.suffix == ".json"
                  else yaml.safe_load(infile))

    # An empty YAML document loads as `None`, and a top-level list/scalar is
    # equally unusable: report these as a format problem rather than letting
    # the `**` expansion below fail with an unrelated `TypeError`.
    if not isinstance(loaded, dict):
        raise InvalidFormat(
            "The control file must contain a mapping of settings.")

    return {
        "na.strings": ["NA"],
        "comment.char": "#",
        "sep": ",",
        **{
            f"{key}_transposed": False for key in FILE_TYPES
        },
        # Values from the control file take precedence over the defaults.
        **loaded
    }
+
+
def control_data(control_src: Union[Path, ZipFile]) -> dict:
    """Read the R/qtl2 bundle control file.

    Parameters
    ----------
    control_src: Path object or ZipFile object.
        If a directory path is provided, this function will read the control
        data from the control file in that directory.
        It is important that the Path be a directory and contain data from
        one and only one R/qtl2 bundle.

        If the path points to a zip archive instead, the archive is opened,
        the control data is read from it, and the archive is closed again
        before returning.

        If a ZipFile object is provided, then the control data is read from the
        control file within the zip file. The caller keeps ownership of that
        object; it is not closed here. We are moving away from parsing data
        directly from ZipFile objects, and this is retained only until the
        transition to using extracted files is complete.

    Returns
    -------
    Returns a dict object with the control data that determines what the files
    in the bundle are and how to parse them.

    Raises
    ------
    r_qtl.errors.InvalidFormat
    """
    if isinstance(control_src, ZipFile):
        return __control_data_from_zipfile__(control_src)
    if isinstance(control_src, Path):
        if is_zipfile(control_src):
            # We opened this archive ourselves, so we must also release its
            # file handle once the control data has been read.
            with ZipFile(control_src) as zfile:
                return __control_data_from_zipfile__(zfile)
        if control_src.is_dir():
            return __control_data_from_dirpath__(control_src)
    raise InvalidFormat(
        "Expects either a zipfile.ZipFile object or a pathlib.Path object "
        "pointing to a directory containing the R/qtl2 bundle.")
+
+
def replace_na_strings(cdata, val):
"""Replace values indicated in `na.strings` with `None`."""
return (None if val in cdata.get("na.strings", ["NA"]) else val)