aboutsummaryrefslogtreecommitdiff
path: root/gn2/wqflask/ctl/gn3_ctl_analysis.py
blob: 64c2ff0def6038522e9f0d3783470b262806f262 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import requests
import itertools

from gn2.utility import genofile_parser
from gn2.utility.tools import GN3_LOCAL_URL
from gn2.utility.tools import locate

from gn2.base.trait import create_trait
from gn2.base.trait import retrieve_sample_data
from gn2.base import data_set


def process_significance_data(dataset):
    """Reshape column-oriented CTL significance results into table rows.

    Args:
        dataset: mapping with the keys "trait", "marker", "trait_2",
            "LOD" and "dcor", each holding a sequence of column values.

    Returns:
        dict with "col_names" (the column order) and "data_set_rows"
        (one list per entry of dataset["trait"]). "LOD" and "dcor"
        values are converted to float and rounded to two decimals; the
        other columns are copied unchanged.
    """
    col_names = ["trait", "marker", "trait_2", "LOD", "dcor"]
    numeric_cols = ("dcor", "LOD")

    # One (initially empty) row per entry of the "trait" column.
    row_count = len(dataset["trait"])
    rows = [[] for _ in range(row_count)]

    # Fill column by column so every row ends up in col_names order.
    for name in col_names:
        is_numeric = name in numeric_cols
        for position, value in enumerate(dataset[name]):
            rows[position].append(
                round(float(value), 2) if is_numeric else value)

    return {"col_names": col_names, "data_set_rows": rows}


def parse_geno_data(dataset_group_name) -> dict:
    """Load a group's genotype file and flatten it for the CTL request.

    Args:
        dataset_group_name: group name; the file "<name>.geno" is looked
            up through ``locate`` under "genotype".

    Returns:
        dict with the keys "genotypes" (the genotypes of every marker
        concatenated in marker order), "markernames" and "individuals".
    """
    geno_path = locate(dataset_group_name + ".geno", "genotype")
    geno_file = genofile_parser.ConvertGenoFile(geno_path)
    geno_file.process_csv()

    marker_names = []
    flat_genotypes = []
    # Single pass: record the marker name and append its genotypes to the
    # flat list, keeping both in the parser's marker order.
    for entry in geno_file.markers:
        marker_names.append(entry["name"])
        flat_genotypes.extend(entry["genotypes"])

    return {
        "genotypes": flat_genotypes,
        "markernames": marker_names,
        "individuals": geno_file.individuals,
    }


def parse_phenotype_data(trait_list, dataset, individuals):
    """Collect the phenotype values of each trait for the given individuals.

    Args:
        trait_list: list of "trait_name:dataset_name" strings; empty
            strings are skipped.
        dataset: dataset object handed to ``retrieve_sample_data``.
        individuals: list of individual names; fixes the order of the
            values emitted for every trait.

    Returns:
        dict with the keys
            "trait_db_list": ``trait_list`` exactly as passed in,
            "traits": flat list holding, trait after trait, one value
                per individual ("-999" where the trait has no data for
                that individual),
            "individuals": ``individuals`` exactly as passed in.
    """
    traits = []
    for trait in trait_list:
        if trait == "":
            continue
        ts = trait.split(':')
        gt = create_trait(name=ts[0], dataset_name=ts[1])
        gt = retrieve_sample_data(gt, dataset, individuals)
        sample_data = gt.data
        # Test membership on the mapping itself rather than on a fresh
        # list of its keys rebuilt for every individual.
        traits.extend(
            sample_data[ind].value if ind in sample_data else "-999"
            for ind in individuals)

    return {
        "trait_db_list": trait_list,
        "traits": traits,
        "individuals": individuals
    }


def parse_form_data(form_data: dict):
    """Normalise the raw CTL form values in place.

    Args:
        form_data: dict with the keys "trait_list" (comma-separated
            string), "nperm", "significance" and "strategy".

    Returns:
        The same dict object, with "trait_db_list" added (the stripped,
        non-empty entries of "trait_list"), "nperm" converted to int,
        "significance" converted to float and "strategy" capitalized.
    """
    raw_names = form_data['trait_list'].split(',')
    form_data["trait_db_list"] = [
        name for name in (raw.strip() for raw in raw_names) if name]

    # Applied in this order so the dict is updated key by key.
    converters = (
        ("nperm", int),
        ("significance", float),
        ("strategy", lambda text: text.capitalize()),
    )
    for key, convert in converters:
        form_data[key] = convert(form_data[key])

    return form_data


def run_ctl(requestform):
    """Run a CTL analysis by posting the prepared data to the GN3 API.

    Args:
        requestform: submitted form; must provide ``to_dict()`` returning
            the keys read by ``parse_form_data``.

    Returns:
        On success, the "results" object of the GN3 response with its
        "significance_data" entry reshaped by
        ``process_significance_data``.
        On failure, ``{"error": ...}`` whose value is the decoded JSON
        error body, the raw response text when that body is not valid
        JSON, or a fixed message when the connection to GN3 fails.
    """
    ctl_api = f"{GN3_LOCAL_URL}/api/ctl/run_ctl"

    form_data = parse_form_data(requestform.to_dict())
    trait_db_list = form_data["trait_db_list"]
    # The dataset is created from the dataset name of the first trait;
    # its group name selects the genotype file.
    dataset = data_set.create_dataset(trait_db_list[0].split(":")[1])
    geno_data = parse_geno_data(dataset.group.name)
    pheno_data = parse_phenotype_data(
        trait_db_list, dataset, geno_data["individuals"])

    payload = {
        "genoData": geno_data,
        "phenoData": pheno_data,
        **form_data,
    }

    # Only the network call is guarded, so errors raised while handling
    # the response are not mistaken for connection problems.
    # NOTE(review): no timeout is passed to requests.post, so this call
    # can wait indefinitely on GN3 — confirm that is intended.
    try:
        response = requests.post(ctl_api, json=payload)
    except requests.exceptions.ConnectionError:
        return {
            "error": "A connection error to perform computation occurred"
        }

    if response.status_code != 200:
        # An error reply is not guaranteed to carry a JSON body (e.g. an
        # HTML error page); fall back to the raw text instead of raising.
        try:
            return {"error": response.json()}
        except ValueError:
            return {"error": response.text}

    results = response.json()["results"]
    results["significance_data"] = process_significance_data(
        results["significance_data"])

    return results