"""The R/qtl2 parsing and processing code."""
# Provenance: reconstructed from the whitespace-collapsed patch
#   commit 3723cc8fe3977f292e636e98278b73c88b2b9677
#   From: Frederick Muriuki Muriithi, Wed, 20 Dec 2023 15:57:38 +0300
#   Subject: Read genotype files (r_qtl/r_qtl2.py)
#
# Only the definitions the patch shows in full (`thread_op`,
# `genotype_metadata`, `genotype_data`) are reproduced here.  `control_data`
# and `read_r_qtl2_files` appear in the patch only as partial context and are
# intentionally left out, together with the imports only they use (`json`,
# `yaml`, `pathlib.Path`, `quality_control.debug`, `r_qtl.errors`).
import io
import csv
from functools import reduce
from typing import IO, Any, Callable, Iterator, Optional, Tuple
from zipfile import ZipFile


def thread_op(value: Any, *functions: Callable[[Any], Any]) -> Any:
    """Thread the `value` through the sequence of `functions`.

    Each function is called with the result of the previous one, left to
    right.  With no functions, `value` is returned unchanged.
    """
    return reduce(lambda result, func: func(result), functions, value)


def _data_lines(genofile: IO[bytes], cdata: dict) -> Iterator[str]:
    """Yield the decoded lines of `genofile`, skipping comment lines.

    The comment prefix is read from the control data's "comment.char" key and
    falls back to "#" when the key is absent.  An empty prefix disables
    comment filtering (otherwise every line would match and be dropped).
    """
    comment_char = cdata.get("comment.char", "#")
    for line in io.TextIOWrapper(genofile):
        if comment_char and line.startswith(comment_char):
            continue
        yield line


def _na_strings(cdata: dict) -> Tuple[str, ...]:
    """Return the control data's "na.strings" as a tuple of strings.

    A bare string is wrapped in a one-element tuple so that later membership
    tests compare whole values rather than substrings; a missing key yields
    an empty tuple (nothing is treated as missing).
    """
    raw = cdata.get("na.strings")
    if raw is None:
        return ()
    if isinstance(raw, str):
        return (raw,)
    return tuple(raw)


def genotype_metadata(zfile: ZipFile, cdata: dict) -> Optional[dict]:
    """Read Individual ID key and the marker names.

    The header is the first non-comment, non-blank line of the file named by
    `cdata["geno"]`.  It is parsed with the `csv` module, using the same
    delimiter as `genotype_data`, so the marker names returned here agree
    with the keys of the rows that function yields (e.g. for quoted names).

    Returns a dict with keys "individual_id_key" (the first header field) and
    "markers" (a tuple of the remaining fields), both stripped of surrounding
    whitespace, or `None` when the file contains no header line.

    Raises `KeyError` if `cdata` has no "geno" entry.
    """
    # TODO: Handle transposed files
    with zfile.open(cdata["geno"]) as genofile:
        header_reader = csv.reader(_data_lines(genofile, cdata),
                                   delimiter=cdata.get("sep", ","))
        for fields in header_reader:
            if not fields:
                # csv yields [] for blank lines; DictReader skips these when
                # looking for its header, so do the same here.
                continue
            return {
                "individual_id_key": fields[0].strip(),
                "markers": tuple(marker.strip() for marker in fields[1:])
            }
    return None


def genotype_data(zfile: ZipFile, cdata: dict) -> Iterator[dict]:
    """Load the genotype file, making use of the control data.

    Yields one dict per data row of the file named by `cdata["geno"]`, keyed
    by the header fields.  Every value is first translated through the
    `cdata["genotypes"]` mapping (values without an entry pass through
    unchanged) and then replaced by `None` if it is one of the
    `cdata["na.strings"]`.

    Missing "genotypes" / "na.strings" entries mean "no substitution" rather
    than a `KeyError`.
    NOTE(review): R/qtl2 documents its own defaults for these control-file
    fields -- confirm whether those defaults should be applied here instead.
    NOTE(review): the substitutions also apply to the individual-ID column,
    as in the original code; an ID equal to a genotype code or NA string
    would be rewritten -- confirm that this is intended.

    Raises `KeyError` (on first iteration) if `cdata` has no "geno" entry.
    """
    # TODO: Handle transposed files
    # Resolve the loop invariants once instead of once per cell.
    genotype_codes = cdata.get("genotypes") or {}
    na_strings = _na_strings(cdata)

    def replace_genotype_code(val):
        return genotype_codes.get(val, val)

    def replace_na_string(val):
        # `na_strings` is always a tuple, so this is a whole-value comparison
        # and is safe for the non-string values produced by the step above.
        return None if val in na_strings else val

    with zfile.open(cdata["geno"]) as genofile:
        reader = csv.DictReader(_data_lines(genofile, cdata),
                                delimiter=cdata.get("sep", ","))
        for row in reader:
            yield {
                key: thread_op(value, replace_genotype_code, replace_na_string)
                for key, value in row.items()
            }