aboutsummaryrefslogtreecommitdiff
"""Procedures related rqtl computations"""
import os
from bisect import bisect
from typing import Dict, List, Tuple, Union

import numpy as np

from flask import current_app

from gn3.commands import compose_rqtl_cmd
from gn3.computations.gemma import generate_hash_of_string
from gn3.fs_helpers import get_hash_of_files


def generate_rqtl_cmd(
    rqtl_wrapper_cmd: str,
    rqtl_wrapper_kwargs: Dict,
    rqtl_wrapper_bool_kwargs: list,
) -> Dict:
    """Given the base rqtl_wrapper command and
    dict of keyword arguments, return the full rqtl_wrapper command and an
    output filename generated from a hash of the genotype and phenotype files"""

    # Generate a hash from contents of the genotype and phenotype files
    _hash = get_hash_of_files(
        [v for k, v in rqtl_wrapper_kwargs.items() if k in ["g", "p"]]
    )

    # Append to hash a hash of keyword arguments
    _hash += generate_hash_of_string(
        ",".join(
            [
                f"{k}:{v}"
                for k, v in rqtl_wrapper_kwargs.items()
                if k not in ["g", "p"]
            ]
        )
    )

    # Append to hash a hash of boolean keyword arguments
    _hash += generate_hash_of_string(",".join(rqtl_wrapper_bool_kwargs))

    # Temporarily substitute forward-slashes in hash with underscores
    _hash = _hash.replace("/", "_")

    _output_filename = f"{_hash}-output.csv"
    rqtl_wrapper_kwargs["filename"] = _output_filename

    return {
        "output_file": _output_filename,
        "rqtl_cmd": compose_rqtl_cmd(
            rqtl_wrapper_cmd=rqtl_wrapper_cmd,
            rqtl_wrapper_kwargs=rqtl_wrapper_kwargs,
            rqtl_wrapper_bool_kwargs=rqtl_wrapper_bool_kwargs,
        ),
    }


def process_rqtl_mapping(file_name: str) -> List:
    """Given an output file name, read in R/qtl results and return
    a List of marker objects"""
    marker_obs = []
    # Later I should probably redo this using csv.read to avoid the
    # awkwardness with removing quotes with [1:-1]
    with open(
        os.path.join(
            current_app.config.get("TMPDIR", "/tmp"), "output", file_name
        ),
        "r",
        encoding="utf-8",
    ) as the_file:
        for line in the_file:
            line_items = line.split(",")
            if line_items[1][1:-1] == "chr" or not line_items:
                # Skip header line
                continue

            # Convert chr to int if possible
            the_chr: Union[int, str]
            try:
                the_chr = int(line_items[1][1:-1])
            except ValueError:
                the_chr = line_items[1][1:-1]
            this_marker = {
                "name": line_items[0][1:-1],
                "chr": the_chr,
                "cM": float(line_items[2]),
                "Mb": float(line_items[2]),
                "lod_score": float(line_items[3]),
            }
            marker_obs.append(this_marker)

    return marker_obs


def process_rqtl_pairscan(file_name: str, geno_file: str) -> List:
    """Given an output file name, read in R/qtl pair-scan results and return
    a list of both the JSON needed for the d3panels figure and a list of results
    to be used when generating the results table (which will include marker names)"""
    figure_data = pairscan_for_figure(file_name)
    table_data = pairscan_for_table(file_name, geno_file)

    return [figure_data, table_data]


def pairscan_for_figure(file_name: str) -> Dict:
    """Given an output file name, read in R/qtl pair-scan results and return
    the JSON needed for the d3panels figure"""
    figure_data = {}

    # Open the file with the actual results, written as a list of lists
    with open(
        os.path.join(
            current_app.config.get("TMPDIR", "/tmp"), "output", file_name
        ),
        "r",
        encoding="utf8",
    ) as the_file:
        lod_results = []
        for i, line in enumerate(the_file):
            if i == 0:  # Skip first line
                continue
            line_items = [item.rstrip("\n") for item in line.split(",")]
            # Append all but first item in line
            lod_results.append(line_items[1:])
        figure_data["lod"] = lod_results

    # Open the map file with the list of markers/pseudomarkers and their
    # positions
    with open(
        os.path.join(
            current_app.config.get("TMPDIR", "/tmp"),
            "output",
            "MAP_" + file_name,
        ),
        "r",
        encoding="utf8",
    ) as the_file:
        chr_list = []  # type: List
        pos_list = []  # type: List
        for i, line in enumerate(the_file):
            if i == 0:  # Skip first line
                continue
            line_items = [item.rstrip("\n") for item in line.split(",")]
            chr_list.append(line_items[1][1:-1])
            pos_list.append(line_items[2])
        figure_data["chr"] = chr_list
        figure_data["pos"] = pos_list

    return figure_data


def get_marker_list(map_file: str) -> List:
    """
    Open the map file with the list of markers/pseudomarkers and create list of marker obs

    PARAMETERS:
    map_file: The map file output by R/qtl containing marker/pseudomarker positions
    """

    marker_list = []
    with open(
        os.path.join(
            current_app.config.get("TMPDIR", "/tmp"), "output", map_file
        ),
        "r",
        encoding="utf8",
    ) as map_fh:
        for line in map_fh.readlines()[1:]:
            line_items = [item.rstrip("\n") for item in line.split(",")]
            this_marker = {
                "name": line_items[0],
                "chr": line_items[1][
                    1:-1
                ],  # Strip quotes from beginning and end of chr string
                "pos": line_items[2],
            }

            marker_list.append(this_marker)

    return marker_list


