"""The R/qtl2 parsing and processing code.""" import io import csv import json from zipfile import ZipFile from functools import reduce, partial from typing import Iterator, Iterable, Callable, Optional import yaml from functional_tools import take, chain from r_qtl.errors import InvalidFormat def control_data(zfile: ZipFile) -> dict: """Retrieve the control file from the zip file info.""" files = tuple(filename for filename in zfile.namelist() if (filename.endswith(".yaml") or filename.endswith(".json"))) num_files = len(files) if num_files == 0: raise InvalidFormat("Expected a json or yaml control file.") if num_files > 1: raise InvalidFormat("Found more than one possible control file.") return { "na.strings": ["NA"], "comment.char": "#", "sep": ",", **{ f"{key}_transposed": False for key in ( "geno", "founder_geno", "pheno", "covar", "phenocovar", "gmap", "pmap", "phenose") }, **(json.loads(zfile.read(files[0])) if files[0].endswith(".json") else yaml.safe_load(zfile.read(files[0]))) } def replace_na_strings(cdata, val): """Replace values indicated in `na.strings` with `None`.""" return (None if val in cdata.get("na.strings", ["NA"]) else val) def with_non_transposed(zfile: ZipFile, member_key: str, cdata: dict, process_value: Callable[ [dict], dict] = lambda val: val) -> Iterator[dict]: """Process non-transposed file values Arguments: zfile: A zipfile object from opening a R/qtl2 bundle. member_key: A key to retrieve the member file to process from the file. cdata: The control data from the R/qtl2 bundle read from the JSON/YAML file. process_value: A function to process the values from the file. """ def not_comment_line(line): return not line.startswith(cdata.get("comment.char", "#")) sep = cdata.get("sep", ",") with zfile.open(cdata[member_key]) as innerfile: try: wrapped_file = io.TextIOWrapper(innerfile) firstrow = tuple( field.strip() for field in next(filter(not_comment_line, wrapped_file)).strip().split(sep)) id_key = firstrow[0] wrapped_file.seek(0) reader = csv.DictReader(filter(not_comment_line, wrapped_file), delimiter=sep) for row in reader: processed = process_value(row) yield { "id": processed[id_key], **{ key: value for key, value in processed.items() if key != id_key } } except StopIteration as exc: raise InvalidFormat("The file has no rows!") from exc def __make_organise_by_id__(id_key): """Return a function to use with `reduce` to organise values by some identifier.""" def __organiser__(acc, item): row = acc.get(item[id_key], {}) return {**acc, item[id_key]: {**row, **item}} return __organiser__ def __batch_of_n__(iterable: Iterable, num): """Return a batch of `num` items or less from the `iterable`.""" while True: items = take(iterable, num) if len(items) <= 0: break yield items def with_transposed(zfile: ZipFile, member_key: str, cdata: dict, process_value: Callable[ [str, tuple[str, ...], tuple[str, ...]], tuple[dict, ...]]) -> Iterator[dict]: """Process transposed file values Arguments: zfile: A zipfile object from opening a R/qtl2 bundle. member_key: A key to retrieve the member file to process from the file. cdata: The control data from the R/qtl2 bundle read from the JSON/YAML file. process_value: A function to process the values from the file. """ with zfile.open(cdata[member_key]) as innerfile: lines = (tuple(field.strip() for field in line.strip().split(cdata.get("sep", ","))) for line in filter(lambda line: not line.startswith("#"), io.TextIOWrapper(innerfile))) try: id_line = next(lines) id_key, headers = id_line[0], id_line[1:] for _key, row in reduce(# type: ignore[var-annotated] __make_organise_by_id__(id_key), (row for batch in __batch_of_n__(lines, 300) for line in batch for row in process_value(id_key, headers, line)), {}).items(): yield { "id": row[id_key], **{ key: value for key, value in row.items() if key != id_key }} except StopIteration: pass def make_process_data_geno(cdata) -> tuple[ Callable[[dict], dict], Callable[[str, tuple[str, ...], tuple[str, ...]], tuple[dict, ...]]]: """Build functions to process genotype data.""" def replace_genotype_codes(val): return cdata["genotypes"].get(val, val) def __non_transposed__(row: dict) -> dict: return { key: chain(value, replace_genotype_codes, partial(replace_na_strings, cdata)) for key,value in row.items() } def __transposed__(id_key: str, ids: tuple[str, ...], vals: tuple[str, ...]) -> tuple[dict, ...]: return tuple( dict(zip( [id_key, vals[0]], (chain(item, replace_genotype_codes, partial(replace_na_strings, cdata)) for item in items))) for items in zip(ids, vals[1:])) return (__non_transposed__, __transposed__) def replace_sex_info(val, cdata: dict): """Replace sex information in files with values in the control data.""" sex_info = cdata.get("sex", False) if bool(sex_info): return sex_info.get(val, val) return val def replace_cross_info(val, cdata: dict): """ Replace cross information in files with the values in the control data. """ cross_info = cdata.get("cross_info", False) if bool(cross_info): return cross_info.get(val, val) return val def make_process_data_covar(cdata) -> tuple[ Callable[[dict], dict], Callable[[str, tuple[str, ...], tuple[str, ...]], tuple[dict, ...]]]: """Build functions to process sex and cross information in covar files.""" rep_sex_info = partial(replace_sex_info, cdata=cdata) rep_cross_info = partial(replace_cross_info, cdata=cdata) def non_transposed(row: dict) -> dict: return { key: chain(value, rep_sex_info, rep_cross_info) for key,value in row.items() } def transposed(id_key: str, ids: tuple[str, ...], vals: tuple[str, ...]) -> tuple[dict, ...]: return tuple( dict(zip( [id_key, vals[0]], (chain(item, rep_sex_info, rep_cross_info) for item in items))) for items in zip(ids, vals[1:])) return (non_transposed, transposed) def file_data(zfile: ZipFile, member_key: str, cdata: dict, process_value: Optional[Callable[[dict], dict]] = None, process_transposed_value: Optional[Callable[ [str, tuple[str, ...], tuple[str, ...]], tuple[dict, ...]]] = None) -> Iterator[dict]: """Load data from files in R/qtl2 zip bundle.""" def __default_process_value_non_transposed__(val: dict) -> dict: return { key: replace_na_strings(cdata, value) for key,value in val.items() } def __default_process_value_transposed__( id_key: str, ids: tuple[str, ...], vals: tuple[str, ...]) -> tuple[dict, ...]: """Default values processor for transposed files.""" return tuple( dict(zip([id_key, replace_na_strings(cdata, vals[0])], items)) for items in zip( ids, (replace_na_strings(cdata, val) for val in vals[1:]))) process_value = process_value or __default_process_value_non_transposed__ process_transposed_value = ( process_transposed_value or __default_process_value_transposed__) try: if isinstance(cdata[member_key], list): for row in (line for lines in (file_data( zfile, member_key, {**cdata, member_key: innerfile}, process_value, process_transposed_value) for innerfile in cdata[member_key]) for line in lines): yield row return if not cdata.get(f"{member_key}_transposed", False): for row in with_non_transposed(zfile, member_key, cdata, process_value): yield row return for row in with_transposed( zfile, member_key, cdata, process_transposed_value): yield row except KeyError as exc: raise InvalidFormat(*exc.args) from exc def cross_information(zfile: ZipFile, cdata: dict) -> Iterator[dict]: """Load cross information where present.""" cdata_cross_info = cdata.get("cross_info", {}) cross_info_file_key = "covar" new_cdata = {**cdata} sex_fields = (cdata.get("sex",{}).get("covar",""),) if "file" in cdata_cross_info: cross_info_file_key = "gnqc_cross_info_file" new_cdata = {**cdata, "gnqc_cross_info_file": cdata_cross_info["file"]} for row in file_data(zfile, cross_info_file_key, new_cdata, *make_process_data_covar(cdata)): yield { key: chain(value, partial(replace_cross_info, cdata=cdata)) for key, value in row.items() if key not in sex_fields} def sex_information(zfile: ZipFile, cdata: dict) -> Iterator[dict]: """Load cross information where present.""" cdata_sex_info = cdata.get("sex", {}) sex_info_file_key = "covar" new_cdata = {**cdata} ci_fields = (cdata.get("cross_info",{}).get("covar",""),) if "file" in cdata_sex_info: sex_info_file_key = "gnqc_sex_info_file" new_cdata = {**cdata, "gnqc_sex_info_file": cdata_sex_info["file"]} for row in file_data(zfile, sex_info_file_key, new_cdata, *make_process_data_covar(cdata)): yield { key: chain(value, partial(replace_sex_info, cdata=cdata)) for key, value in row.items() if key not in ci_fields} def validate_bundle(zfile: ZipFile): """Ensure the R/qtl2 bundle is valid.""" cdata = control_data(zfile) def __member_exists_p__(zfile, member): if isinstance(member, str): zfile.getinfo(member) else: for inner in member: zfile.getinfo(inner) try: for member in (key for key in cdata.keys() if key in ( "geno", "founder_geno", "pheno", "covar", "phenocovar", "gmap", "pmap")): __member_exists_p__(zfile, cdata[member]) if "file" in cdata.get("sex", {}): __member_exists_p__(zfile, cdata["sex"]["file"]) if "file" in cdata.get("cross_info", {}): __member_exists_p__(zfile, cdata["cross_info"]["file"]) except KeyError as kerr: raise InvalidFormat(*kerr.args) from kerr def genotype_data(zfile: ZipFile): """Convenience function to genotype data from R/qtl2 bundle.""" cdata = control_data(zfile) return file_data(zfile, "geno", cdata, *make_process_data_geno(cdata))