diff options
Diffstat (limited to 'r_qtl/r_qtl2.py')
-rw-r--r-- | r_qtl/r_qtl2.py | 198 |
1 files changed, 192 insertions, 6 deletions
diff --git a/r_qtl/r_qtl2.py b/r_qtl/r_qtl2.py index 87491d0..c6307f5 100644 --- a/r_qtl/r_qtl2.py +++ b/r_qtl/r_qtl2.py @@ -1,30 +1,124 @@ """The R/qtl2 parsing and processing code.""" import io +import os import csv import json from pathlib import Path -from zipfile import ZipFile from functools import reduce, partial +from zipfile import ZipFile, is_zipfile from typing import Union, Iterator, Iterable, Callable, Optional import yaml from functional_tools import take, chain -from r_qtl.errors import InvalidFormat, MissingFileError +from r_qtl.exceptions import InvalidFormat, MissingFileException FILE_TYPES = ( "geno", "founder_geno", "pheno", "covar", "phenocovar", "gmap", "pmap", "phenose") -def control_data(zfile: ZipFile) -> dict: +__CONTROL_FILE_ERROR_MESSAGE__ = ( + "The zipped bundle that was provided does not contain a valid control file " + "in either JSON or YAML format.") + + +def __special_file__(filename): + """ + Check whether the file is special in some ways, e.g. MacOSX seems to include + files in a directory `__MACOSX` that share parts of the name, and extensions + with the main files in the bundle. + """ + is_macosx_special_file = filename.startswith("__MACOSX") + is_nix_hidden_file = Path(filename).name.startswith(".") + + return (is_macosx_special_file or is_nix_hidden_file) + + +def extract(zfile: ZipFile, outputdir: Path) -> tuple[Path, ...]: + """Extract a ZipFile + + This function will extract a zipfile `zfile` to the directory `outputdir`. + + Parameters + ---------- + zfile: zipfile.ZipFile object - the zipfile to extract. + outputdir: Optional pathlib.Path object - where the extracted files go. + + Returns + ------- + A tuple of Path objects, each pointing to a member in the zipfile. + """ + outputdir.mkdir(parents=True, exist_ok=True) + return tuple(Path(zfile.extract(member, outputdir)) + for member in zfile.namelist() + if not __special_file__(member)) + + +def transpose_csv( + inpath: Path, + linesplitterfn: Callable, + linejoinerfn: Callable, + outpath: Path) -> Path: + """Transpose a file: Make its rows into columns and its columns into rows. + + This function will create a new file, `outfile`, with the same content as + the original, `infile`, except transposed i.e. The rows of `infile` are the + columns of `outfile` and the columns of `infile` are the rows of `outfile`. + + Parameters + ---------- + inpath: The CSV file to transpose. + linesplitterfn: A function to use for splitting each line into columns + linejoinerfn: A function to use to rebuild the lines + outpath: The path where the transposed data is stored + """ + def __read_by_line__(_path): + with open(_path, "r", encoding="utf8") as infile: + for line in infile: + if line.startswith("#"): + continue + yield line + + transposed_data= (f"{linejoinerfn(items)}\n" for items in zip(*( + linesplitterfn(line) for line in __read_by_line__(inpath)))) + + with open(outpath, "w", encoding="utf8") as outfile: + for line in transposed_data: + outfile.write(line) + + return outpath + + +def transpose_csv_with_rename(inpath: Path, + linesplitterfn: Callable, + linejoinerfn: Callable) -> Path: + """Renames input file and creates new transposed file with the original name + of the input file. + + Parameters + ---------- + inpath: Path to the input file. Should be a pathlib.Path object. + linesplitterfn: A function to use for splitting each line into columns + linejoinerfn: A function to use to rebuild the lines + """ + transposedfilepath = Path(inpath) + origbkp = inpath.parent.joinpath(f"{inpath.stem}___original{inpath.suffix}") + os.rename(inpath, origbkp) + return transpose_csv( + origbkp, linesplitterfn, linejoinerfn, transposedfilepath) + + +def __control_data_from_zipfile__(zfile: ZipFile) -> dict: """Retrieve the control file from the zip file info.""" files = tuple(filename for filename in zfile.namelist() - if (filename.endswith(".yaml") or filename.endswith(".json"))) + if (not __special_file__(filename) + and (filename.endswith(".yaml") + or filename.endswith(".json")))) num_files = len(files) if num_files == 0: - raise InvalidFormat("Expected a json or yaml control file.") + raise InvalidFormat(__CONTROL_FILE_ERROR_MESSAGE__) if num_files > 1: raise InvalidFormat("Found more than one possible control file.") @@ -41,6 +135,80 @@ def control_data(zfile: ZipFile) -> dict: else yaml.safe_load(zfile.read(files[0]))) } +def __control_data_from_dirpath__(dirpath: Path): + """Load control data from a given directory path.""" + files = tuple(path for path in dirpath.iterdir() + if (not __special_file__(path.name) + and (path.suffix in (".yaml", ".json")))) + num_files = len(files) + if num_files == 0: + raise InvalidFormat(__CONTROL_FILE_ERROR_MESSAGE__) + + if num_files > 1: + raise InvalidFormat("Found more than one possible control file.") + + with open(files[0], "r", encoding="utf8") as infile: + return { + "na.strings": ["NA"], + "comment.char": "#", + "sep": ",", + **{ + f"{key}_transposed": False for key in FILE_TYPES + }, + **(json.loads(infile.read()) + if files[0].suffix == ".json" + else yaml.safe_load(infile.read())) + } + + +def control_data(control_src: Union[Path, ZipFile]) -> dict: + """Read the R/qtl2 bundle control file. + + Parameters + ---------- + control_src: Path object of ZipFile object. + If a directory path is provided, this function will read the control + data from the control file in that directory. + It is importand that the Path be a directory and contain data from one + and only one R/qtl2 bundle. + + If a ZipFile object is provided, then the control data is read from the + control file within the zip file. We are moving away from parsing data + directly from ZipFile objects, and this is retained only until the + transition to using extracted files is complete. + + Returns + ------- + Returns a dict object with the control data that determines what the files + in the bundle are and how to parse them. + + Raises + ------ + r_qtl.exceptions.InvalidFormat + """ + def __cleanup__(cdata): + return { + **cdata, + **dict((filetype, + ([cdata[filetype]] if isinstance(cdata[filetype], str) + else cdata[filetype]) + ) for filetype in + (typ for typ in cdata.keys() if typ in FILE_TYPES)) + } + + if isinstance(control_src, ZipFile): + return __cleanup__(__control_data_from_zipfile__(control_src)) + if isinstance(control_src, Path): + if is_zipfile(control_src): + return __cleanup__( + __control_data_from_zipfile__(ZipFile(control_src))) + if control_src.is_dir(): + return __cleanup__(__control_data_from_dirpath__(control_src)) + raise InvalidFormat( + "Expects either a zipped bundle of files or a path-like object " + "pointing to the zipped R/qtl2 bundle.") + + def replace_na_strings(cdata, val): """Replace values indicated in `na.strings` with `None`.""" return (None if val in cdata.get("na.strings", ["NA"]) else val) @@ -252,7 +420,7 @@ def file_data(zfile: ZipFile, zfile, member_key, cdata, process_transposed_value): yield row except KeyError as exc: - raise MissingFileError(*exc.args) from exc + raise MissingFileException(*exc.args) from exc def cross_information(zfile: ZipFile, cdata: dict) -> Iterator[dict]: """Load cross information where present.""" @@ -386,3 +554,21 @@ def load_samples(zipfilepath: Union[str, Path], pass return tuple(samples) + + + +def read_text_file(filepath: Union[str, Path]) -> Iterator[str]: + """Read the raw text from a text file.""" + with open(filepath, "r", encoding="utf8") as _file: + for line in _file: + yield line + + +def read_csv_file(filepath: Union[str, Path], + separator: str = ",", + comment_char: str = "#") -> Iterator[tuple[str, ...]]: + """Read a file as a csv file.""" + for line in read_text_file(filepath): + if line.startswith(comment_char): + continue + yield tuple(field.strip() for field in line.split(separator)) |