diff options
Diffstat (limited to 'wqflask/wqflask/partial_correlations_views.py')
-rw-r--r-- | wqflask/wqflask/partial_correlations_views.py | 263 |
1 files changed, 263 insertions, 0 deletions
diff --git a/wqflask/wqflask/partial_correlations_views.py b/wqflask/wqflask/partial_correlations_views.py new file mode 100644 index 00000000..bee0a033 --- /dev/null +++ b/wqflask/wqflask/partial_correlations_views.py @@ -0,0 +1,263 @@ +from typing import Union, Tuple + +import MySQLdb +from gn3.db.traits import retrieve_trait_info +from flask import flash, request, current_app, render_template +from gn3.computations.partial_correlations import partial_correlations_entry + +from wqflask import app + +def parse_trait(trait_str: str) -> Union[dict, None]: + keys = ("name", "dataset", "symbol", "description", "data_hmac") + parts = tuple(part.strip() for part in trait_str.split(":::")) + if len(parts) == len(keys): + return dict(zip(keys, parts)) + return None + +def process_step_select_primary( + primary_trait: dict, control_traits: Tuple[dict, ...], + target_traits: Tuple[dict, ...], + traits_list: Tuple[dict, ...], corr_method: str) -> Tuple[ + str, dict, Tuple[dict, ...], Tuple[dict, ...], Tuple[dict, ...], + str]: + if primary_trait is None: + flash("You must select a primary trait", "alert-danger") + return ( + "select-primary", primary_trait, control_traits, target_traits, + traits_list, corr_method) + + return ( + "select-controls", primary_trait, control_traits, target_traits, + tuple( + trait for trait in traits_list + if trait["data_hmac"] != primary_trait["data_hmac"]), + corr_method) + +def process_step_select_controls( + primary_trait: dict, control_traits: Tuple[dict, ...], + target_traits: Tuple[dict, ...], + traits_list: Tuple[dict, ...], corr_method: str) -> Tuple[ + str, dict, Tuple[dict, ...], Tuple[dict, ...], Tuple[dict, ...], + str]: + if len(control_traits) == 0 or len(control_traits) > 3: + flash( + ("You must select a minimum of one control trait, up to a maximum " + "of three control traits."), + "alert-danger") + return ( + "select-controls", primary_trait, control_traits, target_traits, + traits_list, corr_method) + + hmacs =(primary_trait["data_hmac"],) + tuple( + trait["data_hmac"] for trait in control_traits) + return ( + "select-targets", primary_trait, control_traits, target_traits, + tuple( + trait for trait in traits_list if trait["data_hmac"] not in hmacs), + corr_method) + +def process_step_select_targets( + primary_trait: dict, control_traits: Tuple[dict, ...], + target_traits: Tuple[dict, ...], + traits_list: Tuple[dict, ...], corr_method: str) -> Tuple[ + str, dict, Tuple[dict, ...], Tuple[dict, ...], Tuple[dict, ...], + str]: + if len(target_traits) == 0: + flash( + "You must select at least one target trait.", "alert-danger") + return ( + "select-targets", primary_trait, control_traits, target_traits, + traits_list, corr_method) + + hmacs =(primary_trait["data_hmac"],) + tuple( + trait["data_hmac"] for trait in (control_traits + target_traits)) + return ( + "select-corr-method", primary_trait, control_traits, target_traits, + tuple( + trait for trait in traits_list if trait["data_hmac"] not in hmacs), + corr_method) + +def process_step_select_corr_method( + primary_trait: dict, control_traits: Tuple[dict, ...], + target_traits: Tuple[dict, ...], + traits_list: Tuple[dict, ...], corr_method: str) -> Tuple[ + str, dict, Tuple[dict, ...], Tuple[dict, ...], Tuple[dict, ...], + str]: + methods = ( + "genetic correlation, pearson's r", + "genetic correlation, spearman's rho", + "sgo literature correlation", + "tissue correlation, pearson's r", + "tissue correlation, spearman's rho") + if corr_method.lower() not in methods: + flash( + "Selected method is unknown.", "alert-danger") + return ( + "select-corr-method", primary_trait, control_traits, target_traits, + traits_list, corr_method) + + hmacs =(primary_trait["data_hmac"],) + tuple( + trait["data_hmac"] for trait in (control_traits + target_traits)) + return ( + "run-correlation", primary_trait, control_traits, target_traits, + tuple( + trait for trait in traits_list if trait["data_hmac"] not in hmacs), + corr_method) + +def process_step( + step: str, primary_trait: dict, control_traits: Tuple[dict, ...], + target_traits: Tuple[dict, ...], traits_list: Tuple[dict, ...], + corr_method: str) -> Tuple[ + str, dict, Tuple[dict, ...], Tuple[dict, ...], Tuple[dict, ...], + str]: + processor_functions = { + # "select-traits": lambda arg: arg, + "select-primary": process_step_select_primary, + "select-controls": process_step_select_controls, + "select-targets": process_step_select_targets, + "select-corr-method": process_step_select_corr_method + } + return processor_functions[(step or "select-primary")]( + primary_trait, control_traits, target_traits, traits_list, corr_method) + +def sequence_of_traits(trait_strs) -> Tuple[dict, ...]: + return tuple(filter( + lambda trt: trt is not None, + (parse_trait(tstr.strip()) for tstr in trait_strs))) + +def publish_target_dabases(conn, group, threshold): + query = ( + "SELECT PublishFreeze.FullName,PublishFreeze.Name " + "FROM PublishFreeze, InbredSet " + "WHERE PublishFreeze.InbredSetId = InbredSet.Id " + "AND InbredSet.Name = %(group)s " + "AND PublishFreeze.public > %(threshold)s") + with conn.cursor() as cursor: + cursor.execute(query, {"group": group, "threshold": threshold}) + res = cursor.fetchall() + if res: + return tuple( + dict(zip(("description", "value"), row)) for row in res) + + return tuple() + +def geno_target_databases(conn, group, threshold): + query = ( + "SELECT GenoFreeze.FullName,GenoFreeze.Name " + "FROM GenoFreeze, InbredSet " + "WHERE GenoFreeze.InbredSetId = InbredSet.Id " + "AND InbredSet.Name = %(group)s " + "AND GenoFreeze.public > %(threshold)s") + with conn.cursor() as cursor: + cursor.execute(query, {"group": group, "threshold": threshold}) + res = cursor.fetchall() + if res: + return tuple( + dict(zip(("description", "value"), row)) for row in res) + + return tuple() + +def probeset_target_databases(conn, group, threshold): + query1 = "SELECT Id, Name FROM Tissue order by Name" + query2 = ( + "SELECT ProbeFreeze.TissueId, ProbeSetFreeze.FullName, ProbeSetFreeze.Name " + "FROM ProbeSetFreeze, ProbeFreeze, InbredSet " + "WHERE ProbeSetFreeze.ProbeFreezeId = ProbeFreeze.Id " + "AND ProbeFreeze.TissueId IN %(tissue_ids)s " + "AND ProbeSetFreeze.public > %(threshold)s " + "AND ProbeFreeze.InbredSetId = InbredSet.Id " + "AND InbredSet.Name like %(group)s " + "ORDER BY ProbeSetFreeze.CreateTime desc, ProbeSetFreeze.AvgId") + with conn.cursor() as cursor: + cursor.execute(query1) + tissue_res = cursor.fetchall() + if tissue_res: + tissue_ids = tuple(row[0] for row in tissue_res) + cursor.execute( + query2,{ + "tissue_ids": tissue_ids, "group": f"{group}%%", + "threshold": threshold + }) + db_res = cursor.fetchall() + if db_res: + databases = tuple( + dict(zip(("tissue_id", "description", "value"), row)) + for row in db_res) + return tuple( + {tissue_name: tuple( + { + "value": item["value"], + "description": item["description"] + } for item in databases + if item["tissue_id"] == tissue_id)} + for tissue_id, tissue_name in tissue_res) + + return tuple() + +def target_databases(conn, step, trait, threshold): + """ + Retrieves the names of possible target databases from the database. + """ + if step != "select-corr-method": + return None + + trait_info = retrieve_trait_info( + threshold, f"{trait['dataset']}::{trait['name']}", conn) + group = trait_info["group"] + return ( + publish_target_dabases(conn, group, threshold) + + geno_target_databases(conn, group, threshold) + + probeset_target_databases(conn, group, threshold)) + +def pcorrelations(conn, values): + if values["step"] != "run-correlation": + return None + + def trait_fullname(trait): + return f"{trait['dataset']}::{trait['name']}" + + return partial_correlations_entry( + conn, trait_fullname(values["primary_trait"]), + tuple(trait_fullname(trait) for trait in values["control_traits"]), + values["method"], values["criteria"], values["target_db"]) + +@app.route("/partial_correlations", methods=("POST",)) +def partial_correlations(): + form = request.form + traits_list = tuple(filter( + lambda trt: trt is not None, + (parse_trait(tstr) for tstr in form.get("traits_list", "").split("|||")))) + + args_dict = dict(zip( + ("step", "primary_trait", "control_traits", "target_traits", + "traits_list", "method"), + process_step( + form.get("step", None), + parse_trait(form.get("primary_trait", "")), + sequence_of_traits( + form.getlist("control_traits[]") or + form.get("control_traits", "").split("|||")), + sequence_of_traits( + form.getlist("target_traits[]") or + form.get("target_traits", "").split("|||")), + sequence_of_traits(form.get("traits_list", "").split("|||")), + form.get("method")))) + + conn = MySQLdb.Connect( + db=current_app.config.get("DB_NAME"), + user=current_app.config.get("DB_USER"), + passwd=current_app.config.get("DB_PASS"), + host=current_app.config.get("DB_HOST")) + target_dbs = target_databases( + conn, args_dict["step"], args_dict["primary_trait"], 0) + + if args_dict["step"] == "run-correlation": + args_dict = { + **args_dict, "target_db": form.get("target_db"), + "criteria": int(form.get("criteria", 500))} + + corr_results = pcorrelations(conn, args_dict) + + return render_template( + "partial_correlations.html", **args_dict, target_dbs=target_dbs, + corr_results=corr_results) |