"""The R/qtl2 parsing and processing code.""" import io import os import csv import json from pathlib import Path from functools import reduce, partial from zipfile import ZipFile, is_zipfile from typing import Union, Iterator, Iterable, Callable, Optional import yaml from functional_tools import take, chain from r_qtl.exceptions import InvalidFormat, MissingFileException FILE_TYPES = ( "geno", "founder_geno", "pheno", "covar", "phenocovar", "gmap", "pmap", "phenose") __CONTROL_FILE_ERROR_MESSAGE__ = ( "The zipped bundle that was provided does not contain a valid control file " "in either JSON or YAML format.") def __special_file__(filename): """ Check whether the file is special in some ways, e.g. MacOSX seems to include files in a directory `__MACOSX` that share parts of the name, and extensions with the main files in the bundle. """ is_macosx_special_file = filename.startswith("__MACOSX") is_nix_hidden_file = Path(filename).name.startswith(".") return (is_macosx_special_file or is_nix_hidden_file) def extract(zfile: ZipFile, outputdir: Path) -> tuple[Path, ...]: """Extract a ZipFile This function will extract a zipfile `zfile` to the directory `outputdir`. Parameters ---------- zfile: zipfile.ZipFile object - the zipfile to extract. outputdir: Optional pathlib.Path object - where the extracted files go. Returns ------- A tuple of Path objects, each pointing to a member in the zipfile. """ outputdir.mkdir(parents=True, exist_ok=True) return tuple(Path(zfile.extract(member, outputdir)) for member in zfile.namelist() if not __special_file__(member)) def transpose_csv( inpath: Path, linesplitterfn: Callable, linejoinerfn: Callable, outpath: Path) -> Path: """Transpose a file: Make its rows into columns and its columns into rows. This function will create a new file, `outfile`, with the same content as the original, `infile`, except transposed i.e. The rows of `infile` are the columns of `outfile` and the columns of `infile` are the rows of `outfile`. Parameters ---------- inpath: The CSV file to transpose. linesplitterfn: A function to use for splitting each line into columns linejoinerfn: A function to use to rebuild the lines outpath: The path where the transposed data is stored """ def __read_by_line__(_path): with open(_path, "r", encoding="utf8") as infile: for line in infile: if line.startswith("#"): continue yield line transposed_data= (f"{linejoinerfn(items)}\n" for items in zip(*( linesplitterfn(line) for line in __read_by_line__(inpath)))) with open(outpath, "w", encoding="utf8") as outfile: for line in transposed_data: outfile.write(line) return outpath def transpose_csv_with_rename(inpath: Path, linesplitterfn: Callable, linejoinerfn: Callable) -> Path: """Renames input file and creates new transposed file with the original name of the input file. Parameters ---------- inpath: Path to the input file. Should be a pathlib.Path object. linesplitterfn: A function to use for splitting each line into columns linejoinerfn: A function to use to rebuild the lines """ transposedfilepath = Path(inpath) origbkp = inpath.parent.joinpath(f"{inpath.stem}___original{inpath.suffix}") os.rename(inpath, origbkp) return transpose_csv( origbkp, linesplitterfn, linejoinerfn, transposedfilepath) def __control_data_from_zipfile__(zfile: ZipFile) -> dict: """Retrieve the control file from the zip file info.""" files = tuple(filename for filename in zfile.namelist() if (not __special_file__(filename) and (filename.endswith(".yaml") or filename.endswith(".json")))) num_files = len(files) if num_files == 0: raise InvalidFormat(__CONTROL_FILE_ERROR_MESSAGE__) if num_files > 1: raise InvalidFormat("Found more than one possible control file.") return { "na.strings": ["NA"], "comment.char": "#", "sep": ",", **{ f"{key}_transposed": False for key in FILE_TYPES }, **(json.loads(zfile.read(files[0])) if files[0].endswith(".json") else yaml.safe_load(zfile.read(files[0]))) } def __control_data_from_dirpath__(dirpath: Path): """Load control data from a given directory path.""" files = tuple(path for path in dirpath.iterdir() if (not __special_file__(path.name) and (path.suffix in (".yaml", ".json")))) num_files = len(files) if num_files == 0: raise InvalidFormat(__CONTROL_FILE_ERROR_MESSAGE__) if num_files > 1: raise InvalidFormat("Found more than one possible control file.") with open(files[0], "r", encoding="utf8") as infile: return { "na.strings": ["NA"], "comment.char": "#", "sep": ",", **{ f"{key}_transposed": False for key in FILE_TYPES }, **(json.loads(infile.read()) if files[0].suffix == ".json" else yaml.safe_load(infile.read())) } def control_data(control_src: Union[Path, ZipFile]) -> dict: """Read the R/qtl2 bundle control file. Parameters ---------- control_src: Path object of ZipFile object. If a directory path is provided, this function will read the control data from the control file in that directory. It is importand that the Path be a directory and contain data from one and only one R/qtl2 bundle. If a ZipFile object is provided, then the control data is read from the control file within the zip file. We are moving away from parsing data directly from ZipFile objects, and this is retained only until the transition to using extracted files is complete. Returns ------- Returns a dict object with the control data that determines what the files in the bundle are and how to parse them. Raises ------ r_qtl.exceptions.InvalidFormat """ def __cleanup__(cdata): return { **cdata, **dict((filetype, ([cdata[filetype]] if isinstance(cdata[filetype], str) else cdata[filetype]) ) for filetype in (typ for typ in cdata.keys() if typ in FILE_TYPES)) } if isinstance(control_src, ZipFile): return __cleanup__(__control_data_from_zipfile__(control_src)) if isinstance(control_src, Path): if is_zipfile(control_src): return __cleanup__( __control_data_from_zipfile__(ZipFile(control_src))) if control_src.is_dir(): return __cleanup__(__control_data_from_dirpath__(control_src)) raise InvalidFormat( "Expects either a zipped bundle of files or a path-like object " "pointing to the zipped R/qtl2 bundle.") def replace_na_strings(cdata, val): """Replace values indicated in `na.strings` with `None`.""" return (None if val in cdata.get("na.strings", ["NA"]) else val) def with_non_transposed(zfile: ZipFile, member_key: str, cdata: dict, process_value: Callable[ [dict], dict] = lambda val: val) -> Iterator[dict]: """Process non-transposed file values Arguments: zfile: A zipfile object from opening a R/qtl2 bundle. member_key: A key to retrieve the member file to process from the file. cdata: The control data from the R/qtl2 bundle read from the JSON/YAML file. process_value: A function to process the values from the file. """ def not_comment_line(line): return not line.startswith(cdata.get("comment.char", "#")) sep = cdata.get("sep", ",") with zfile.open(cdata[member_key]) as innerfile: try: wrapped_file = io.TextIOWrapper(innerfile) firstrow = tuple( field.strip() for field in next(filter(not_comment_line, wrapped_file)).strip().split(sep)) id_key = firstrow[0] wrapped_file.seek(0) reader = csv.DictReader(filter(not_comment_line, wrapped_file), delimiter=sep) for row in reader: processed = process_value(row) yield { "id": processed[id_key], **{ key: value for key, value in processed.items() if key != id_key } } except StopIteration as exc: raise InvalidFormat("The file has no rows!") from exc def __make_organise_by_id__(id_key): """Return a function to use with `reduce` to organise values by some identifier.""" def __organiser__(acc, item): row = acc.get(item[id_key], {}) return {**acc, item[id_key]: {**row, **item}} return __organiser__ def __batch_of_n__(iterable: Iterable, num): """Return a batch of `num` items or less from the `iterable`.""" while True: items = take(iterable, num) if len(items) <= 0: break yield items def with_transposed(zfile: ZipFile, member_key: str, cdata: dict, process_value: Callable[ [str, tuple[str, ...], tuple[str, ...]], tuple[dict, ...]]) -> Iterator[dict]: """Process transposed file values Arguments: zfile: A zipfile object from opening a R/qtl2 bundle. member_key: A key to retrieve the member file to process from the file. cdata: The control data from the R/qtl2 bundle read from the JSON/YAML file. process_value: A function to process the values from the file. """ with zfile.open(cdata[member_key]) as innerfile: lines = (tuple(field.strip() for field in line.strip().split(cdata.get("sep", ","))) for line in filter(lambda line: not line.startswith("#"), io.TextIOWrapper(innerfile))) try: id_line = next(lines) id_key, headers = id_line[0], id_line[1:] for _key, row in reduce(# type: ignore[var-annotated] __make_organise_by_id__(id_key), (row for batch in __batch_of_n__(lines, 300) for line in batch for row in process_value(id_key, headers, line)), {}).items(): yield { "id": row[id_key], **{ key: value for key, value in row.items() if key != id_key }} except StopIteration: pass def make_process_data_geno(cdata) -> tuple[ Callable[[dict], dict], Callable[[str, tuple[str, ...], tuple[str, ...]], tuple[dict, ...]]]: """Build functions to process genotype data.""" def replace_genotype_codes(val):#pylint: disable=[redefined-outer-name] # The rewrite will probably make this obsolete. return cdata["genotypes"].get(val, val) def __non_transposed__(row: dict) -> dict: return { key: chain(value, replace_genotype_codes, partial(replace_na_strings, cdata)) for key,value in row.items() } def __transposed__(id_key: str, ids: tuple[str, ...], vals: tuple[str, ...]) -> tuple[dict, ...]: return tuple( dict(zip( [id_key, vals[0]], (chain(item, replace_genotype_codes, partial(replace_na_strings, cdata)) for item in items))) for items in zip(ids, vals[1:])) return (__non_transposed__, __transposed__) def replace_sex_info(val, cdata: dict): """Replace sex information in files with values in the control data.""" sex_info = cdata.get("sex", False) if bool(sex_info): return sex_info.get(val, val) return val def replace_cross_info(val, cdata: dict): """ Replace cross information in files with the values in the control data. """ cross_info = cdata.get("cross_info", False) if bool(cross_info): return cross_info.get(val, val) return val def make_process_data_covar(cdata) -> tuple[ Callable[[dict], dict], Callable[[str, tuple[str, ...], tuple[str, ...]], tuple[dict, ...]]]: """Build functions to process sex and cross information in covar files.""" rep_sex_info = partial(replace_sex_info, cdata=cdata) rep_cross_info = partial(replace_cross_info, cdata=cdata) def non_transposed(row: dict) -> dict: return { key: chain(value, rep_sex_info, rep_cross_info) for key,value in row.items() } def transposed(id_key: str, ids: tuple[str, ...], vals: tuple[str, ...]) -> tuple[dict, ...]: return tuple( dict(zip( [id_key, vals[0]], (chain(item, rep_sex_info, rep_cross_info) for item in items))) for items in zip(ids, vals[1:])) return (non_transposed, transposed) def file_data(zfile: ZipFile, member_key: str, cdata: dict, process_value: Optional[Callable[[dict], dict]] = None, process_transposed_value: Optional[Callable[ [str, tuple[str, ...], tuple[str, ...]], tuple[dict, ...]]] = None) -> Iterator[dict]: """Load data from files in R/qtl2 zip bundle.""" def __default_process_value_non_transposed__(val: dict) -> dict: return { key: replace_na_strings(cdata, value) for key,value in val.items() } def __default_process_value_transposed__( id_key: str, ids: tuple[str, ...], vals: tuple[str, ...]) -> tuple[dict, ...]: """Default values processor for transposed files.""" return tuple( dict(zip([id_key, replace_na_strings(cdata, vals[0])], items)) for items in zip( ids, (replace_na_strings(cdata, val) for val in vals[1:]))) process_value = process_value or __default_process_value_non_transposed__ process_transposed_value = ( process_transposed_value or __default_process_value_transposed__) try: if isinstance(cdata[member_key], list): for row in (line for lines in (file_data( zfile, member_key, {**cdata, member_key: innerfile}, process_value, process_transposed_value) for innerfile in cdata[member_key]) for line in lines): yield row return if not cdata.get(f"{member_key}_transposed", False): for row in with_non_transposed(zfile, member_key, cdata, process_value): yield row return for row in with_transposed( zfile, member_key, cdata, process_transposed_value): yield row except KeyError as exc: raise MissingFileException(*exc.args) from exc def cross_information(zfile: ZipFile, cdata: dict) -> Iterator[dict]: """Load cross information where present.""" cdata_cross_info = cdata.get("cross_info", {}) cross_info_file_key = "covar" new_cdata = {**cdata} sex_fields = (cdata.get("sex",{}).get("covar",""),) if "file" in cdata_cross_info: cross_info_file_key = "gnqc_cross_info_file" new_cdata = {**cdata, "gnqc_cross_info_file": cdata_cross_info["file"]} for row in file_data(zfile, cross_info_file_key, new_cdata, *make_process_data_covar(cdata)): yield { key: chain(value, partial(replace_cross_info, cdata=cdata)) for key, value in row.items() if key not in sex_fields} def sex_information(zfile: ZipFile, cdata: dict) -> Iterator[dict]: """Load cross information where present.""" cdata_sex_info = cdata.get("sex", {}) sex_info_file_key = "covar" new_cdata = {**cdata} ci_fields = (cdata.get("cross_info",{}).get("covar",""),) if "file" in cdata_sex_info: sex_info_file_key = "gnqc_sex_info_file" new_cdata = {**cdata, "gnqc_sex_info_file": cdata_sex_info["file"]} for row in file_data(zfile, sex_info_file_key, new_cdata, *make_process_data_covar(cdata)): yield { key: chain(value, partial(replace_sex_info, cdata=cdata)) for key, value in row.items() if key not in ci_fields} def genotype_data(zfile: ZipFile): """Convenience function to genotype data from R/qtl2 bundle.""" cdata = control_data(zfile) return file_data(zfile, "geno", cdata, *make_process_data_geno(cdata)) def raw_file_data(zipfilepath: Union[str, Path], memberfilename: str) -> Iterator[str]: """Read the raw text from a file in the R/qtl2 bundle.""" with (ZipFile(str(zipfilepath), "r") as zfile, zfile.open(memberfilename) as innerfile): wrappedfile = io.TextIOWrapper(innerfile) for line in wrappedfile: yield line def strip_comments(rawdata: Iterator[str], commentchar) -> Iterator[str]: """Remove comments from raw text.""" return (line for line in rawdata if not line.startswith(commentchar)) def missing_value_codes_to_none(value: str, nastrings: tuple[str, ...]) -> Optional[str]: """ If 'value' is a missing value code, return `None`, otherwise return 'value'. """ return value if value not in nastrings else None def replace_genotype_codes(value: str, genocodes: dict): """Convert genotype codes into values specified in control file.""" return genocodes.get(value, value) def read_control_file(zipfilepath: Union[str, Path]) -> dict: """Read control data.""" with ZipFile(str(zipfilepath), "r") as zfile: # move `control_data` code here and replace existing function. cdata = control_data(zfile) return { **cdata, **{ ftype: ([cdata[ftype]] if isinstance(cdata[ftype], str) else cdata[ftype]) for ftype in FILE_TYPES if bool(cdata.get(ftype)) } } def read_file_data( zipfilepath: Union[str, Path], memberfilename: str, processfile: Callable[[Iterator[str]], Iterator[str]] = lambda itr: itr, processline: Callable[[str], str] = lambda line: line, processfield: Callable[ [Optional[str]], Optional[str]] = lambda val: val) -> Iterator[ tuple[Optional[str], ...]]: """Read a single file from the bundle processing each field.""" cdata = read_control_file(zipfilepath) return ( tuple(processfield(field.strip()) for field in processline(row.strip()).split(cdata["sep"])) for row in processfile( strip_comments( raw_file_data(zipfilepath, memberfilename), cdata["comment.char"]))) def read_geno_file_data( zipfilepath: Union[str, Path], memberfilename: str) -> Iterator[tuple[Optional[str], ...]]: """Read a 'geno' file from the R/qtl2 bundle.""" cdata = read_control_file(zipfilepath) return read_file_data( zipfilepath, memberfilename, processfield=partial( replace_genotype_codes, genocodes=cdata.get("genotypes", {}))) def load_samples(zipfilepath: Union[str, Path], member: str, transposed: bool) -> tuple[str, ...]: """Load the samples/cases/individuals from file 'member'.""" filedata = read_geno_file_data(zipfilepath, member) samples: set[str] = set() if transposed: samples.update( item for item in next(filedata)[1:] if item is not None) else: try: next(filedata)# Ignore first row. samples.update( line[0] for line in filedata if line[0] is not None) except StopIteration:# Empty file. pass return tuple(samples) def read_text_file(filepath: Union[str, Path]) -> Iterator[str]: """Read the raw text from a text file.""" with open(filepath, "r", encoding="utf8") as _file: for line in _file: yield line def read_csv_file(filepath: Union[str, Path], separator: str = ",", comment_char: str = "#") -> Iterator[str]: """Read a file as a csv file.""" for line in read_text_file(filepath): if line.startswith(comment_char): continue yield tuple(field.strip() for field in line.split(separator))