"""The R/qtl2 parsing and processing code.""" import io import csv import json from zipfile import ZipFile from functools import reduce from typing import Iterator, Iterable, Callable import yaml from quality_control.parsing import take from r_qtl.errors import InvalidFormat def thread_op(value, *functions): """Thread the `value` through the sequence of `functions`.""" return reduce(lambda result, func: func(result), functions, value) def control_data(zfile: ZipFile) -> dict: """Retrieve the control file from the zip file info.""" files = tuple(filename for filename in zfile.namelist() if (filename.endswith(".yaml") or filename.endswith(".json"))) num_files = len(files) if num_files == 0: raise InvalidFormat("Expected a json or yaml control file.") if num_files > 1: raise InvalidFormat("Found more than one possible control file.") return (json.loads(zfile.read(files[0])) if files[0].endswith(".json") else yaml.safe_load(zfile.read(files[0]))) def with_non_transposed(zfile: ZipFile, member_key: str, cdata: dict, process_value: Callable[ [dict], dict] = lambda val: val) -> Iterator[dict]: """Process non-transposed file values Arguments: zfile: A zipfile object from opening a R/qtl2 bundle. member_key: A key to retrieve the member file to process from the file. cdata: The control data from the R/qtl2 bundle read from the JSON/YAML file. process_value: A function to process the values from the file. """ def not_comment_line(line): return not line.startswith(cdata.get("comment.char", "#")) with zfile.open(cdata[member_key]) as innerfile: reader = csv.DictReader( filter(not_comment_line, io.TextIOWrapper(innerfile)), delimiter=cdata.get("sep", ",")) for row in reader: yield process_value(row) def __make_organise_by_id__(id_key): """Return a function to use with `reduce` to organise values by some identifier.""" def __organiser__(acc, item): row = acc.get(item[id_key], {}) return {**acc, item[id_key]: {**row, **item}} return __organiser__ def __batch_of_n__(iterable: Iterable, num): """Return a batch of `num` items or less from the `iterable`.""" while True: items = take(iterable, num) if len(items) <= 0: break yield items def with_transposed(zfile: ZipFile, member_key: str, cdata: dict, process_value: Callable[ [str, tuple[str, ...], tuple[str, ...]], tuple[dict, ...]]) -> Iterator[dict]: """Process transposed file values Arguments: zfile: A zipfile object from opening a R/qtl2 bundle. member_key: A key to retrieve the member file to process from the file. cdata: The control data from the R/qtl2 bundle read from the JSON/YAML file. process_value: A function to process the values from the file. """ with zfile.open(cdata[member_key]) as innerfile: lines = (tuple(field.strip() for field in line.strip().split(cdata.get("sep", ","))) for line in filter(lambda line: not line.startswith("#"), io.TextIOWrapper(innerfile))) try: id_line = next(lines) id_key, headers = id_line[0], id_line[1:] for _key, row in reduce(# type: ignore[var-annotated] __make_organise_by_id__(id_key), (row for batch in __batch_of_n__(lines, 300) for line in batch for row in process_value(id_key, headers, line)), {}).items(): yield row except StopIteration: pass def genotype_data(zfile: ZipFile, cdata: dict) -> Iterator[dict]: """Load the genotype file, making use of the control data.""" def replace_genotype_codes(val): return cdata["genotypes"].get(val, val) def replace_na_strings(val): nastrings = cdata.get("na.strings") if bool(nastrings): return (None if val in nastrings else val) return val if not cdata.get("geno_transposed", False): for line in with_non_transposed( zfile, "geno", cdata, lambda row: { key: thread_op(value, replace_genotype_codes, replace_na_strings) for key,value in row.items() }): yield line return None def __merge__(key, samples, line): marker = line[0] return tuple( dict(zip( [key, marker], (thread_op(item, replace_genotype_codes, replace_na_strings) for item in items))) for items in zip(samples, line[1:])) for row in with_transposed(zfile, "geno", cdata, __merge__): yield row def map_data(zfile: ZipFile, map_type: str, cdata: dict) -> Iterator[dict]: """Read gmap files to get the genome mapping data""" assert map_type in ("genetic-map", "physical-map"), "Invalid map type" map_file_key = { "genetic-map": "gmap", "physical-map": "pmap" }[map_type] transposed_dict = { "genetic-map": "gmap_transposed", "physical-map": "pmap_transposed" } if not cdata.get(transposed_dict[map_type], False): for row in with_non_transposed(zfile, map_file_key, cdata): yield row return None def __merge__(key, samples, line): marker = line[0] return tuple(dict(zip([key, marker], items)) for items in zip(samples, line[1:])) for row in with_transposed(zfile, map_file_key, cdata, __merge__): yield row def phenotype_data(zfile: ZipFile, cdata: dict) -> Iterator[dict]: """Load phenotype file data.""" if not cdata.get("pheno_transposed", False): for row in with_non_transposed(zfile, "pheno", cdata, lambda val: val): yield row return def __merge__(id_key, ids, vals): return tuple(dict(zip([id_key, vals[0]], items)) for items in zip(ids, vals[1:])) for row in with_transposed(zfile, "pheno", cdata, __merge__): yield row