def generate_table_rows(
    results_file: str, marker_list: List, original_markers: Dict
) -> List:
    """
    Open the file with the actual R/qtl pair-scan results and write them as
    they will be displayed in the results table

    PARAMETERS:
    results_file: The filename containing R/qtl pair-scan results
    marker_list: List of marker/pseudomarker names/positions from results
    original_markers: Dict of markers from the .geno file, for finding proximal/distal
                      markers to each pseudomarker
    """
    table_data = []
    with open(
        os.path.join(
            current_app.config.get("TMPDIR", "/tmp"), "output", results_file
        ),
        "r",
        encoding="utf8",
    ) as the_file:
        for i, line in enumerate(the_file.readlines()[1:]):
            marker_1 = marker_list[i]
            marker_1["proximal"], marker_1["distal"] = find_nearest_marker(
                marker_1["chr"], marker_1["pos"], original_markers
            )
            line_items = [item.rstrip("\n") for item in line.split(",")]
            for j, item in enumerate(line_items[1:]):
                marker_2 = marker_list[j]
                marker_2["proximal"], marker_2["distal"] = find_nearest_marker(
                    marker_2["chr"], marker_2["pos"], original_markers
                )
                try:
                    lod_score = f"{float(item):.3f}"
                except ValueError:
                    lod_score = f"{item}"

                table_data.append(
                    {
                        "proximal1": marker_1["proximal"],
                        "distal1": marker_1["distal"],
                        "pos1": f"Chr {marker_1['chr']} @ {float(marker_1['pos']):.1f} cM",
                        "lod": lod_score,
                        "proximal2": marker_2["proximal"],
                        "distal2": marker_2["distal"],
                        "pos2": f"Chr {marker_2['chr']} @ {float(marker_2['pos']):.1f} cM",
                    }
                )

    return table_data


def pairscan_for_table(file_name: str, geno_file: str) -> List:
    """
    Read in R/qtl pair-scan results and return as List representing
    table row contents

    PARAMETERS:
    file_name: The filename containing R/qtl pair-scan results
    geno_file: Filename of the genotype file (used to get marker positions)
    """

    # Open the map file with the list of markers/pseudomarkers and create list of marker obs
    marker_list = get_marker_list("MAP_" + file_name)

    # Get the list of original markers from the .geno file
    original_markers = build_marker_pos_dict(geno_file)

    # Open the file with the actual results and write the results as
    # they will be displayed in the results table
    table_data = generate_table_rows(file_name, marker_list, original_markers)

    return sorted(table_data, key=lambda i: float(i["lod"]), reverse=True)[:500]


def build_marker_pos_dict(genotype_file: str) -> Dict:
    """Gets list of markers and their positions from .geno file

    Basically a pared-down version of parse_genotype_file for R/qtl pair-scan"""

    with open(genotype_file, "r", encoding="utf8") as infile:
        contents = infile.readlines()

    # Get all lines after the metadata
    lines = tuple(
        line
        for line in contents
        if (
            (not line.strip().startswith("#"))
            and (not line.strip().startswith("@"))
            and (not line.strip() == "")
        )
    )

    header_items = lines[0].split("\t")
    mb_exists = "Mb" in header_items
    pos_column = (
        header_items.index("Mb") if mb_exists else header_items.index("cM")
    )

    the_markers = {"1": {}}  # type: Dict[str, Dict]
    for line in lines[1:]:  # The lines with markers
        line_items = line.split("\t")
        this_chr = line_items[0]
        if this_chr not in the_markers:
            the_markers[this_chr] = {}
        the_markers[this_chr][str(float(line_items[pos_column]))] = line_items[
            1
        ]

    return the_markers


def find_nearest_marker(
    the_chr: str, the_pos: str, marker_list: Dict
) -> Tuple[str, str]:
    """Given a chromosome and position of a pseudomarker (from R/qtl pair-scan results),
    return the nearest real marker"""

    pos_list = [float(pos) for pos in marker_list[the_chr]]

    # Get the position of the pseudomarker in the list of markers for the chr
    the_pos_index = bisect(pos_list, float(the_pos))

    proximal_marker = marker_list[the_chr][str(pos_list[the_pos_index - 1])]
    distal_marker = marker_list[the_chr][str(pos_list[the_pos_index])]

    return proximal_marker, distal_marker


def process_perm_output(file_name: str) -> Tuple[List, float, float]:
    """Given base filename, read in R/qtl permutation output and calculate
    suggestive and significant thresholds"""

    perm_results = []
    with open(
        os.path.join(
            current_app.config.get("TMPDIR", "/tmp"),
            "output",
            "PERM_" + file_name,
        ),
        "r",
        encoding="utf-8",
    ) as the_file:
        for i, line in enumerate(the_file):
            if i == 0:
                # Skip header line
                continue

            line_items = line.split(",")
            perm_results.append(float(line_items[1]))

    suggestive = np.percentile(np.array(perm_results), 67)
    significant = np.percentile(np.array(perm_results), 95)

    return perm_results, suggestive, significant