about summary refs log tree commit diff
path: root/r_qtl/r_qtl2.py
diff options
context:
space:
mode:
Diffstat (limited to 'r_qtl/r_qtl2.py')
-rw-r--r--r_qtl/r_qtl2.py228
1 files changed, 212 insertions, 16 deletions
diff --git a/r_qtl/r_qtl2.py b/r_qtl/r_qtl2.py
index 0a96e7c..0ef487f 100644
--- a/r_qtl/r_qtl2.py
+++ b/r_qtl/r_qtl2.py
@@ -1,21 +1,26 @@
 """The R/qtl2 parsing and processing code."""
 import io
+import os
 import csv
 import json
 from pathlib import Path
-from zipfile import ZipFile
 from functools import reduce, partial
+from zipfile import ZipFile, is_zipfile
 from typing import Union, Iterator, Iterable, Callable, Optional
 
 import yaml
 
 from functional_tools import take, chain
 
-from r_qtl.errors import InvalidFormat, MissingFileError
+from r_qtl.exceptions import InvalidFormat, MissingFileException
 
 FILE_TYPES = (
     "geno", "founder_geno", "pheno", "covar", "phenocovar", "gmap", "pmap",
-    "phenose")
+    "phenose", "phenonum")
+
+__CONTROL_FILE_ERROR_MESSAGE__ = (
+    "The zipped bundle that was provided does not contain a valid control file "
+    "in either JSON or YAML format.")
 
 
 def __special_file__(filename):
@@ -30,7 +35,81 @@ def __special_file__(filename):
     return (is_macosx_special_file or is_nix_hidden_file)
 
 
-def control_data(zfile: ZipFile) -> dict:
+def extract(zfile: ZipFile, outputdir: Path) -> tuple[Path, ...]:
+    """Extract a ZipFile
+
+    This function will extract a zipfile `zfile` to the directory `outputdir`.
+
+    Parameters
+    ----------
+    zfile: zipfile.ZipFile object - the zipfile to extract.
+    outputdir: Optional pathlib.Path object - where the extracted files go.
+
+    Returns
+    -------
+    A tuple of Path objects, each pointing to a member in the zipfile.
+    """
+    outputdir.mkdir(parents=True, exist_ok=True)
+    return tuple(Path(zfile.extract(member, outputdir))
+                 for member in zfile.namelist()
+                 if not __special_file__(member))
+
+
+def transpose_csv(
+        inpath: Path,
+        linesplitterfn: Callable,
+        linejoinerfn: Callable,
+        outpath: Path) -> Path:
+    """Transpose a file: Make its rows into columns and its columns into rows.
+
+    This function will create a new file, `outfile`, with the same content as
+    the original, `infile`, except transposed i.e. The rows of `infile` are the
+    columns of `outfile` and the columns of `infile` are the rows of `outfile`.
+
+    Parameters
+    ----------
+    inpath: The CSV file to transpose.
+    linesplitterfn: A function to use for splitting each line into columns
+    linejoinerfn: A function to use to rebuild the lines
+    outpath: The path where the transposed data is stored
+    """
+    def __read_by_line__(_path):
+        with open(_path, "r", encoding="utf8") as infile:
+            for line in infile:
+                if line.startswith("#"):
+                    continue
+                yield line
+
+    transposed_data= (f"{linejoinerfn(items)}\n" for items in zip(*(
+        linesplitterfn(line) for line in __read_by_line__(inpath))))
+
+    with open(outpath, "w", encoding="utf8") as outfile:
+        for line in transposed_data:
+            outfile.write(line)
+
+    return outpath
+
+
+def transpose_csv_with_rename(inpath: Path,
+                              linesplitterfn: Callable,
+                              linejoinerfn: Callable) -> Path:
+    """Renames input file and creates new transposed file with the original name
+    of the input file.
+
+    Parameters
+    ----------
+    inpath: Path to the input file. Should be a pathlib.Path object.
+    linesplitterfn: A function to use for splitting each line into columns
+    linejoinerfn: A function to use to rebuild the lines
+    """
+    transposedfilepath = Path(inpath)
+    origbkp = inpath.parent.joinpath(f"{inpath.stem}___original{inpath.suffix}")
+    os.rename(inpath, origbkp)
+    return transpose_csv(
+        origbkp, linesplitterfn, linejoinerfn, transposedfilepath)
+
+
+def __control_data_from_zipfile__(zfile: ZipFile) -> dict:
     """Retrieve the control file from the zip file info."""
     files = tuple(filename
                   for filename in zfile.namelist()
@@ -39,7 +118,7 @@ def control_data(zfile: ZipFile) -> dict:
                            or filename.endswith(".json"))))
     num_files = len(files)
     if num_files == 0:
-        raise InvalidFormat("Expected a json or yaml control file.")
+        raise InvalidFormat(__CONTROL_FILE_ERROR_MESSAGE__)
 
     if num_files > 1:
         raise InvalidFormat("Found more than one possible control file.")
@@ -56,6 +135,88 @@ def control_data(zfile: ZipFile) -> dict:
             else yaml.safe_load(zfile.read(files[0])))
     }
 
