"""The R/qtl2 parsing and processing code."""
import io
import csv
import json
from zipfile import ZipFile
from functools import reduce, partial
from typing import Iterator, Iterable, Callable

import yaml

from quality_control.parsing import take

from r_qtl.errors import InvalidFormat


def thread_op(value, *functions):
    """Thread the `value` through the sequence of `functions`.

    Each function is called with the result of the previous one, starting
    with `value`. With no functions, `value` is returned unchanged.
    """
    return reduce(lambda result, func: func(result), functions, value)


def control_data(zfile: ZipFile) -> dict:
    """Retrieve the control file from the zip file info.

    Exactly one member whose name ends in `.yaml` or `.json` must be present
    in the bundle.

    Raises:
        InvalidFormat: if no such member, or more than one, is found.
    """
    files = tuple(filename for filename in zfile.namelist()
                  if filename.endswith((".yaml", ".json")))
    if not files:
        raise InvalidFormat("Expected a json or yaml control file.")
    if len(files) > 1:
        raise InvalidFormat("Found more than one possible control file.")

    content = zfile.read(files[0])
    if files[0].endswith(".json"):
        return json.loads(content)
    return yaml.safe_load(content)


def with_non_transposed(zfile: ZipFile,
                        member_key: str,
                        cdata: dict,
                        process_value: Callable[
                            [dict], dict] = lambda val: val) -> Iterator[dict]:
    """Process non-transposed file values

    Arguments:
    zfile: A zipfile object from opening a R/qtl2 bundle.
    member_key: A key to retrieve the member file to process from the file.
    cdata: The control data from the R/qtl2 bundle read from the JSON/YAML file.
    process_value: A function to process the values from the file.

    Yields one dict per data row. The value of the first column is exposed
    under the key "id"; all other columns keep their header names.
    A member with no non-comment lines yields nothing.
    """
    comment_char = cdata.get("comment.char", "#")
    sep = cdata.get("sep", ",")

    def not_comment_line(line):
        return not line.startswith(comment_char)

    with zfile.open(cdata[member_key]) as innerfile:
        wrapped_file = io.TextIOWrapper(innerfile)
        try:
            header_line = next(filter(not_comment_line, wrapped_file))
        except StopIteration:
            # Empty (or comment-only) member. Letting StopIteration escape a
            # generator would surface as a RuntimeError (PEP 479), so end the
            # iteration cleanly, as `with_transposed` does.
            return

        # The name of the first column identifies each row.
        id_key = header_line.strip().split(sep)[0].strip()

        # Rewind so that DictReader consumes the header line itself.
        wrapped_file.seek(0)
        reader = csv.DictReader(filter(not_comment_line, wrapped_file),
                                delimiter=sep)
        for row in reader:
            processed = process_value(row)
            yield {
                "id": processed[id_key],
                **{
                    key: value
                    for key, value in processed.items()
                    if key != id_key
                }
            }


def __make_organise_by_id__(id_key):
    """Return a function to use with `reduce` to organise values by some
    identifier.

    The returned reducer merges `item` into the entry of the accumulator that
    is keyed by `item[id_key]`, returning a new accumulator dict each time.
    """
    def __organiser__(acc, item):
        row = acc.get(item[id_key], {})
        return {**acc, item[id_key]: {**row, **item}}
    return __organiser__


def __batch_of_n__(iterable: Iterable, num):
    """Return a batch of `num` items or less from the `iterable`."""
    while True:
        items = take(iterable, num)
        if len(items) <= 0:
            break
        yield items


def with_transposed(zfile: ZipFile,
                    member_key: str,
                    cdata: dict,
                    process_value: Callable[
                        [str, tuple[str, ...], tuple[str, ...]],
                        tuple[dict, ...]]) -> Iterator[dict]:
    """Process transposed file values

    Arguments:
    zfile: A zipfile object from opening a R/qtl2 bundle.
    member_key: A key to retrieve the member file to process from the file.
    cdata: The control data from the R/qtl2 bundle read from the JSON/YAML file.
    process_value: A function to process the values from the file.

    The first non-comment line provides the identifier column name followed
    by the identifiers. Every following line is handed to `process_value`
    and the resulting dicts are merged per identifier. Yields one dict per
    identifier, with the identifier exposed under the key "id".
    A member with no non-comment lines yields nothing.
    """
    # Honour the control file's comment marker, like `with_non_transposed`.
    comment_char = cdata.get("comment.char", "#")
    sep = cdata.get("sep", ",")

    with zfile.open(cdata[member_key]) as innerfile:
        lines = (
            tuple(field.strip() for field in line.strip().split(sep))
            for line in io.TextIOWrapper(innerfile)
            if not line.startswith(comment_char))

        # Keep the `try` minimal: only a missing header line means "no data".
        try:
            id_line = next(lines)
        except StopIteration:
            return
        id_key, headers = id_line[0], id_line[1:]

        # Merge the per-line items into one dict per identifier. The dict is
        # updated in place; rebuilding the accumulator for every item would
        # be quadratic in the number of items.
        organised: dict = {}
        for batch in __batch_of_n__(lines, 300):
            for line in batch:
                for item in process_value(id_key, headers, line):
                    organised.setdefault(item[id_key], {}).update(item)

        for row in organised.values():
            yield {
                "id": row[id_key],
                **{
                    key: value
                    for key, value in row.items()
                    if key != id_key
                }}


