"""The R/qtl2 parsing and processing code.""" import io import csv import json import yaml from pathlib import Path from functools import reduce from typing import Any, List, Union, Iterator from zipfile import ZipFile, ZipInfo, is_zipfile from r_qtl.errors import InvalidFormat def thread_op(value, *functions): """Thread the `value` through the sequence of `functions`.""" return reduce(lambda result, func: func(result), functions, value) def control_data(zfile: ZipFile) -> dict: """Retrieve the control file from the zip file info.""" files = tuple(filename for filename in zfile.namelist() if (filename.endswith(".yaml") or filename.endswith(".json"))) num_files = len(files) if num_files == 0: raise InvalidFormat("Expected a json or yaml control file.") if num_files > 1: raise InvalidFormat("Found more than one possible control file.") return (json.loads(zfile.read(files[0])) if files[0].endswith(".json") else yaml.safe_load(zfile.read(files[0]))) def genotype_metadata(zfile: ZipFile, cdata: dict) -> dict: """Read Individual ID key and the marker names.""" line_num = 0 with zfile.open(cdata["geno"]) as genofile: for line in filter(lambda line: not line.startswith("#"), io.TextIOWrapper(genofile)): line_parts = line.strip().split(cdata.get("sep", ",")) return { "individual_id_key": line_parts[0].strip(), "markers": tuple(marker.strip() for marker in line_parts[1:]) } def genotype_data(zfile: ZipFile, cdata: dict) -> Iterator[dict]: """Load the genotype file, making use of the control data.""" def replace_genotype_codes(val): return cdata["genotypes"].get(val, val) def replace_na_strings(val): return (None if val in cdata["na.strings"] else val) if not cdata.get("geno_transposed", False): with zfile.open(cdata["geno"]) as genofile: reader = csv.DictReader( filter(lambda line: not line.startswith("#"), io.TextIOWrapper(genofile)), delimiter=cdata.get("sep", ",")) for row in reader: yield { key: thread_op( value, replace_genotype_codes, replace_na_strings) for key,value in row.items() } def __merge__(key, samples, line): marker = line[0] return ( dict(zip( [key, marker], (thread_op(item, replace_genotype_codes, replace_na_strings) for item in items))) for items in zip(samples, line[1:])) if cdata.get("geno_transposed", False): with zfile.open(cdata["geno"]) as genofile: lines = (line.strip().split(cdata.get("sep", ",")) for line in filter(lambda line: not line.startswith("#"), io.TextIOWrapper(genofile))) id_line = next(lines) id_key, samples = id_line[0], id_line[1:] for line in lines: for row in __merge__(id_key, samples, line): yield row def map_data(zfile: ZipFile, map_type: str, cdata: dict) -> dict: """Read gmap files to get the genome mapping data""" assert map_type in ("genetic-map", "physical-map"), "Invalid map type" map_file = cdata[{ "genetic-map": "gmap", "physical-map": "pmap" }[map_type]] transposed_dict = { "genetic-map": "gmap_transposed", "physical-map": "pmap_transposed" } if not cdata.get(transposed_dict[map_type], False): with zfile.open(map_file) as gmapfile: reader = csv.DictReader( filter(lambda line: not line.startswith("#"), io.TextIOWrapper(gmapfile)), delimiter=cdata.get("sep", ",")) return tuple(row for row in reader) with zfile.open(map_file) as gmapfile: lines = [[field.strip() for field in line.strip().split(cdata.get("sep", ","))] for line in filter(lambda line: not line.startswith("#"), io.TextIOWrapper(gmapfile))] headers = tuple(line[0] for line in lines) return reduce( lambda gmap, row: gmap + (dict(zip(headers, row)),), zip(*(line[1:] for line in lines)), tuple()) def read_r_qtl2_files(filepath: Path): """Read R/qtl2 format zip files.""" with ZipFile(filepath, "r") as zfile: cf = control_data(zfile) raise NotImplementedError("Implementation is incomplete.")