+def __control_data_from_dirpath__(dirpath: Path):
+    """Load control data from a given directory path."""
+    files = tuple(path for path in dirpath.iterdir()
+                  if (not __special_file__(path.name)
+                      and (path.suffix in (".yaml", ".json"))))
+    num_files = len(files)
+    if num_files == 0:
+        raise InvalidFormat(__CONTROL_FILE_ERROR_MESSAGE__)
+
+    if num_files > 1:
+        raise InvalidFormat("Found more than one possible control file.")
+
+    with open(files[0], "r", encoding="utf8") as infile:
+        return {
+            "na.strings": ["NA"],
+            "comment.char": "#",
+            "sep": ",",
+            **{
+                f"{key}_transposed": False for key in FILE_TYPES
+            },
+            **(json.loads(infile.read())
+               if files[0].suffix == ".json"
+               else yaml.safe_load(infile.read()))
+        }
+
+
+def control_data(control_src: Union[Path, ZipFile]) -> dict:
+    """Read the R/qtl2 bundle control file.
+
+    Parameters
+    ----------
+    control_src: Path object of ZipFile object.
+        If a directory path is provided, this function will read the control
+        data from the control file in that directory.
+        It is importand that the Path be a directory and contain data from one
+        and only one R/qtl2 bundle.
+
+        If a ZipFile object is provided, then the control data is read from the
+        control file within the zip file. We are moving away from parsing data
+        directly from ZipFile objects, and this is retained only until the
+        transition to using extracted files is complete.
+
+    Returns
+    -------
+    Returns a dict object with the control data that determines what the files
+    in the bundle are and how to parse them.
+
+    Raises
+    ------
+    r_qtl.exceptions.InvalidFormat
+    """
+    def __cleanup__(cdata):
+        _cdata = {
+            **cdata,
+            **dict((filetype,
+                    ([cdata[filetype]] if isinstance(cdata[filetype], str)
+                else cdata[filetype])
+                    ) for filetype in
+                   (typ for typ in cdata.keys() if typ in FILE_TYPES))
+        }
+        if "na.string" in _cdata:# handle common error in file.
+            _cdata = {
+                **cdata,
+                "na.strings": list(set(
+                    _cdata["na.string"] + _cdata["na.strings"]))
+            }
+
+        return _cdata
+
+    if isinstance(control_src, ZipFile):
+        return __cleanup__(__control_data_from_zipfile__(control_src))
+    if isinstance(control_src, Path):
+        if is_zipfile(control_src):
+            return __cleanup__(
+                __control_data_from_zipfile__(ZipFile(control_src)))
+        if control_src.is_dir():
+            return __cleanup__(__control_data_from_dirpath__(control_src))
+    raise InvalidFormat(
+        "Expects either a zipped bundle of files or a path-like object "
+        "pointing to the zipped R/qtl2 bundle.")
+
+
 def replace_na_strings(cdata, val):
     """Replace values indicated in `na.strings` with `None`."""
     return (None if val in cdata.get("na.strings", ["NA"]) else val)
@@ -250,24 +411,21 @@ def file_data(zfile: ZipFile,
 
     try:
         if isinstance(cdata[member_key], list):
-            for row in (line for lines in
+            yield from (line for lines in
                         (file_data(
                             zfile, member_key, {**cdata, member_key: innerfile},
                             process_value, process_transposed_value)
                          for innerfile in cdata[member_key])
-                        for line in lines):
-                yield row
+                        for line in lines)
             return
         if not cdata.get(f"{member_key}_transposed", False):
-            for row in with_non_transposed(zfile, member_key, cdata, process_value):
-                yield row
+            yield from with_non_transposed(zfile, member_key, cdata, process_value)
             return
 
-        for row in with_transposed(
-                zfile, member_key, cdata, process_transposed_value):
-            yield row
+        yield from with_transposed(
+            zfile, member_key, cdata, process_transposed_value)
     except KeyError as exc:
-        raise MissingFileError(*exc.args) from exc
+        raise MissingFileException(*exc.args) from exc
 
 def cross_information(zfile: ZipFile, cdata: dict) -> Iterator[dict]:
     """Load cross information where present."""
@@ -316,8 +474,7 @@ def raw_file_data(zipfilepath: Union[str, Path],
     with (ZipFile(str(zipfilepath), "r") as zfile,
           zfile.open(memberfilename) as innerfile):
         wrappedfile = io.TextIOWrapper(innerfile)
-        for  line in wrappedfile:
-            yield line
+        yield from wrappedfile
 
 def strip_comments(rawdata: Iterator[str], commentchar) -> Iterator[str]:
     """Remove comments from raw text."""
@@ -401,3 +558,42 @@ def load_samples(zipfilepath: Union[str, Path],
             pass
 
     return tuple(samples)
+
+
+
+def read_text_file(filepath: Union[str, Path]) -> Iterator[str]:
+    """Read the raw text from a text file."""
+    with open(filepath, "r", encoding="utf8") as _file:
+        yield from _file
+
+
+def read_csv_file(filepath: Union[str, Path],
+                  separator: str = ",",
+                  comment_char: str = "#") -> Iterator[tuple[str, ...]]:
+    """Read a file as a csv file. This does not process the N/A values."""
+    for line in read_text_file(filepath):
+        if line.startswith(comment_char):
+            continue
+        yield tuple(field.strip() for field in line.split(separator))
+
+
+def read_csv_file_headers(
+        filepath: Union[str, Path],
+        transposed: bool,
+        separator: str = ",",
+        comment_char: str = "#"
+) -> tuple[str, ...]:
+    """Read the 'true' headers of a CSV file."""
+    headers = tuple()
+    for line in read_text_file(filepath):
+        if line.startswith(comment_char):
+            continue
+
+        line = tuple(field.strip() for field in line.split(separator))
+        if not transposed:
+            return line
+
+        headers = headers + (line[0],)
+        continue
+
+    return headers