"""The R/qtl2 parsing and processing code."""
import io
import csv
import json
from zipfile import ZipFile
from functools import reduce
from typing import Iterator, Iterable, Callable

import yaml

from quality_control.parsing import take

from r_qtl.errors import InvalidFormat


def thread_op(value, *functions):
    """Thread the `value` through the sequence of `functions`.

    Each function is called with the result of the previous one, starting
    with `value`. With no functions, `value` is returned unchanged.
    """
    return reduce(lambda result, func: func(result), functions, value)


def control_data(zfile: ZipFile) -> dict:
    """Retrieve the control file from the zip file info.

    The control file is the single member of `zfile` whose name ends with
    ".yaml" or ".json".

    Raises:
        InvalidFormat: if no such member exists, or if more than one does.
    """
    files = tuple(filename for filename in zfile.namelist()
                  if filename.endswith((".yaml", ".json")))
    num_files = len(files)
    if num_files == 0:
        raise InvalidFormat("Expected a json or yaml control file.")

    if num_files > 1:
        raise InvalidFormat("Found more than one possible control file.")

    control_file = files[0]
    content = zfile.read(control_file)
    return (json.loads(content)
            if control_file.endswith(".json")
            else yaml.safe_load(content))


def genotype_data(zfile: ZipFile, cdata: dict) -> Iterator[dict]:
    """Load the genotype file, making use of the control data.

    The member of `zfile` named by `cdata["geno"]` is read with the field
    separator `cdata["sep"]` (default ","), skipping lines that start with
    "#". Every value is passed through the `cdata["genotypes"]` code mapping
    and then replaced with `None` if it is one of `cdata["na.strings"]`.

    When `cdata["geno_transposed"]` is falsy (or absent), one dict is yielded
    per data row, keyed by the header fields.

    When it is truthy, the first line provides the id key (first field) and
    the sample ids (remaining fields); each later line begins with a marker
    followed by one value per sample. One dict is then yielded per sample,
    containing the id key and one entry per marker. Nothing is yielded if the
    file has no lines.
    """
    # `na.strings` may be a single scalar string rather than a list. Using
    # `in` directly on a string would do a substring test (and raise a
    # TypeError for non-string values), so normalise to a tuple first.
    raw_nastrings = cdata.get("na.strings")
    if not raw_nastrings:
        nastrings: tuple = tuple()
    elif isinstance(raw_nastrings, str):
        nastrings = (raw_nastrings,)
    else:
        nastrings = tuple(raw_nastrings)

    separator = cdata.get("sep", ",")

    def replace_genotype_codes(val):
        return cdata["genotypes"].get(val, val)

    def replace_na_strings(val):
        if nastrings:
            return None if val in nastrings else val
        return val

    def __clean__(val):
        return thread_op(val, replace_genotype_codes, replace_na_strings)

    def __not_comment__(line):
        return not line.startswith("#")

    if not cdata.get("geno_transposed", False):
        with zfile.open(cdata["geno"]) as genofile:
            reader = csv.DictReader(
                filter(__not_comment__, io.TextIOWrapper(genofile)),
                delimiter=separator)
            for row in reader:
                yield {key: __clean__(value) for key, value in row.items()}
        return

    def __merge__(key, samples, line):
        # Pair each sample id with its value for this line's marker.
        marker = line[0]
        return tuple(
            dict(zip([key, marker], (__clean__(item) for item in items)))
            for items in zip(samples, line[1:]))

    def __n_batch__(iterable: Iterable, num):
        while True:
            items = take(iterable, num)
            if len(items) <= 0:
                break
            yield items

    with zfile.open(cdata["geno"]) as genofile:
        lines = (line.strip().split(separator)
                 for line
                 in filter(__not_comment__, io.TextIOWrapper(genofile)))
        try:
            id_line = next(lines)
        except StopIteration:
            # No (non-comment) lines at all: nothing to yield.
            return
        id_key, samples = id_line[0], id_line[1:]

        # Accumulate in place: rebuilding the whole mapping for every
        # (sample, marker) pair is quadratic in the size of the file.
        by_id: dict = {}
        for batch in __n_batch__(lines, 300):
            for line in batch:
                for item in __merge__(id_key, samples, line):
                    by_id.setdefault(item[id_key], {}).update(item)

        yield from by_id.values()


def map_data(zfile: ZipFile, map_type: str, cdata: dict) -> tuple[dict, ...]:
    """Read gmap files to get the genome mapping data

    `map_type` must be "genetic-map" (file named by `cdata["gmap"]`,
    transposition flag `cdata["gmap_transposed"]`) or "physical-map" (file
    named by `cdata["pmap"]`, flag `cdata["pmap_transposed"]`). Lines starting
    with "#" are skipped and fields are split on `cdata["sep"]` (default ",").

    Returns a tuple with one dict per record. In the non-transposed layout
    the dicts are the rows as read by `csv.DictReader`. In the transposed
    layout the first field of each line is a header and the remaining fields
    hold one value per record.
    """
    assert map_type in ("genetic-map", "physical-map"), "Invalid map type"
    map_file = cdata[{
        "genetic-map": "gmap",
        "physical-map": "pmap"
    }[map_type]]
    transposed_dict = {
        "genetic-map": "gmap_transposed",
        "physical-map": "pmap_transposed"
    }
    separator = cdata.get("sep", ",")

    def __not_comment__(line):
        return not line.startswith("#")

    if not cdata.get(transposed_dict[map_type], False):
        with zfile.open(map_file) as gmapfile:
            reader = csv.DictReader(
                filter(__not_comment__, io.TextIOWrapper(gmapfile)),
                delimiter=separator)
            return tuple(reader)

    with zfile.open(map_file) as gmapfile:
        lines = [[field.strip() for field in line.strip().split(separator)]
                 for line
                 in filter(__not_comment__, io.TextIOWrapper(gmapfile))]

    headers = tuple(line[0] for line in lines)
    # Transpose: the n-th value of every line makes up the n-th record.
    return tuple(dict(zip(headers, row))
                 for row in zip(*(line[1:] for line in lines)))