import requests import itertools from gn2.utility import genofile_parser from gn2.utility.tools import GN3_LOCAL_URL from gn2.utility.tools import locate from gn2.base.trait import create_trait from gn2.base.trait import retrieve_sample_data from gn2.base import data_set def process_significance_data(dataset): col_names = ["trait", "marker", "trait_2", "LOD", "dcor"] dataset_rows = [[] for _ in range(len(dataset["trait"]))] for col in col_names: for (index, col_data) in enumerate(dataset[col]): if col in ["dcor", "LOD"]: dataset_rows[index].append(round(float(col_data), 2)) else: dataset_rows[index].append(col_data) return { "col_names": col_names, "data_set_rows": dataset_rows } def parse_geno_data(dataset_group_name) -> dict: """ Args: dataset_group_name: string name @returns : dict with keys genotypes,markernames & individuals """ genofile_location = locate(dataset_group_name + ".geno", "genotype") parser = genofile_parser.ConvertGenoFile(genofile_location) parser.process_csv() markers = [] markernames = [] for marker in parser.markers: markernames.append(marker["name"]) markers.append(marker["genotypes"]) return { "genotypes": list(itertools.chain(*markers)), "markernames": markernames, "individuals": parser.individuals } def parse_phenotype_data(trait_list, dataset, individuals): """ Args: trait_list:list contains the traits dataset: object individuals:a list contains the individual vals Returns: traits_db_List:parsed list of traits traits: list contains trait names individuals """ traits = [] for trait in trait_list: if trait != "": ts = trait.split(':') gt = create_trait(name=ts[0], dataset_name=ts[1]) gt = retrieve_sample_data(gt, dataset, individuals) for ind in individuals: if ind in list(gt.data.keys()): traits.append(gt.data[ind].value) else: traits.append("-999") return { "trait_db_list": trait_list, "traits": traits, "individuals": individuals } def parse_form_data(form_data: dict): trait_db_list = [trait.strip() for trait in form_data['trait_list'].split(',')] form_data["trait_db_list"] = [x for x in trait_db_list if x] form_data["nperm"] = int(form_data["nperm"]) form_data["significance"] = float(form_data["significance"]) form_data["strategy"] = form_data["strategy"].capitalize() return form_data def run_ctl(requestform): """function to make an api call to gn3 and run ctl""" ctl_api = f"{GN3_LOCAL_URL}/api/ctl/run_ctl" form_data = parse_form_data(requestform.to_dict()) trait_db_list = form_data["trait_db_list"] dataset = data_set.create_dataset(trait_db_list[0].split(":")[1]) geno_data = parse_geno_data(dataset.group.name) pheno_data = parse_phenotype_data( trait_db_list, dataset, geno_data["individuals"]) try: response = requests.post(ctl_api, json={ "genoData": geno_data, "phenoData": pheno_data, **form_data, }) if response.status_code != 200: return {"error": response.json()} response = response.json()["results"] response["significance_data"] = process_significance_data( response["significance_data"]) return response except requests.exceptions.ConnectionError: return { "error": "A connection error to perform computation occurred" }