diff options
-rw-r--r-- | r_qtl/r_qtl2.py | 68 |
1 files changed, 66 insertions, 2 deletions
diff --git a/r_qtl/r_qtl2.py b/r_qtl/r_qtl2.py index 23c016d..c2e1148 100644 --- a/r_qtl/r_qtl2.py +++ b/r_qtl/r_qtl2.py @@ -3,8 +3,8 @@ import io import csv import json from pathlib import Path -from zipfile import ZipFile from functools import reduce, partial +from zipfile import ZipFile, is_zipfile from typing import Union, Iterator, Iterable, Callable, Optional import yaml @@ -81,7 +81,7 @@ def transpose_csv( outfile.write(line) -def control_data(zfile: ZipFile) -> dict: +def __control_data_from_zipfile__(zfile: ZipFile) -> dict: """Retrieve the control file from the zip file info.""" files = tuple(filename for filename in zfile.namelist() @@ -107,6 +107,70 @@ def control_data(zfile: ZipFile) -> dict: else yaml.safe_load(zfile.read(files[0]))) } + +def __control_data_from_dirpath__(dirpath: Path): + """Load control data from a given directory path.""" + files = tuple(path for path in dirpath.iterdir() + if (not __special_file__(path.name) + and (path.suffix in (".yaml", ".json")))) + num_files = len(files) + if num_files == 0: + raise InvalidFormat("Expected a json or yaml control file.") + + if num_files > 1: + raise InvalidFormat("Found more than one possible control file.") + + with open(files[0], "r", encoding="utf8") as infile: + return { + "na.strings": ["NA"], + "comment.char": "#", + "sep": ",", + **{ + f"{key}_transposed": False for key in FILE_TYPES + }, + **(json.loads(infile.read()) + if files[0].suffix == ".json" + else yaml.safe_load(infile.read())) + } + + +def control_data(control_src: Union[Path, ZipFile]) -> dict: + """Read the R/qtl2 bundle control file. + + Parameters + ---------- + control_src: Path object of ZipFile object. + If a directory path is provided, this function will read the control + data from the control file in that directory. + It is importand that the Path be a directory and contain data from one + and only one R/qtl2 bundle. + + If a ZipFile object is provided, then the control data is read from the + control file within the zip file. We are moving away from parsing data + directly from ZipFile objects, and this is retained only until the + transition to using extracted files is complete. + + Returns + ------- + Returns a dict object with the control data that determines what the files + in the bundle are and how to parse them. + + Raises + ------ + r_qtl.errors.InvalidFormat + """ + if isinstance(control_src, ZipFile): + return __control_data_from_zipfile__(control_src) + if isinstance(control_src, Path): + if is_zipfile(control_src): + return __control_data_from_zipfile__(ZipFile(control_src)) + if control_src.is_dir(): + return __control_data_from_dirpath__(control_src) + raise InvalidFormat( + "Expects either a zipfile.ZipFile object or a pathlib.Path object " + "pointing to a directory containing the R/qtl2 bundle.") + + def replace_na_strings(cdata, val): """Replace values indicated in `na.strings` with `None`.""" return (None if val in cdata.get("na.strings", ["NA"]) else val) |