aboutsummaryrefslogtreecommitdiff
path: root/gn3/db/traits.py
diff options
context:
space:
mode:
Diffstat (limited to 'gn3/db/traits.py')
-rw-r--r--gn3/db/traits.py195
1 files changed, 74 insertions, 121 deletions
diff --git a/gn3/db/traits.py b/gn3/db/traits.py
index 1c6aaa7..f722e24 100644
--- a/gn3/db/traits.py
+++ b/gn3/db/traits.py
@@ -1,7 +1,7 @@
"""This class contains functions relating to trait data manipulation"""
import os
from functools import reduce
-from typing import Any, Dict, Union, Sequence
+from typing import Any, Dict, Sequence
from gn3.settings import TMPDIR
from gn3.random import random_string
@@ -67,7 +67,7 @@ def export_trait_data(
return accumulator + (trait_data["data"][sample]["ndata"], )
if dtype == "all":
return accumulator + __export_all_types(trait_data["data"], sample)
- raise KeyError("Type `%s` is incorrect" % dtype)
+ raise KeyError(f"Type `{dtype}` is incorrect")
if var_exists and n_exists:
return accumulator + (None, None, None)
if var_exists or n_exists:
@@ -76,80 +76,6 @@ def export_trait_data(
return reduce(__exporter, samplelist, tuple())
-def get_trait_csv_sample_data(conn: Any,
- trait_name: int, phenotype_id: int):
- """Fetch a trait and return it as a csv string"""
- sql = ("SELECT DISTINCT Strain.Id, PublishData.Id, Strain.Name, "
- "PublishData.value, "
- "PublishSE.error, NStrain.count FROM "
- "(PublishData, Strain, PublishXRef, PublishFreeze) "
- "LEFT JOIN PublishSE ON "
- "(PublishSE.DataId = PublishData.Id AND "
- "PublishSE.StrainId = PublishData.StrainId) "
- "LEFT JOIN NStrain ON (NStrain.DataId = PublishData.Id AND "
- "NStrain.StrainId = PublishData.StrainId) WHERE "
- "PublishXRef.InbredSetId = PublishFreeze.InbredSetId AND "
- "PublishData.Id = PublishXRef.DataId AND "
- "PublishXRef.Id = %s AND PublishXRef.PhenotypeId = %s "
- "AND PublishData.StrainId = Strain.Id Order BY Strain.Name")
- csv_data = ["Strain Id,Strain Name,Value,SE,Count"]
- publishdata_id = ""
- with conn.cursor() as cursor:
- cursor.execute(sql, (trait_name, phenotype_id,))
- for record in cursor.fetchall():
- (strain_id, publishdata_id,
- strain_name, value, error, count) = record
- csv_data.append(
- ",".join([str(val) if val else "x"
- for val in (strain_id, strain_name,
- value, error, count)]))
- return f"# Publish Data Id: {publishdata_id}\n\n" + "\n".join(csv_data)
-
-
-def update_sample_data(conn: Any,
- strain_name: str,
- strain_id: int,
- publish_data_id: int,
- value: Union[int, float, str],
- error: Union[int, float, str],
- count: Union[int, str]):
- """Given the right parameters, update sample-data from the relevant
- table."""
- # pylint: disable=[R0913, R0914, C0103]
- STRAIN_ID_SQL: str = "UPDATE Strain SET Name = %s WHERE Id = %s"
- PUBLISH_DATA_SQL: str = ("UPDATE PublishData SET value = %s "
- "WHERE StrainId = %s AND Id = %s")
- PUBLISH_SE_SQL: str = ("UPDATE PublishSE SET error = %s "
- "WHERE StrainId = %s AND DataId = %s")
- N_STRAIN_SQL: str = ("UPDATE NStrain SET count = %s "
- "WHERE StrainId = %s AND DataId = %s")
-
- updated_strains: int = 0
- updated_published_data: int = 0
- updated_se_data: int = 0
- updated_n_strains: int = 0
-
- with conn.cursor() as cursor:
- # Update the Strains table
- cursor.execute(STRAIN_ID_SQL, (strain_name, strain_id))
- updated_strains = cursor.rowcount
- # Update the PublishData table
- cursor.execute(PUBLISH_DATA_SQL,
- (None if value == "x" else value,
- strain_id, publish_data_id))
- updated_published_data = cursor.rowcount
- # Update the PublishSE table
- cursor.execute(PUBLISH_SE_SQL,
- (None if error == "x" else error,
- strain_id, publish_data_id))
- updated_se_data = cursor.rowcount
- # Update the NStrain table
- cursor.execute(N_STRAIN_SQL,
- (None if count == "x" else count,
- strain_id, publish_data_id))
- updated_n_strains = cursor.rowcount
- return (updated_strains, updated_published_data,
- updated_se_data, updated_n_strains)
def retrieve_publish_trait_info(trait_data_source: Dict[str, Any], conn: Any):
"""Retrieve trait information for type `Publish` traits.
@@ -177,24 +103,24 @@ def retrieve_publish_trait_info(trait_data_source: Dict[str, Any], conn: Any):
"PublishXRef.comments")
query = (
"SELECT "
- "{columns} "
+ f"{columns} "
"FROM "
- "PublishXRef, Publication, Phenotype, PublishFreeze "
+ "PublishXRef, Publication, Phenotype "
"WHERE "
"PublishXRef.Id = %(trait_name)s AND "
"Phenotype.Id = PublishXRef.PhenotypeId AND "
"Publication.Id = PublishXRef.PublicationId AND "
- "PublishXRef.InbredSetId = PublishFreeze.InbredSetId AND "
- "PublishFreeze.Id =%(trait_dataset_id)s").format(columns=columns)
+ "PublishXRef.InbredSetId = %(trait_dataset_id)s")
with conn.cursor() as cursor:
cursor.execute(
query,
{
- k:v for k, v in trait_data_source.items()
+ k: v for k, v in trait_data_source.items()
if k in ["trait_name", "trait_dataset_id"]
})
return dict(zip([k.lower() for k in keys], cursor.fetchone()))
+
def set_confidential_field(trait_type, trait_info):
"""Post processing function for 'Publish' trait types.
@@ -207,6 +133,7 @@ def set_confidential_field(trait_type, trait_info):
and not trait_info.get("pubmed_id", None)) else 0}
return trait_info
+
def retrieve_probeset_trait_info(trait_data_source: Dict[str, Any], conn: Any):
"""Retrieve trait information for type `ProbeSet` traits.
@@ -219,67 +146,68 @@ def retrieve_probeset_trait_info(trait_data_source: Dict[str, Any], conn: Any):
"probe_set_specificity", "probe_set_blat_score",
"probe_set_blat_mb_start", "probe_set_blat_mb_end", "probe_set_strand",
"probe_set_note_by_rw", "flag")
+ columns = (f"ProbeSet.{x}" for x in keys)
query = (
- "SELECT "
- "{columns} "
+ f"SELECT {', '.join(columns)} "
"FROM "
"ProbeSet, ProbeSetFreeze, ProbeSetXRef "
"WHERE "
"ProbeSetXRef.ProbeSetFreezeId = ProbeSetFreeze.Id AND "
"ProbeSetXRef.ProbeSetId = ProbeSet.Id AND "
"ProbeSetFreeze.Name = %(trait_dataset_name)s AND "
- "ProbeSet.Name = %(trait_name)s").format(
- columns=", ".join(["ProbeSet.{}".format(x) for x in keys]))
+ "ProbeSet.Name = %(trait_name)s")
with conn.cursor() as cursor:
cursor.execute(
query,
{
- k:v for k, v in trait_data_source.items()
+ k: v for k, v in trait_data_source.items()
if k in ["trait_name", "trait_dataset_name"]
})
return dict(zip(keys, cursor.fetchone()))
+
def retrieve_geno_trait_info(trait_data_source: Dict[str, Any], conn: Any):
"""Retrieve trait information for type `Geno` traits.
https://github.com/genenetwork/genenetwork1/blob/master/web/webqtl/base/webqtlTrait.py#L438-L449"""
keys = ("name", "chr", "mb", "source2", "sequence")
+ columns = ", ".join(f"Geno.{x}" for x in keys)
query = (
- "SELECT "
- "{columns} "
+ f"SELECT {columns} "
"FROM "
- "Geno, GenoFreeze, GenoXRef "
+ "Geno INNER JOIN GenoXRef ON GenoXRef.GenoId = Geno.Id "
+ "INNER JOIN GenoFreeze ON GenoFreeze.Id = GenoXRef.GenoFreezeId "
"WHERE "
- "GenoXRef.GenoFreezeId = GenoFreeze.Id AND GenoXRef.GenoId = Geno.Id AND "
"GenoFreeze.Name = %(trait_dataset_name)s AND "
- "Geno.Name = %(trait_name)s").format(
- columns=", ".join(["Geno.{}".format(x) for x in keys]))
+ "Geno.Name = %(trait_name)s")
with conn.cursor() as cursor:
cursor.execute(
query,
{
- k:v for k, v in trait_data_source.items()
+ k: v for k, v in trait_data_source.items()
if k in ["trait_name", "trait_dataset_name"]
})
return dict(zip(keys, cursor.fetchone()))
+
def retrieve_temp_trait_info(trait_data_source: Dict[str, Any], conn: Any):
"""Retrieve trait information for type `Temp` traits.
https://github.com/genenetwork/genenetwork1/blob/master/web/webqtl/base/webqtlTrait.py#L450-452"""
keys = ("name", "description")
query = (
- "SELECT {columns} FROM Temp "
- "WHERE Name = %(trait_name)s").format(columns=", ".join(keys))
+ f"SELECT {', '.join(keys)} FROM Temp "
+ "WHERE Name = %(trait_name)s")
with conn.cursor() as cursor:
cursor.execute(
query,
{
- k:v for k, v in trait_data_source.items()
+ k: v for k, v in trait_data_source.items()
if k in ["trait_name"]
})
return dict(zip(keys, cursor.fetchone()))
+
def set_haveinfo_field(trait_info):
"""
Common postprocessing function for all trait types.
@@ -287,6 +215,7 @@ def set_haveinfo_field(trait_info):
Sets the value for the 'haveinfo' field."""
return {**trait_info, "haveinfo": 1 if trait_info else 0}
+
def set_homologene_id_field_probeset(trait_info, conn):
"""
Postprocessing function for 'ProbeSet' traits.
@@ -302,7 +231,7 @@ def set_homologene_id_field_probeset(trait_info, conn):
cursor.execute(
query,
{
- k:v for k, v in trait_info.items()
+ k: v for k, v in trait_info.items()
if k in ["geneid", "group"]
})
res = cursor.fetchone()
@@ -310,12 +239,13 @@ def set_homologene_id_field_probeset(trait_info, conn):
return {**trait_info, "homologeneid": res[0]}
return {**trait_info, "homologeneid": None}
+
def set_homologene_id_field(trait_type, trait_info, conn):
"""
Common postprocessing function for all trait types.
Sets the value for the 'homologene' key."""
- set_to_null = lambda ti: {**ti, "homologeneid": None}
+ def set_to_null(ti): return {**ti, "homologeneid": None} # pylint: disable=[C0103, C0321]
functions_table = {
"Temp": set_to_null,
"Geno": set_to_null,
@@ -324,6 +254,7 @@ def set_homologene_id_field(trait_type, trait_info, conn):
}
return functions_table[trait_type](trait_info)
+
def load_publish_qtl_info(trait_info, conn):
"""
Load extra QTL information for `Publish` traits
@@ -344,6 +275,7 @@ def load_publish_qtl_info(trait_info, conn):
return dict(zip(["locus", "lrs", "additive"], cursor.fetchone()))
return {"locus": "", "lrs": "", "additive": ""}
+
def load_probeset_qtl_info(trait_info, conn):
"""
Load extra QTL information for `ProbeSet` traits
@@ -366,6 +298,7 @@ def load_probeset_qtl_info(trait_info, conn):
["locus", "lrs", "pvalue", "mean", "additive"], cursor.fetchone()))
return {"locus": "", "lrs": "", "pvalue": "", "mean": "", "additive": ""}
+
def load_qtl_info(qtl, trait_type, trait_info, conn):
"""
Load extra QTL information for traits
@@ -389,11 +322,12 @@ def load_qtl_info(qtl, trait_type, trait_info, conn):
"Publish": load_publish_qtl_info,
"ProbeSet": load_probeset_qtl_info
}
- if trait_info["name"] not in qtl_info_functions.keys():
+ if trait_info["name"] not in qtl_info_functions:
return trait_info
return qtl_info_functions[trait_type](trait_info, conn)
+
def build_trait_name(trait_fullname):
"""
Initialises the trait's name, and other values from the search data provided
@@ -408,7 +342,7 @@ def build_trait_name(trait_fullname):
return "ProbeSet"
name_parts = trait_fullname.split("::")
- assert len(name_parts) >= 2, "Name format error"
+ assert len(name_parts) >= 2, f"Name format error: '{trait_fullname}'"
dataset_name = name_parts[0]
dataset_type = dataset_type(dataset_name)
return {
@@ -420,6 +354,7 @@ def build_trait_name(trait_fullname):
"cellid": name_parts[2] if len(name_parts) == 3 else ""
}
+
def retrieve_probeset_sequence(trait, conn):
"""
Retrieve a 'ProbeSet' trait's sequence information
@@ -441,6 +376,7 @@ def retrieve_probeset_sequence(trait, conn):
seq = cursor.fetchone()
return {**trait, "sequence": seq[0] if seq else ""}
+
def retrieve_trait_info(
threshold: int, trait_full_name: str, conn: Any,
qtl=None):
@@ -496,6 +432,7 @@ def retrieve_trait_info(
}
return trait_info
+
def retrieve_temp_trait_data(trait_info: dict, conn: Any):
"""
Retrieve trait data for `Temp` traits.
@@ -514,10 +451,12 @@ def retrieve_temp_trait_data(trait_info: dict, conn: Any):
query,
{"trait_name": trait_info["trait_name"]})
return [dict(zip(
- ["sample_name", "value", "se_error", "nstrain", "id"], row))
+ ["sample_name", "value", "se_error", "nstrain", "id"],
+ row))
for row in cursor.fetchall()]
return []
+
def retrieve_species_id(group, conn: Any):
"""
Retrieve a species id given the Group value
@@ -529,6 +468,7 @@ def retrieve_species_id(group, conn: Any):
return cursor.fetchone()[0]
return None
+
def retrieve_geno_trait_data(trait_info: Dict, conn: Any):
"""
Retrieve trait data for `Geno` traits.
@@ -552,11 +492,14 @@ def retrieve_geno_trait_data(trait_info: Dict, conn: Any):
"dataset_name": trait_info["db"]["dataset_name"],
"species_id": retrieve_species_id(
trait_info["db"]["group"], conn)})
- return [dict(zip(
- ["sample_name", "value", "se_error", "id"], row))
- for row in cursor.fetchall()]
+ return [
+ dict(zip(
+ ["sample_name", "value", "se_error", "id"],
+ row))
+ for row in cursor.fetchall()]
return []
+
def retrieve_publish_trait_data(trait_info: Dict, conn: Any):
"""
Retrieve trait data for `Publish` traits.
@@ -565,17 +508,16 @@ def retrieve_publish_trait_data(trait_info: Dict, conn: Any):
"SELECT "
"Strain.Name, PublishData.value, PublishSE.error, NStrain.count, "
"PublishData.Id "
- "FROM (PublishData, Strain, PublishXRef, PublishFreeze) "
+ "FROM (PublishData, Strain, PublishXRef) "
"LEFT JOIN PublishSE ON "
"(PublishSE.DataId = PublishData.Id "
"AND PublishSE.StrainId = PublishData.StrainId) "
"LEFT JOIN NStrain ON "
"(NStrain.DataId = PublishData.Id "
"AND NStrain.StrainId = PublishData.StrainId) "
- "WHERE PublishXRef.InbredSetId = PublishFreeze.InbredSetId "
- "AND PublishData.Id = PublishXRef.DataId "
+ "WHERE PublishData.Id = PublishXRef.DataId "
"AND PublishXRef.Id = %(trait_name)s "
- "AND PublishFreeze.Id = %(dataset_id)s "
+ "AND PublishXRef.InbredSetId = %(dataset_id)s "
"AND PublishData.StrainId = Strain.Id "
"ORDER BY Strain.Name")
with conn.cursor() as cursor:
@@ -583,11 +525,13 @@ def retrieve_publish_trait_data(trait_info: Dict, conn: Any):
query,
{"trait_name": trait_info["trait_name"],
"dataset_id": trait_info["db"]["dataset_id"]})
- return [dict(zip(
- ["sample_name", "value", "se_error", "nstrain", "id"], row))
- for row in cursor.fetchall()]
+ return [
+ dict(zip(
+ ["sample_name", "value", "se_error", "nstrain", "id"], row))
+ for row in cursor.fetchall()]
return []
+
def retrieve_cellid_trait_data(trait_info: Dict, conn: Any):
"""
Retrieve trait data for `Probe Data` types.
@@ -616,11 +560,13 @@ def retrieve_cellid_trait_data(trait_info: Dict, conn: Any):
{"cellid": trait_info["cellid"],
"trait_name": trait_info["trait_name"],
"dataset_id": trait_info["db"]["dataset_id"]})
- return [dict(zip(
- ["sample_name", "value", "se_error", "id"], row))
- for row in cursor.fetchall()]
+ return [
+ dict(zip(
+ ["sample_name", "value", "se_error", "id"], row))
+ for row in cursor.fetchall()]
return []
+
def retrieve_probeset_trait_data(trait_info: Dict, conn: Any):
"""
Retrieve trait data for `ProbeSet` traits.
@@ -645,11 +591,13 @@ def retrieve_probeset_trait_data(trait_info: Dict, conn: Any):
query,
{"trait_name": trait_info["trait_name"],
"dataset_name": trait_info["db"]["dataset_name"]})
- return [dict(zip(
- ["sample_name", "value", "se_error", "id"], row))
- for row in cursor.fetchall()]
+ return [
+ dict(zip(
+ ["sample_name", "value", "se_error", "id"], row))
+ for row in cursor.fetchall()]
return []
+
def with_samplelist_data_setup(samplelist: Sequence[str]):
"""
Build function that computes the trait data from provided list of samples.
@@ -676,6 +624,7 @@ def with_samplelist_data_setup(samplelist: Sequence[str]):
return None
return setup_fn
+
def without_samplelist_data_setup():
"""
Build function that computes the trait data.
@@ -696,6 +645,7 @@ def without_samplelist_data_setup():
return None
return setup_fn
+
def retrieve_trait_data(trait: dict, conn: Any, samplelist: Sequence[str] = tuple()):
"""
Retrieve trait data
@@ -735,14 +685,16 @@ def retrieve_trait_data(trait: dict, conn: Any, samplelist: Sequence[str] = tupl
"data": dict(map(
lambda x: (
x["sample_name"],
- {k:v for k, v in x.items() if x != "sample_name"}),
+ {k: v for k, v in x.items() if x != "sample_name"}),
data))}
return {}
+
def generate_traits_filename(base_path: str = TMPDIR):
"""Generate a unique filename for use with generated traits files."""
- return "{}/traits_test_file_{}.txt".format(
- os.path.abspath(base_path), random_string(10))
+ return (
+ f"{os.path.abspath(base_path)}/traits_test_file_{random_string(10)}.txt")
+
def export_informative(trait_data: dict, inc_var: bool = False) -> tuple:
"""
@@ -765,5 +717,6 @@ def export_informative(trait_data: dict, inc_var: bool = False) -> tuple:
return acc
return reduce(
__exporter__,
- filter(lambda td: td["value"] is not None, trait_data["data"].values()),
+ filter(lambda td: td["value"] is not None,
+ trait_data["data"].values()),
(tuple(), tuple(), tuple()))