about summary refs log tree commit diff
diff options
context:
space:
mode:
author Frederick Muriuki Muriithi 2024-08-09 12:09:49 -0500
committer Frederick Muriuki Muriithi 2024-08-09 12:09:49 -0500
commit d29a205d83a3eeba49483cc2f3fd372e461b5a9c (patch)
tree 26af0b314d40fb1200490652e606c8a84ea227ce
parent 4468d5b2ba238975c67e71f41dc23f96f8811d00 (diff)
download gn-uploader-d29a205d83a3eeba49483cc2f3fd372e461b5a9c.tar.gz
Read R/qtl2 control data from a directory with extracted files.
-rw-r--r-- r_qtl/r_qtl2.py 68
1 files changed, 66 insertions, 2 deletions
diff --git a/r_qtl/r_qtl2.py b/r_qtl/r_qtl2.py
index 23c016d..c2e1148 100644
--- a/r_qtl/r_qtl2.py
+++ b/r_qtl/r_qtl2.py
@@ -3,8 +3,8 @@ import io
 import csv
 import json
 from pathlib import Path
-from zipfile import ZipFile
 from functools import reduce, partial
+from zipfile import ZipFile, is_zipfile
 from typing import Union, Iterator, Iterable, Callable, Optional
 
 import yaml
@@ -81,7 +81,7 @@ def transpose_csv(
             outfile.write(line)
 
 
-def control_data(zfile: ZipFile) -> dict:
+def __control_data_from_zipfile__(zfile: ZipFile) -> dict:
     """Retrieve the control file from the zip file info."""
     files = tuple(filename
                   for filename in zfile.namelist()
@@ -107,6 +107,70 @@ def control_data(zfile: ZipFile) -> dict:
             else yaml.safe_load(zfile.read(files[0])))
     }
 
+
+def __control_data_from_dirpath__(dirpath: Path):
+    """Load control data from a given directory path."""
+    files = tuple(path for path in dirpath.iterdir()
+                  if (not __special_file__(path.name)
+                      and (path.suffix in (".yaml", ".json"))))
+    num_files = len(files)
+    if num_files == 0:
+        raise InvalidFormat("Expected a json or yaml control file.")
+
+    if num_files > 1:
+        raise InvalidFormat("Found more than one possible control file.")
+
+    with open(files[0], "r", encoding="utf8") as infile:
+        return {
+            "na.strings": ["NA"],
+            "comment.char": "#",
+            "sep": ",",
+            **{
+                f"{key}_transposed": False for key in FILE_TYPES
+            },
+            **(json.loads(infile.read())
+               if files[0].suffix == ".json"
+               else yaml.safe_load(infile.read()))
+        }
+
+
+def control_data(control_src: Union[Path, ZipFile]) -> dict:
+    """Read the R/qtl2 bundle control file.
+
+    Parameters
+    ----------
+    control_src: Path object or ZipFile object.
+        If a directory path is provided, this function will read the control
+        data from the control file in that directory.
+        It is important that the Path be a directory and contain data from one
+        and only one R/qtl2 bundle.
+
+        If a ZipFile object is provided, then the control data is read from the
+        control file within the zip file. We are moving away from parsing data
+        directly from ZipFile objects, and this is retained only until the
+        transition to using extracted files is complete.
+
+    Returns
+    -------
+    Returns a dict object with the control data that determines what the files
+    in the bundle are and how to parse them.
+
+    Raises
+    ------
+    r_qtl.errors.InvalidFormat
+    """
+    if isinstance(control_src, ZipFile):
+        return __control_data_from_zipfile__(control_src)
+    if isinstance(control_src, Path):
+        if is_zipfile(control_src):
+            return __control_data_from_zipfile__(ZipFile(control_src))
+        if control_src.is_dir():
+            return __control_data_from_dirpath__(control_src)
+    raise InvalidFormat(
+        "Expects either a zipfile.ZipFile object or a pathlib.Path object "
+        "pointing to a directory containing the R/qtl2 bundle.")
+
+
 def replace_na_strings(cdata, val):
     """Replace values indicated in `na.strings` with `None`."""
     return (None if val in cdata.get("na.strings", ["NA"]) else val)