diff options
Diffstat (limited to 'gn3/computations/rqtl2.py')
| -rw-r--r-- | gn3/computations/rqtl2.py | 228 |
1 file changed, 228 insertions, 0 deletions
"""Module contains functions to parse and process rqtl2 input and output"""
import os
import csv
import uuid
import json
from pathlib import Path
from typing import List
from typing import Dict
from typing import Any


def generate_rqtl2_files(data, workspace_dir):
    """Prepare data and generate necessary CSV files
    required to write to the R/qtl2 control file.

    Args:
        data (dict): raw input; keys such as "geno_data" map to lists of
            row dicts to be serialised as CSV.
        workspace_dir (str): directory in which the CSV files are created.

    Returns:
        dict: the original data merged with "<name>_file" entries pointing
        at the generated CSV files (names relative to workspace_dir, as
        the R/qtl2 control file expects).
    """
    file_to_name_map = {
        "geno_file": "geno_data",
        "pheno_file": "pheno_data",
        "geno_map_file": "geno_map_data",
        "physical_map_file": "physical_map_data",
        "phenocovar_file": "phenocovar_data",
        "founder_geno_file": "founder_geno_data",
        "covar_file": "covar_data"
    }
    parsed_files = {}
    for file_name, data_key in file_to_name_map.items():
        if data_key in data:
            file_path = write_to_csv(
                workspace_dir, f"{file_name}.csv", data[data_key])
            if file_path:
                parsed_files[file_name] = file_path
    return {**data, **parsed_files}


def write_to_csv(work_dir, file_name, data: list[dict],
                 headers=None, delimiter=","):
    """Write a list of row dicts to a CSV file.

    If headers is not provided, use the keys of the first object.

    Args:
        work_dir (str): directory the file is written into.
        file_name (str): name of the CSV file to create.
        data (list[dict]): rows to serialise; may be empty.
        headers: optional explicit column ordering; defaults to the keys
            of the first row.
        delimiter (str): field separator.

    Returns:
        str: the file name relative to the workspace (R/qtl2 control
        files reference data files by relative name), or "" when data
        is empty.
    """
    if not data:
        return ""
    if headers is None:
        headers = data[0].keys()
    file_path = os.path.join(work_dir, file_name)
    # newline="" is required by the csv module so the writer controls
    # line endings itself (avoids blank rows / \r\r\n on Windows)
    with open(file_path, "w", newline="", encoding="utf-8") as file_handler:
        writer = csv.DictWriter(file_handler, fieldnames=headers,
                                delimiter=delimiter)
        writer.writeheader()
        writer.writerows(data)
    # return the relative file to the workspace see rqtl2 docs
    return file_name


def validate_required_keys(required_keys: list, data: dict) -> tuple[bool, str]:
    """Check for missing keys in the data object.

    Args:
        required_keys (list): keys that must be present.
        data (dict): object to validate.

    Returns:
        tuple[bool, str]: (True, "") when all keys are present, otherwise
        (False, <message listing the missing keys>).
    """
    missing_keys = [key for key in required_keys if key not in data]
    if missing_keys:
        return False, f"Required key(s) missing: {', '.join(missing_keys)}"
    return True, ""


def compose_rqtl2_cmd(# pylint: disable=[too-many-positional-arguments]
        rqtl_path, input_file, output_file, workspace_dir, data, config):
    """Compose the command for running the R/QTL2 analysis.

    Args:
        rqtl_path: path to the rqtl2 wrapper R script.
        input_file: JSON file holding the prepared input.
        output_file: JSON file the script writes results to.
        workspace_dir: working directory for the computation.
        data (dict): user parameters (nperm, method, threshold).
        config (dict): application config (RSCRIPT, MULTIPROCESSOR_PROCS).

    Returns:
        str: the full Rscript invocation with one "--key value" pair
        per parameter.
    """
    # pylint: disable=R0913
    params = {
        "input_file": input_file,
        "directory": workspace_dir,
        "output_file": output_file,
        "nperm": data.get("nperm", 0),
        "method": data.get("method", "HK"),
        "threshold": data.get("threshold", 1),
        "cores": config.get('MULTIPROCESSOR_PROCS', 1)
    }
    # prefer the configured interpreter, then the environment, then PATH
    rscript_path = config.get("RSCRIPT", os.environ.get("RSCRIPT", "Rscript"))
    return f"{rscript_path} {rqtl_path} " + " ".join(
        [f"--{key} {val}" for key, val in params.items()])


def create_file(file_path):
    """Utility function to create a file given a file_path.

    Uses exclusive-creation mode so an existing file is never truncated.

    Returns:
        tuple[bool, str]: (True, message) on creation,
        (False, message) when the file already exists.
    """
    try:
        with open(file_path, "x", encoding="utf-8") as _file_handler:
            return True, f"File created at {file_path}"
    except FileExistsError:
        return False, "File Already Exists"


