aboutsummaryrefslogtreecommitdiff
path: root/r_qtl/r_qtl2.py
diff options
context:
space:
mode:
Diffstat (limited to 'r_qtl/r_qtl2.py')
-rw-r--r--r_qtl/r_qtl2.py156
1 files changed, 152 insertions, 4 deletions
diff --git a/r_qtl/r_qtl2.py b/r_qtl/r_qtl2.py
index 0a96e7c..9da4081 100644
--- a/r_qtl/r_qtl2.py
+++ b/r_qtl/r_qtl2.py
@@ -1,17 +1,18 @@
"""The R/qtl2 parsing and processing code."""
import io
+import os
import csv
import json
from pathlib import Path
-from zipfile import ZipFile
from functools import reduce, partial
+from zipfile import ZipFile, is_zipfile
from typing import Union, Iterator, Iterable, Callable, Optional
import yaml
from functional_tools import take, chain
-from r_qtl.errors import InvalidFormat, MissingFileError
+from r_qtl.exceptions import InvalidFormat, MissingFileException
FILE_TYPES = (
"geno", "founder_geno", "pheno", "covar", "phenocovar", "gmap", "pmap",
@@ -30,7 +31,79 @@ def __special_file__(filename):
return (is_macosx_special_file or is_nix_hidden_file)
-def control_data(zfile: ZipFile) -> dict:
+def extract(zfile: ZipFile, outputdir: Path) -> tuple[Path, ...]:
+ """Extract a ZipFile
+
+ This function will extract a zipfile `zfile` to the directory `outputdir`.
+
+ Parameters
+ ----------
+ zfile: zipfile.ZipFile object - the zipfile to extract.
+ outputdir: Optional pathlib.Path object - where the extracted files go.
+
+ Returns
+ -------
+ A tuple of Path objects, each pointing to a member in the zipfile.
+ """
+ outputdir.mkdir(parents=True, exist_ok=True)
+ return tuple(Path(zfile.extract(member, outputdir))
+ for member in zfile.namelist()
+ if not __special_file__(member))
+
+
+def transpose_csv(
+ inpath: Path,
+ linesplitterfn: Callable,
+ linejoinerfn: Callable,
+ outpath: Path) -> Path:
+ """Transpose a file: Make its rows into columns and its columns into rows.
+
+ This function will create a new file, `outfile`, with the same content as
+ the original, `infile`, except transposed i.e. The rows of `infile` are the
+ columns of `outfile` and the columns of `infile` are the rows of `outfile`.
+
+ Parameters
+ ----------
+ inpath: The CSV file to transpose.
+ linesplitterfn: A function to use for splitting each line into columns
+ linejoinerfn: A function to use to rebuild the lines
+ outpath: The path where the transposed data is stored
+ """
+ def __read_by_line__(_path):
+ with open(_path, "r", encoding="utf8") as infile:
+ for line in infile:
+ yield line
+
+ transposed_data= (f"{linejoinerfn(items)}\n" for items in zip(*(
+ linesplitterfn(line) for line in __read_by_line__(inpath))))
+
+ with open(outpath, "w", encoding="utf8") as outfile:
+ for line in transposed_data:
+ outfile.write(line)
+
+ return outpath
+
+
+def transpose_csv_with_rename(inpath: Path,
+ linesplitterfn: Callable,
+ linejoinerfn: Callable) -> Path:
+ """Renames input file and creates new transposed file with the original name
+ of the input file.
+
+ Parameters
+ ----------
+ inpath: Path to the input file. Should be a pathlib.Path object.
+ linesplitterfn: A function to use for splitting each line into columns
+ linejoinerfn: A function to use to rebuild the lines
+ """
+ transposedfilepath = Path(inpath)
+ origbkp = inpath.parent.joinpath(f"{inpath.stem}___original{inpath.suffix}")
+ os.rename(inpath, origbkp)
+ return transpose_csv(
+ origbkp, linesplitterfn, linejoinerfn, transposedfilepath)
+
+
+def __control_data_from_zipfile__(zfile: ZipFile) -> dict:
"""Retrieve the control file from the zip file info."""
files = tuple(filename
for filename in zfile.namelist()
@@ -56,6 +129,81 @@ def control_data(zfile: ZipFile) -> dict:
else yaml.safe_load(zfile.read(files[0])))
}
+
+def __control_data_from_dirpath__(dirpath: Path):
+ """Load control data from a given directory path."""
+ files = tuple(path for path in dirpath.iterdir()
+ if (not __special_file__(path.name)
+ and (path.suffix in (".yaml", ".json"))))
+ num_files = len(files)
+ if num_files == 0:
+ raise InvalidFormat("Expected a json or yaml control file.")
+
+ if num_files > 1:
+ raise InvalidFormat("Found more than one possible control file.")
+
+ with open(files[0], "r", encoding="utf8") as infile:
+ return {
+ "na.strings": ["NA"],
+ "comment.char": "#",
+ "sep": ",",
+ **{
+ f"{key}_transposed": False for key in FILE_TYPES
+ },
+ **(json.loads(infile.read())
+ if files[0].suffix == ".json"
+ else yaml.safe_load(infile.read()))
+ }
+
+
+def control_data(control_src: Union[Path, ZipFile]) -> dict:
+ """Read the R/qtl2 bundle control file.
+
+ Parameters
+ ----------
+ control_src: Path object of ZipFile object.
+ If a directory path is provided, this function will read the control
+ data from the control file in that directory.
+ It is importand that the Path be a directory and contain data from one
+ and only one R/qtl2 bundle.
+
+ If a ZipFile object is provided, then the control data is read from the
+ control file within the zip file. We are moving away from parsing data
+ directly from ZipFile objects, and this is retained only until the
+ transition to using extracted files is complete.
+
+ Returns
+ -------
+ Returns a dict object with the control data that determines what the files
+ in the bundle are and how to parse them.
+
+ Raises
+ ------
+ r_qtl.exceptions.InvalidFormat
+ """
+ def __cleanup__(cdata):
+ return {
+ **cdata,
+ **dict((filetype,
+ ([cdata[filetype]] if isinstance(cdata[filetype], str)
+ else cdata[filetype])
+ ) for filetype in
+ (typ for typ in cdata.keys() if typ in FILE_TYPES))
+ }
+
+ if isinstance(control_src, ZipFile):
+ return __cleanup__(__control_data_from_zipfile__(control_src))
+ if isinstance(control_src, Path):
+ if is_zipfile(control_src):
+ return __cleanup__(
+ __control_data_from_zipfile__(ZipFile(control_src)))
+ if control_src.is_dir():
+ return __cleanup__(__control_data_from_dirpath__(control_src))
+ raise InvalidFormat(
+ "Expects either a zipfile.ZipFile object or a pathlib.Path object "
+ "pointing to a directory containing the R/qtl2 bundle.")
+
+
def replace_na_strings(cdata, val):
"""Replace values indicated in `na.strings` with `None`."""
return (None if val in cdata.get("na.strings", ["NA"]) else val)
@@ -267,7 +415,7 @@ def file_data(zfile: ZipFile,
zfile, member_key, cdata, process_transposed_value):
yield row
except KeyError as exc:
- raise MissingFileError(*exc.args) from exc
+ raise MissingFileException(*exc.args) from exc
def cross_information(zfile: ZipFile, cdata: dict) -> Iterator[dict]:
"""Load cross information where present."""