def make_process_data_geno(cdata) -> tuple[
        Callable[[dict], dict],
        Callable[[str, tuple[str, ...], tuple[str, ...]],
                 tuple[dict, ...]]]:
    """Build functions to process genotype data.

    Returns a pair `(non_transposed, transposed)` of value processors suitable
    for `file_data`. Both replace values found in `cdata["genotypes"]` with
    their mapped codes, then replace values listed in `cdata["na.strings"]`
    (when present and non-empty) with `None`.
    """
    def replace_genotype_codes(val):
        return cdata["genotypes"].get(val, val)

    def replace_na_strings(val):
        nastrings = cdata.get("na.strings")
        if bool(nastrings):
            return (None if val in nastrings else val)
        return val

    def __non_transposed__(row: dict) -> dict:
        # NOTE(review): the replacements are applied to every column,
        # including the identifier column -- confirm this is intended.
        return {
            key: thread_op(value, replace_genotype_codes, replace_na_strings)
            for key, value in row.items()
        }

    def __transposed__(id_key: str,
                       ids: tuple[str, ...],
                       vals: tuple[str, ...]) -> tuple[dict, ...]:
        # `vals[0]` names the line; each remaining value is paired with the
        # identifier at the same position in `ids`.
        return tuple(
            dict(zip(
                [id_key, vals[0]],
                (thread_op(item, replace_genotype_codes, replace_na_strings)
                 for item in items)))
            for items in zip(ids, vals[1:]))

    return (__non_transposed__, __transposed__)


def replace_sex_info(val, cdata: dict):
    """Replace sex information in files with values in the control data.

    Looks `val` up in `cdata["sex"]` when that entry is present and non-empty;
    returns `val` unchanged when there is no entry or no mapping for it.
    """
    sex_info = cdata.get("sex", False)
    if bool(sex_info):
        return sex_info.get(val, val)
    return val


def replace_cross_info(val, cdata: dict):
    """
    Replace cross information in files with the values in the control data.

    Looks `val` up in `cdata["cross_info"]` when that entry is present and
    non-empty; returns `val` unchanged when there is no entry or no mapping
    for it.
    """
    cross_info = cdata.get("cross_info", False)
    if bool(cross_info):
        return cross_info.get(val, val)
    return val


def make_process_data_covar(cdata) -> tuple[
        Callable[[dict], dict],
        Callable[[str, tuple[str, ...], tuple[str, ...]],
                 tuple[dict, ...]]]:
    """Build functions to process sex and cross information in covar files.

    Returns a pair `(non_transposed, transposed)` of value processors suitable
    for `file_data`. Both pass every value through `replace_sex_info` and then
    `replace_cross_info`.
    """
    rep_sex_info = partial(replace_sex_info, cdata=cdata)
    rep_cross_info = partial(replace_cross_info, cdata=cdata)

    def non_transposed(row: dict) -> dict:
        return {
            key: thread_op(value, rep_sex_info, rep_cross_info)
            for key, value in row.items()
        }

    def transposed(id_key: str,
                   ids: tuple[str, ...],
                   vals: tuple[str, ...]) -> tuple[dict, ...]:
        # `vals[0]` names the line; each remaining value is paired with the
        # identifier at the same position in `ids`.
        return tuple(
            dict(zip(
                [id_key, vals[0]],
                (thread_op(item, rep_sex_info, rep_cross_info)
                 for item in items)))
            for items in zip(ids, vals[1:]))

    return (non_transposed, transposed)


def __default_process_value_transposed__(
        id_key: str,
        ids: tuple[str, ...],
        vals: tuple[str, ...]) -> tuple[dict, ...]:
    """Default values processor for transposed files.

    Pairs each identifier in `ids` with the value at the same position in
    `vals[1:]`, producing dicts of the form
    `{id_key: identifier, vals[0]: value}`.
    """
    return tuple(
        dict(zip([id_key, vals[0]], items)) for items in zip(ids, vals[1:]))