def prepare_files(tmpdir):
    """Prepare necessary files and workspace dir for computation.

    Creates a uuid-named workspace under tmpdir plus empty input,
    output and log files.

    Returns:
        tuple: (workspace_dir, input_file, output_file, log_file).
    """
    workspace_dir = os.path.join(tmpdir, str(uuid.uuid4()))
    Path(workspace_dir).mkdir(parents=False, exist_ok=True)
    input_file = os.path.join(
        workspace_dir, f"rqtl2-input-{uuid.uuid4()}.json")
    output_file = os.path.join(
        workspace_dir, f"rqtl2-output-{uuid.uuid4()}.json")

    # to ensure streaming api has access to file even after computation ends
    # .. Create the log file outside the workspace_dir
    log_file = os.path.join(tmpdir, f"rqtl2-log-{uuid.uuid4()}")
    for file_path in [input_file, output_file, log_file]:
        create_file(file_path)
    return workspace_dir, input_file, output_file, log_file


def write_input_file(input_file, workspace_dir, data):
    """
    Write input data to a json file to be passed
    as input to the rqtl2 script.

    The data is first augmented with the CSV files generated by
    generate_rqtl2_files().
    """
    # "w" is sufficient: the file is only written, never read back here
    with open(input_file, "w", encoding="UTF-8") as file_handler:
        rqtl2_input = generate_rqtl2_files(data, workspace_dir)
        json.dump(rqtl2_input, file_handler)


def read_output_file(output_path: str) -> dict:
    """Read the JSON output file generated from rqtl2.

    See the rqtl2_wrapper.R script for the expected output structure.
    """
    with open(output_path, "r", encoding="utf-8") as file_handler:
        return json.load(file_handler)


def process_permutation(data):
    """Process permutation output referenced by the output results.

    Args:
        data (dict): object extracted from the output_file; must contain
            "permutation_file" and "significance_file" paths.

    Returns:
        dict: A dict containing
        * phenotypes array
        * perm_results as dict keyed by permutation_id
        * significance results keyed by threshold value
    """
    perm_file = data.get("permutation_file")
    # newline="" per csv module docs for reading csv files
    with open(perm_file, "r", newline="", encoding="utf-8") as file_handler:
        reader = csv.reader(file_handler)
        # first row is a header: <id column>, phenotype names...
        phenotypes = next(reader)[1:]
        perm_results = {_id: float(val) for _id, val, *_ in reader}
    _, significance = fetch_significance_results(data.get("significance_file"))
    return {
        "phenotypes": phenotypes,
        "perm_results": perm_results,
        "significance": significance,
    }


def fetch_significance_results(file_path: str):
    """
    Process the significance file to extract phenotypes and
    significance values.

    Threshold values are: (0.05, 0.01)

    Args:
        file_path (str): file path for the significance output.

    Returns:
        tuple: A tuple containing
        * phenotypes (list): List of phenotypes
        * significances (dict): A dictionary where keys
          are threshold values and values are lists
          of significant results corresponding to each threshold.
    """
    with open(file_path, "r", newline="", encoding="utf-8") as file_handler:
        reader = csv.reader(file_handler)
        results = {}
        phenotypes = next(reader)[1:]
        for line in reader:
            threshold, significance = line[0], line[1:]
            results[threshold] = significance
        return (phenotypes, results)


def process_scan_results(qtl_file_path: str, map_file_path: str) -> List[Dict[str, Any]]:
    """Process genome scanning results and obtain marker_name, LOD score,
    marker_position and chromosome.

    Args:
        qtl_file_path (str): Path to the QTL scan results CSV file.
        map_file_path (str): Path to the map file from the script.

    Returns:
        List[Dict[str, Any]]: A list of dictionaries containing the marker data.
    """
    map_data = {}
    # read the genetic map: marker, chromosome, cM position, Mb position
    with open(map_file_path, "r", newline="", encoding="utf-8") as file_handler:
        reader = csv.reader(file_handler)
        next(reader)  # skip header
        for line in reader:
            marker, chr_, cm_, mb_ = line
            # positions may be missing ("NA" or empty) in the R output
            cm: float | None = float(cm_) if cm_ and cm_ != "NA" else None
            mb: float | None = float(mb_) if mb_ and mb_ != "NA" else None
            map_data[marker] = {"chr": chr_, "cM": cm, "Mb": mb}

    # Process QTL scan results and merge the positional data
    results = []
    with open(qtl_file_path, "r", newline="", encoding="utf-8") as file_handler:
        reader = csv.reader(file_handler)
        next(reader)  # skip header
        for line in reader:
            marker = line[0]
            lod_score = line[1]
            results.append({
                "name": marker,
                "lod_score": float(lod_score),
                **map_data.get(marker, {})  # Add chromosome and positions if available
            })
    return results


def process_qtl2_results(output_file: str) -> Dict[str, Any]:
    """Provide an abstraction for processing all QTL2 mapping results.

    Args:
        output_file (str): File path to the generated output.

    Returns:
        Dict[str, Any]: A dictionary containing both QTL
        and permutation results along with input data.
    """
    results = read_output_file(output_file)
    qtl_results = process_scan_results(results["scan_file"],
                                       results["map_file"])
    # .get with default 0 so a missing "permutations" key means "none run"
    # instead of a KeyError
    permutation_results = (process_permutation(results)
                           if results.get("permutations", 0) > 0 else {})
    return {
        **results,
        "qtl_results": qtl_results,
        "permutation_results": permutation_results
    }