def file_data(zfile: ZipFile,
              member_key: str,
              cdata: dict,
              process_value: Callable[[dict], dict] = lambda val: val,
              process_transposed_value: Callable[
                  [str, tuple[str, ...], tuple[str, ...]],
                  tuple[dict, ...]] = __default_process_value_transposed__
              ) -> Iterator[dict]:
    """Load data from files in R/qtl2 zip bundle.

    Arguments:
    zfile: A zipfile object from opening a R/qtl2 bundle.
    member_key: The control-data key naming the member file(s) to read.
    cdata: The control data from the R/qtl2 bundle.
    process_value: Row processor used for non-transposed files.
    process_transposed_value: Line processor used for transposed files.

    When `cdata[member_key]` is a list, every listed file is read in turn.
    A file is treated as transposed when `cdata["<member_key>_transposed"]`
    is truthy.

    Raises:
        InvalidFormat: if a `KeyError` occurs while reading, e.g. because
            `member_key` is missing from `cdata` or the member is missing
            from the bundle.
    """
    try:
        if isinstance(cdata[member_key], list):
            for innerfile in cdata[member_key]:
                # Recurse with the control data narrowed to a single file.
                yield from file_data(zfile,
                                     member_key,
                                     {**cdata, member_key: innerfile},
                                     process_value,
                                     process_transposed_value)
            return

        if not cdata.get(f"{member_key}_transposed", False):
            yield from with_non_transposed(
                zfile, member_key, cdata, process_value)
            return

        yield from with_transposed(
            zfile, member_key, cdata, process_transposed_value)
    except KeyError as exc:
        raise InvalidFormat(*exc.args) from exc


def cross_information(zfile: ZipFile, cdata: dict) -> Iterator[dict]:
    """Load cross information where present.

    Reads from the file named in `cdata["cross_info"]["file"]` when given,
    otherwise from the "covar" file(s). The column named by
    `cdata["sex"]["covar"]`, if any, is dropped from every yielded row.
    """
    cdata_cross_info = cdata.get("cross_info", {})
    cross_info_file_key = "covar"
    new_cdata = {**cdata}
    sex_fields = (cdata.get("sex", {}).get("covar", ""),)
    if "file" in cdata_cross_info:
        # Expose the dedicated file under a private key so that `file_data`
        # can load it like any other member.
        cross_info_file_key = "gnqc_cross_info_file"
        new_cdata = {**cdata, "gnqc_cross_info_file": cdata_cross_info["file"]}

    for row in file_data(zfile,
                         cross_info_file_key,
                         new_cdata,
                         *make_process_data_covar(cdata)):
        yield {
            key: replace_cross_info(value, cdata)
            for key, value in row.items()
            if key not in sex_fields}


def sex_information(zfile: ZipFile, cdata: dict) -> Iterator[dict]:
    """Load sex information where present.

    Reads from the file named in `cdata["sex"]["file"]` when given, otherwise
    from the "covar" file(s). The column named by
    `cdata["cross_info"]["covar"]`, if any, is dropped from every yielded row.
    """
    cdata_sex_info = cdata.get("sex", {})
    sex_info_file_key = "covar"
    new_cdata = {**cdata}
    ci_fields = (cdata.get("cross_info", {}).get("covar", ""),)
    if "file" in cdata_sex_info:
        # Expose the dedicated file under a private key so that `file_data`
        # can load it like any other member.
        sex_info_file_key = "gnqc_sex_info_file"
        new_cdata = {**cdata, "gnqc_sex_info_file": cdata_sex_info["file"]}

    for row in file_data(zfile,
                         sex_info_file_key,
                         new_cdata,
                         *make_process_data_covar(cdata)):
        yield {
            key: replace_sex_info(value, cdata)
            for key, value in row.items()
            if key not in ci_fields}


def validate_bundle(zfile: ZipFile):
    """Ensure the R/qtl2 bundle is valid.

    Checks that every file referenced by the control data under the known
    file keys, and under `sex.file` / `cross_info.file`, exists in the bundle.

    Raises:
        InvalidFormat: if the control file is missing or ambiguous, or if a
            referenced member is not in the bundle.
    """
    cdata = control_data(zfile)
    file_keys = ("geno", "founder_geno", "pheno", "covar", "phenocovar",
                 "gmap", "pmap")

    def __member_exists_p__(zfile, member):
        # `ZipFile.getinfo` raises KeyError for a name not in the archive.
        if isinstance(member, str):
            zfile.getinfo(member)
        else:
            for inner in member:
                zfile.getinfo(inner)

    try:
        for member in (key for key in cdata if key in file_keys):
            __member_exists_p__(zfile, cdata[member])

        if "file" in cdata.get("sex", {}):
            __member_exists_p__(zfile, cdata["sex"]["file"])

        if "file" in cdata.get("cross_info", {}):
            __member_exists_p__(zfile, cdata["cross_info"]["file"])
    except KeyError as kerr:
        raise InvalidFormat(*kerr.args) from kerr