"""This class contains functions relating to trait data manipulation"""
import os
from functools import reduce
from typing import Any, Dict, Union, Sequence
import MySQLdb
from gn3.settings import TMPDIR
from gn3.random import random_string
from gn3.function_helpers import compose
from gn3.db.datasets import retrieve_trait_dataset
def export_trait_data(
        trait_data: dict, samplelist: Sequence[str], dtype: str = "val",
        var_exists: bool = False, n_exists: bool = False):
    """
    Export data according to `samplelist`. Mostly used in calculating
    correlations.

    DESCRIPTION:
    Migrated from
    https://github.com/genenetwork/genenetwork1/blob/master/web/webqtl/base/webqtlTrait.py#L166-L211

    PARAMETERS
    trait_data: (dict)
        A dictionary with a "data" key that maps sample names to dictionaries
        holding the "value", "variance" and "ndata" entries of that sample.
    samplelist: (list)
        A list of sample names. The output follows the order of this list.
    dtype: (str)
        Selects what is exported for each sample found in the trait data:
        "val" (the value), "var" (the variance), "N" (the ndata) or "all"
        (the value, followed by the variance if `var_exists`, followed by the
        ndata if `n_exists`).
    var_exists: (bool)
        A flag indicating existence of variance
    n_exists: (bool)
        A flag indicating existence of ndata

    RETURNS:
    A flat tuple with the exported items. A sample that is absent from the
    trait data contributes one `None`, two if exactly one of the flags is set,
    and three if both are set.

    RAISES:
    KeyError, if `dtype` is not one of the supported types and at least one
    sample of `samplelist` is present in the trait data.
    """
    def __missing_sample():
        # Padding used when there is nothing to export for a sample.
        if var_exists and n_exists:
            return (None, None, None)
        if var_exists or n_exists:
            return (None, None)
        return (None,)

    def __export_all_types(sample_record):
        # Compare against `None` rather than relying on truthiness: zero is a
        # legitimate value/variance/count and must not be reported as missing.
        if sample_record["value"] is None:
            return __missing_sample()
        exported = [sample_record["value"]]
        if var_exists:
            exported.append(sample_record["variance"])
        if n_exists:
            exported.append(sample_record["ndata"])
        return tuple(exported)

    single_value_keys = {"val": "value", "var": "variance", "N": "ndata"}
    data = trait_data["data"]
    exported = []
    for sample in samplelist:
        if sample not in data:
            exported.extend(__missing_sample())
        elif dtype in single_value_keys:
            exported.append(data[sample][single_value_keys[dtype]])
        elif dtype == "all":
            exported.extend(__export_all_types(data[sample]))
        else:
            raise KeyError("Type `%s` is incorrect" % dtype)
    return tuple(exported)
def get_trait_csv_sample_data(conn: Any,
                              trait_name: int, phenotype_id: int):
    """Fetch a trait and return it as a csv string

    The rows come from `retrieve_publish_trait_data`, which is called with
    `trait_name` as the trait name and `phenotype_id` as the dataset id.

    RETURNS:
    A string whose first line is the header "Strain Name,Value,SE,Count",
    followed by one line per sample. Fields that are `None` are written as
    "x"; numbers whose text ends in ".0" are written without the fraction.
    """
    columns = ("sample_name", "value", "se_error", "nstrain")

    def __float_strip(num_str):
        if str(num_str)[-2:] == ".0":
            return str(int(num_str))
        return str(num_str)

    def __process_for_csv__(record):
        # Only `None` means "missing": a stored value of 0 must be exported
        # as "0" and not as the "x" placeholder.
        return ",".join(
            "x" if record[key] is None else __float_strip(record[key])
            for key in columns)

    records = retrieve_publish_trait_data(
        {"trait_name": trait_name, "db": {"dataset_id": phenotype_id}},
        conn)
    return "\n".join(
        ["Strain Name,Value,SE,Count"]
        + [__process_for_csv__(record) for record in records])
def update_sample_data(conn: Any, #pylint: disable=[R0913]
                       trait_name: str,
                       strain_name: str,
                       phenotype_id: int,
                       value: Union[int, float, str],
                       error: Union[int, float, str],
                       count: Union[int, str]):
    """Given the right parameters, update sample-data from the relevant
    table.

    The strain id and data id are looked up first; then the `PublishData`,
    `PublishSE` and `NStrain` rows matching them are updated. A `value`,
    `error` or `count` equal to "x" is stored as NULL.

    RETURNS:
    A tuple of the row counts reported by the cursor after updating
    `PublishData`, `PublishSE` and `NStrain`, in that order. `(0, 0, 0)` is
    returned when no row matches the trait/phenotype/strain combination.
    """
    with conn.cursor() as cursor:
        # All user-provided values are passed as query parameters so that the
        # database driver escapes them (no string-built SQL).
        cursor.execute(
            ("SELECT Strain.Id, PublishData.Id FROM "
             "(PublishData, Strain, PublishXRef, PublishFreeze) "
             "LEFT JOIN PublishSE ON "
             "(PublishSE.DataId = PublishData.Id AND "
             "PublishSE.StrainId = PublishData.StrainId) "
             "LEFT JOIN NStrain ON "
             "(NStrain.DataId = PublishData.Id AND "
             "NStrain.StrainId = PublishData.StrainId) "
             "WHERE PublishXRef.InbredSetId = "
             "PublishFreeze.InbredSetId AND "
             "PublishData.Id = PublishXRef.DataId AND "
             "PublishXRef.Id = %s AND "
             "PublishXRef.PhenotypeId = %s "
             "AND PublishData.StrainId = Strain.Id "
             "AND Strain.Name = %s"),
            (trait_name, phenotype_id, str(strain_name)))
        ids = cursor.fetchone()
    if not ids:
        # Nothing to update: the sample does not exist for this trait.
        return (0, 0, 0)
    strain_id, data_id = ids

    def __null_if_x(item):
        return None if item == "x" else item

    with conn.cursor() as cursor:
        # Update the PublishData table
        cursor.execute(("UPDATE PublishData SET value = %s "
                        "WHERE StrainId = %s AND Id = %s"),
                       (__null_if_x(value), strain_id, data_id))
        updated_published_data = cursor.rowcount
        # Update the PublishSE table
        cursor.execute(("UPDATE PublishSE SET error = %s "
                        "WHERE StrainId = %s AND DataId = %s"),
                       (__null_if_x(error), strain_id, data_id))
        updated_se_data = cursor.rowcount
        # Update the NStrain table
        cursor.execute(("UPDATE NStrain SET count = %s "
                        "WHERE StrainId = %s AND DataId = %s"),
                       (__null_if_x(count), strain_id, data_id))
        updated_n_strains = cursor.rowcount
    return (updated_published_data,
            updated_se_data, updated_n_strains)
def delete_sample_data(conn: Any,
                       trait_name: str,
                       strain_name: str,
                       phenotype_id: int):
    """Given the right parameters, delete sample-data from the relevant
    table.

    The strain id and data id are looked up first. When both are found, the
    matching rows are deleted from `PublishData`, `PublishSE` and `NStrain`.
    The connection is committed on success and rolled back on any failure.

    RETURNS:
    A tuple of the row counts reported by the cursor after deleting from
    `PublishData`, `PublishSE` and `NStrain`, in that order. `(0, 0, 0)` is
    returned when the sample does not exist (e.g. it was already deleted).

    RAISES:
    MySQLdb.Error, chained to the original exception, if anything fails.
    """
    strain_id, data_id = "", ""
    deleted_published_data: int = 0
    deleted_se_data: int = 0
    deleted_n_strains: int = 0
    # The `with` block closes the cursor; no explicit `close()` is needed.
    with conn.cursor() as cursor:
        try:
            # All values are passed as query parameters so that the database
            # driver escapes them (no string-built SQL).
            cursor.execute(
                ("SELECT Strain.Id, PublishData.Id FROM "
                 "(PublishData, Strain, PublishXRef, PublishFreeze) "
                 "LEFT JOIN PublishSE ON "
                 "(PublishSE.DataId = PublishData.Id AND "
                 "PublishSE.StrainId = PublishData.StrainId) "
                 "LEFT JOIN NStrain ON "
                 "(NStrain.DataId = PublishData.Id AND "
                 "NStrain.StrainId = PublishData.StrainId) "
                 "WHERE PublishXRef.InbredSetId = "
                 "PublishFreeze.InbredSetId AND "
                 "PublishData.Id = PublishXRef.DataId AND "
                 "PublishXRef.Id = %s AND "
                 "PublishXRef.PhenotypeId = %s "
                 "AND PublishData.StrainId = Strain.Id "
                 "AND Strain.Name = %s"),
                (trait_name, phenotype_id, str(strain_name)))
            # Check if it exists if the data was already deleted:
            if _result := cursor.fetchone():
                strain_id, data_id = _result
            # Only run if the strain_id and data_id exist
            if strain_id and data_id:
                # Delete from the PublishData table
                cursor.execute(("DELETE FROM PublishData "
                                "WHERE StrainId = %s AND Id = %s"),
                               (strain_id, data_id))
                deleted_published_data = cursor.rowcount
                # Delete from the PublishSE table
                cursor.execute(("DELETE FROM PublishSE "
                                "WHERE StrainId = %s AND DataId = %s"),
                               (strain_id, data_id))
                deleted_se_data = cursor.rowcount
                # Delete from the NStrain table
                cursor.execute(("DELETE FROM NStrain "
                                "WHERE StrainId = %s AND DataId = %s"),
                               (strain_id, data_id))
                deleted_n_strains = cursor.rowcount
        except Exception as exc: # pylint: disable=[W0703]
            conn.rollback()
            # Keep the exception type callers expect, but retain the cause.
            raise MySQLdb.Error from exc
        conn.commit()
    return (deleted_published_data,
            deleted_se_data, deleted_n_strains)
def insert_sample_data(conn: Any, #pylint: disable=[R0913]
                       trait_name: str,
                       strain_name: str,
                       phenotype_id: int,
                       value: Union[int, float, str],
                       error: Union[int, float, str],
                       count: Union[int, str]):
    """Given the right parameters, insert sample-data to the relevant table.

    The data id (from `PublishXRef`) and the strain id (from `Strain`) are
    looked up first. A row is then inserted into `PublishData`, and, when
    `error`/`count` are provided and are not "x", into `PublishSE`/`NStrain`.
    A `value` of "x" is stored as NULL, as is done when updating.

    RETURNS:
    A tuple of the row counts reported by the cursor after inserting into
    `PublishData`, `PublishSE` and `NStrain`, in that order. `(0, 0, 0)` is
    returned, and nothing is inserted, when the trait or the strain cannot be
    found, or when the sample already exists.

    RAISES:
    MySQLdb.Error, chained to the original exception, if anything fails. The
    connection is rolled back before raising.
    """
    inserted_published_data, inserted_se_data, inserted_n_strains = 0, 0, 0
    with conn.cursor() as cursor:
        try:
            cursor.execute("SELECT DataId FROM PublishXRef WHERE Id = %s AND "
                           "PhenotypeId = %s", (trait_name, phenotype_id))
            data_row = cursor.fetchone()
            cursor.execute("SELECT Id FROM Strain WHERE Name = %s",
                           (strain_name,))
            strain_row = cursor.fetchone()
            if not data_row or not strain_row:
                # Unknown trait or strain: there is nothing to attach to.
                return (0, 0, 0)
            # `fetchone` returns a row; the id is its only column.
            data_id, strain_id = data_row[0], strain_row[0]
            # Return early if an insert already exists!
            cursor.execute("SELECT Id FROM PublishData where Id = %s "
                           "AND StrainId = %s",
                           (data_id, strain_id))
            if cursor.fetchone(): # This strain already exists
                return (0, 0, 0)
            # Insert into the PublishData table
            cursor.execute(("INSERT INTO PublishData (Id, StrainId, value) "
                            "VALUES (%s, %s, %s)"),
                           (data_id, strain_id,
                            None if value == "x" else value))
            inserted_published_data = cursor.rowcount
            # Insert into the PublishSE table if error is specified
            if error and error != "x":
                cursor.execute(("INSERT INTO PublishSE (StrainId, DataId, "
                                "error) VALUES (%s, %s, %s)"),
                               (strain_id, data_id, error))
                inserted_se_data = cursor.rowcount
            # Insert into the NStrain table if count is specified. The column
            # is `count`, the same one `update_sample_data` writes to.
            if count and count != "x":
                cursor.execute(("INSERT INTO NStrain "
                                "(StrainId, DataId, count) "
                                "VALUES (%s, %s, %s)"),
                               (strain_id, data_id, count))
                inserted_n_strains = cursor.rowcount
        except Exception as exc: # pylint: disable=[W0703]
            conn.rollback()
            # Keep the exception type callers expect, but retain the cause.
            raise MySQLdb.Error from exc
    return (inserted_published_data,
            inserted_se_data, inserted_n_strains)
def retrieve_publish_trait_info(trait_data_source: Dict[str, Any], conn: Any):
    """Retrieve trait information for type `Publish` traits.

    https://github.com/genenetwork/genenetwork1/blob/master/web/webqtl/base/webqtlTrait.py#L399-L421

    PARAMETERS
    trait_data_source: (dict)
        Must provide the "trait_name" and "trait_dataset_id" keys; any other
        key is ignored.
    conn:
        A database connection object

    RETURNS:
    A dictionary mapping the lower-cased names in `keys` to the columns of
    the first matching row, or an empty dictionary if there is no such row.
    """
    keys = (
        "Id", "PubMed_ID", "Pre_publication_description",
        "Post_publication_description", "Original_description",
        "Pre_publication_abbreviation", "Post_publication_abbreviation",
        "Lab_code", "Submitter", "Owner", "Authorized_Users", "Authors",
        "Title", "Abstract", "Journal", "Volume", "Pages", "Month", "Year",
        "Sequence", "Units", "comments")
    columns = (
        "PublishXRef.Id, Publication.PubMed_ID, "
        "Phenotype.Pre_publication_description, "
        "Phenotype.Post_publication_description, "
        "Phenotype.Original_description, "
        "Phenotype.Pre_publication_abbreviation, "
        "Phenotype.Post_publication_abbreviation, "
        "Phenotype.Lab_code, Phenotype.Submitter, Phenotype.Owner, "
        "Phenotype.Authorized_Users, CAST(Publication.Authors AS BINARY), "
        "Publication.Title, Publication.Abstract, Publication.Journal, "
        "Publication.Volume, Publication.Pages, Publication.Month, "
        "Publication.Year, PublishXRef.Sequence, Phenotype.Units, "
        "PublishXRef.comments")
    query = (
        "SELECT "
        "{columns} "
        "FROM "
        "PublishXRef, Publication, Phenotype, PublishFreeze "
        "WHERE "
        "PublishXRef.Id = %(trait_name)s AND "
        "Phenotype.Id = PublishXRef.PhenotypeId AND "
        "Publication.Id = PublishXRef.PublicationId AND "
        "PublishXRef.InbredSetId = PublishFreeze.InbredSetId AND "
        "PublishFreeze.Id =%(trait_dataset_id)s").format(columns=columns)
    with conn.cursor() as cursor:
        cursor.execute(
            query,
            {
                k: v for k, v in trait_data_source.items()
                if k in ["trait_name", "trait_dataset_id"]
            })
        row = cursor.fetchone()
    if not row:
        # No such trait: an empty mapping lets `set_haveinfo_field` flag it.
        return {}
    return dict(zip([k.lower() for k in keys], row))
def set_confidential_field(trait_type, trait_info):
    """Post processing function for 'Publish' trait types.

    For the "Publish" trait type, returns a copy of `trait_info` with a
    'confidential' key: 1 when the trait has a truthy
    "pre_publication_description" and no truthy "pubmed_id", 0 otherwise.
    For any other trait type, `trait_info` is returned as is."""
    if trait_type != "Publish":
        return trait_info
    has_description = trait_info.get("pre_publication_description", None)
    has_pubmed_id = trait_info.get("pubmed_id", None)
    if has_description and not has_pubmed_id:
        confidential = 1
    else:
        confidential = 0
    return {**trait_info, "confidential": confidential}
def retrieve_probeset_trait_info(trait_data_source: Dict[str, Any], conn: Any):
    """Retrieve trait information for type `ProbeSet` traits.

    https://github.com/genenetwork/genenetwork1/blob/master/web/webqtl/base/webqtlTrait.py#L424-L435

    PARAMETERS
    trait_data_source: (dict)
        Must provide the "trait_name" and "trait_dataset_name" keys; any
        other key is ignored.
    conn:
        A database connection object

    RETURNS:
    A dictionary mapping each name in `keys` to the `ProbeSet` column of the
    same name in the first matching row, or an empty dictionary if there is
    no such row.
    """
    keys = (
        "name", "symbol", "description", "probe_target_description", "chr",
        "mb", "alias", "geneid", "genbankid", "unigeneid", "omim",
        "refseq_transcriptid", "blatseq", "targetseq", "chipid", "comments",
        "strand_probe", "strand_gene", "probe_set_target_region", "proteinid",
        "probe_set_specificity", "probe_set_blat_score",
        "probe_set_blat_mb_start", "probe_set_blat_mb_end", "probe_set_strand",
        "probe_set_note_by_rw", "flag")
    query = (
        "SELECT "
        "{columns} "
        "FROM "
        "ProbeSet, ProbeSetFreeze, ProbeSetXRef "
        "WHERE "
        "ProbeSetXRef.ProbeSetFreezeId = ProbeSetFreeze.Id AND "
        "ProbeSetXRef.ProbeSetId = ProbeSet.Id AND "
        "ProbeSetFreeze.Name = %(trait_dataset_name)s AND "
        "ProbeSet.Name = %(trait_name)s").format(
            columns=", ".join(["ProbeSet.{}".format(x) for x in keys]))
    with conn.cursor() as cursor:
        cursor.execute(
            query,
            {
                k: v for k, v in trait_data_source.items()
                if k in ["trait_name", "trait_dataset_name"]
            })
        row = cursor.fetchone()
    if not row:
        # No such trait: an empty mapping lets `set_haveinfo_field` flag it.
        return {}
    return dict(zip(keys, row))
def retrieve_geno_trait_info(trait_data_source: Dict[str, Any], conn: Any):
    """Retrieve trait information for type `Geno` traits.

    https://github.com/genenetwork/genenetwork1/blob/master/web/webqtl/base/webqtlTrait.py#L438-L449

    PARAMETERS
    trait_data_source: (dict)
        Must provide the "trait_name" and "trait_dataset_name" keys; any
        other key is ignored.
    conn:
        A database connection object

    RETURNS:
    A dictionary mapping each name in `keys` to the `Geno` column of the same
    name in the first matching row, or an empty dictionary if there is no
    such row.
    """
    keys = ("name", "chr", "mb", "source2", "sequence")
    query = (
        "SELECT "
        "{columns} "
        "FROM "
        "Geno, GenoFreeze, GenoXRef "
        "WHERE "
        "GenoXRef.GenoFreezeId = GenoFreeze.Id AND GenoXRef.GenoId = Geno.Id AND "
        "GenoFreeze.Name = %(trait_dataset_name)s AND "
        "Geno.Name = %(trait_name)s").format(
            columns=", ".join(["Geno.{}".format(x) for x in keys]))
    with conn.cursor() as cursor:
        cursor.execute(
            query,
            {
                k: v for k, v in trait_data_source.items()
                if k in ["trait_name", "trait_dataset_name"]
            })
        row = cursor.fetchone()
    if not row:
        # No such trait: an empty mapping lets `set_haveinfo_field` flag it.
        return {}
    return dict(zip(keys, row))
def retrieve_temp_trait_info(trait_data_source: Dict[str, Any], conn: Any):
    """Retrieve trait information for type `Temp` traits.

    https://github.com/genenetwork/genenetwork1/blob/master/web/webqtl/base/webqtlTrait.py#L450-452

    PARAMETERS
    trait_data_source: (dict)
        Must provide the "trait_name" key; any other key is ignored.
    conn:
        A database connection object

    RETURNS:
    A dictionary with the "name" and "description" of the first matching row
    of the `Temp` table, or an empty dictionary if there is no such row.
    """
    keys = ("name", "description")
    query = (
        "SELECT {columns} FROM Temp "
        "WHERE Name = %(trait_name)s").format(columns=", ".join(keys))
    with conn.cursor() as cursor:
        cursor.execute(
            query,
            {
                k: v for k, v in trait_data_source.items()
                if k in ["trait_name"]
            })
        row = cursor.fetchone()
    if not row:
        # No such trait: an empty mapping lets `set_haveinfo_field` flag it.
        return {}
    return dict(zip(keys, row))
def set_haveinfo_field(trait_info):
    """
    Common postprocessing function for all trait types.

    Returns a copy of `trait_info` with a 'haveinfo' field that is 1 when
    `trait_info` is non-empty, and 0 when it is empty."""
    haveinfo = int(bool(trait_info))
    return {**trait_info, "haveinfo": haveinfo}
def set_homologene_id_field_probeset(trait_info, conn):
    """
    Postprocessing function for 'ProbeSet' traits.

    Looks up the homologene id using the "geneid" and "group" entries of
    `trait_info`, and returns a copy of `trait_info` with the 'homologeneid'
    key set to the id found, or to `None` when the query returns no row.
    """
    query = (
        "SELECT HomologeneId FROM Homologene, Species, InbredSet"
        " WHERE Homologene.GeneId = %(geneid)s AND InbredSet.Name = %(group)s"
        " AND InbredSet.SpeciesId = Species.Id AND"
        " Species.TaxonomyId = Homologene.TaxonomyId")
    with conn.cursor() as cursor:
        # Only the two entries the query refers to are handed to the driver.
        query_params = {
            key: trait_info[key]
            for key in ("geneid", "group") if key in trait_info}
        cursor.execute(query, query_params)
        row = cursor.fetchone()
        homologene_id = row[0] if row else None
        return {**trait_info, "homologeneid": homologene_id}
def set_homologene_id_field(trait_type, trait_info, conn):
    """
    Common postprocessing function for all trait types.

    Returns a copy of `trait_info` with the 'homologeneid' key set. For
    "ProbeSet" traits the value is looked up in the database; for "Temp",
    "Geno" and "Publish" traits it is `None`. Any other trait type raises a
    `KeyError`."""
    if trait_type == "ProbeSet":
        return set_homologene_id_field_probeset(trait_info, conn)
    if trait_type in ("Temp", "Geno", "Publish"):
        return {**trait_info, "homologeneid": None}
    raise KeyError(trait_type)
def load_publish_qtl_info(trait_info, conn):
    """
    Load extra QTL information for `Publish` traits

    PARAMETERS
    trait_info: (dict)
        Must provide the "trait_name" key, and a "db" dictionary with the
        "dataset_id" key.
    conn:
        A database connection object

    RETURNS:
    A dictionary with the "locus", "lrs" and "additive" values of the first
    matching row. All three are empty strings if there is no such row.
    """
    query = (
        "SELECT PublishXRef.Locus, PublishXRef.LRS, PublishXRef.additive "
        "FROM PublishXRef, PublishFreeze "
        "WHERE PublishXRef.Id = %(trait_name)s "
        "AND PublishXRef.InbredSetId = PublishFreeze.InbredSetId "
        "AND PublishFreeze.Id = %(dataset_id)s")
    with conn.cursor() as cursor:
        cursor.execute(
            query,
            {
                "trait_name": trait_info["trait_name"],
                "dataset_id": trait_info["db"]["dataset_id"]
            })
        row = cursor.fetchone()
    if row:
        return dict(zip(["locus", "lrs", "additive"], row))
    # No QTL row for this trait: fall back to the empty defaults.
    return {"locus": "", "lrs": "", "additive": ""}
def load_probeset_qtl_info(trait_info, conn):
    """
    Load extra QTL information for `ProbeSet` traits

    PARAMETERS
    trait_info: (dict)
        Must provide the "trait_name" key, and a "db" dictionary with the
        "dataset_id" key.
    conn:
        A database connection object

    RETURNS:
    A dictionary with the "locus", "lrs", "pvalue", "mean" and "additive"
    values of the first matching row. All five are empty strings if there is
    no such row.
    """
    query = (
        "SELECT ProbeSetXRef.Locus, ProbeSetXRef.LRS, ProbeSetXRef.pValue, "
        "ProbeSetXRef.mean, ProbeSetXRef.additive "
        "FROM ProbeSetXRef, ProbeSet "
        "WHERE ProbeSetXRef.ProbeSetId = ProbeSet.Id "
        " AND ProbeSet.Name = %(trait_name)s "
        "AND ProbeSetXRef.ProbeSetFreezeId = %(dataset_id)s")
    with conn.cursor() as cursor:
        cursor.execute(
            query,
            {
                "trait_name": trait_info["trait_name"],
                "dataset_id": trait_info["db"]["dataset_id"]
            })
        row = cursor.fetchone()
    if row:
        return dict(zip(
            ["locus", "lrs", "pvalue", "mean", "additive"], row))
    # No QTL row for this trait: fall back to the empty defaults.
    return {"locus": "", "lrs": "", "pvalue": "", "mean": "", "additive": ""}
def load_qtl_info(qtl, trait_type, trait_info, conn):
    """
    Load extra QTL information for traits

    DESCRIPTION:
    Migrated from
    https://github.com/genenetwork/genenetwork1/blob/master/web/webqtl/base/webqtlTrait.py#L500-L534

    PARAMETERS:
    qtl: boolean
        When falsy, no QTL information is loaded.
    trait_type: string
        The type of the trait in consideration
    trait_info: map/dictionary
        A dictionary of the trait's key-value pairs
    conn:
        A database connection object

    RETURNS:
    `trait_info` unchanged when `qtl` is falsy, or when the trait type is
    neither "Publish" nor "ProbeSet". Otherwise, a new dictionary holding the
    entries of `trait_info` together with the QTL entries loaded for the
    trait type.
    """
    if not qtl:
        return trait_info
    # The loader is selected by the *type* of the trait, not by its name.
    if trait_type == "Publish":
        loader = load_publish_qtl_info
    elif trait_type == "ProbeSet":
        loader = load_probeset_qtl_info
    else:
        return trait_info
    # The QTL values are extra information: add them to the trait's existing
    # key-value pairs rather than replacing those.
    return {**trait_info, **loader(trait_info, conn)}
def build_trait_name(trait_fullname):
    """
    Initialises the trait's name, and other values from the search data provided

    PARAMETERS
    trait_fullname: (str)
        The parts of the name separated by "::", that is
        "<dataset name>::<trait name>" optionally followed by "::<cell id>".

    RETURNS:
    A dictionary with the keys "db" (holding the "dataset_name" and the
    "dataset_type" derived from it), "trait_fullname", "trait_name" and
    "cellid". The "cellid" is the third part of the name when the name has
    exactly three parts, and an empty string otherwise.

    RAISES:
    AssertionError, if the name has fewer than two parts.
    """
    def __dataset_type(dset_name):
        # The first marker found in the dataset name decides the type;
        # anything without a marker is a "ProbeSet" dataset.
        for marker in ("Temp", "Geno", "Publish"):
            if marker in dset_name:
                return marker
        return "ProbeSet"

    name_parts = trait_fullname.split("::")
    if len(name_parts) < 2:
        # Raised explicitly (rather than with an `assert` statement) so that
        # the check is still performed when Python runs with `-O`.
        raise AssertionError(f"Name format error: '{trait_fullname}'")
    dataset_name = name_parts[0]
    return {
        "db": {
            "dataset_name": dataset_name,
            "dataset_type": __dataset_type(dataset_name)},
        "trait_fullname": trait_fullname,
        "trait_name": name_parts[1],
        "cellid": name_parts[2] if len(name_parts) == 3 else ""
    }
def retrieve_probeset_sequence(trait, conn):
    """
    Retrieve a 'ProbeSet' trait's sequence information

    Returns a copy of `trait` with a "sequence" key holding the `BlatSeq`
    column of the first matching row, or an empty string when the query
    returns no row. The lookup uses `trait["trait_name"]` and
    `trait["db"]["dataset_name"]`.
    """
    query = (
        "SELECT ProbeSet.BlatSeq "
        "FROM ProbeSet, ProbeSetFreeze, ProbeSetXRef "
        "WHERE ProbeSet.Id=ProbeSetXRef.ProbeSetId "
        "AND ProbeSetFreeze.Id = ProbeSetXRef.ProbeSetFreezeId "
        "AND ProbeSet.Name = %(trait_name)s "
        "AND ProbeSetFreeze.Name = %(dataset_name)s")
    with conn.cursor() as cursor:
        query_params = {
            "trait_name": trait["trait_name"],
            "dataset_name": trait["db"]["dataset_name"]
        }
        cursor.execute(query, query_params)
        row = cursor.fetchone()
        sequence = row[0] if row else ""
        return {**trait, "sequence": sequence}
def retrieve_trait_info(
        threshold: int, trait_full_name: str, conn: Any,
        qtl=None):
    """Retrieves the trait information.

    https://github.com/genenetwork/genenetwork1/blob/master/web/webqtl/base/webqtlTrait.py#L397-L456

    The trait's name is parsed with `build_trait_name`, its dataset is
    retrieved with `retrieve_trait_dataset`, and the information is fetched
    with the retrieval function registered for the dataset type. If any
    information was found ("haveinfo" is truthy), it is passed, together with
    the dataset's "group", through the post-processing functions registered
    for the dataset type, and the "db" key is set to the merged dataset
    details. Otherwise, the retrieved information is returned as is.

    NOTE(review): the order in which `compose` applies the functions it is
    given is defined in `gn3.function_helpers` - confirm there.

    This function, or the dependent functions, might be incomplete as they are
    currently."""
    trait = build_trait_name(trait_full_name)
    dataset_type = trait["db"]["dataset_type"]

    def __with_qtl_info(info):
        return load_qtl_info(qtl, dataset_type, info, conn)

    def __with_homologene_id(info):
        return set_homologene_id_field(dataset_type, info, conn)

    def __with_trait_type(info):
        return {"trait_type": dataset_type, **info}

    def __merged_into_trait(info):
        return {**trait, **info}

    def __with_confidential(info):
        return set_confidential_field(dataset_type, info)

    def __with_sequence(info):
        return retrieve_probeset_sequence(info, conn)

    info_retrievers = {
        "Publish": retrieve_publish_trait_info,
        "ProbeSet": retrieve_probeset_trait_info,
        "Geno": retrieve_geno_trait_info,
        "Temp": retrieve_temp_trait_info
    }
    shared_post_processing = compose(
        __with_qtl_info,
        __with_homologene_id,
        __with_trait_type,
        __merged_into_trait)
    post_processors = {
        "Publish": compose(__with_confidential, shared_post_processing),
        "ProbeSet": compose(__with_sequence, shared_post_processing),
        "Geno": shared_post_processing,
        "Temp": shared_post_processing
    }
    fetch_info = compose(set_haveinfo_field, info_retrievers[dataset_type])
    dataset = retrieve_trait_dataset(dataset_type, trait, threshold, conn)
    info = fetch_info(
        {
            "trait_name": trait["trait_name"],
            "trait_dataset_id": dataset["dataset_id"],
            "trait_dataset_name": dataset["dataset_name"]
        },
        conn)
    if not info["haveinfo"]:
        return info
    processed = post_processors[dataset_type](
        {**info, "group": dataset["group"]})
    return {**processed, "db": {**trait["db"], **dataset}}
def retrieve_temp_trait_data(trait_info: dict, conn: Any):
    """
    Retrieve trait data for `Temp` traits.

    Returns a list with one dictionary per row found for
    `trait_info["trait_name"]`, each with the keys "sample_name", "value",
    "se_error", "nstrain" and "id". The query orders the rows by strain name.
    """
    fields = ("sample_name", "value", "se_error", "nstrain", "id")
    query = (
        "SELECT "
        "Strain.Name, TempData.value, TempData.SE, TempData.NStrain, "
        "TempData.Id "
        "FROM TempData, Temp, Strain "
        "WHERE TempData.StrainId = Strain.Id "
        "AND TempData.Id = Temp.DataId "
        "AND Temp.name = %(trait_name)s "
        "ORDER BY Strain.Name")
    with conn.cursor() as cursor:
        query_params = {"trait_name": trait_info["trait_name"]}
        cursor.execute(query, query_params)
        return [dict(zip(fields, record)) for record in cursor.fetchall()]
    return []
def retrieve_species_id(group, conn: Any):
    """
    Retrieve a species id given the Group value

    PARAMETERS
    group:
        The name of the group, as found in the `Name` column of the
        `InbredSet` table.
    conn:
        A database connection object

    RETURNS:
    The `SpeciesId` of the first matching row, or `None` if there is no such
    row.
    """
    # `conn.cursor` must be *called* to obtain the cursor used as the context
    # manager.
    with conn.cursor() as cursor:
        cursor.execute(
            "SELECT SpeciesId from InbredSet WHERE Name = %(group)s",
            {"group": group})
        row = cursor.fetchone()
    if row:
        return row[0]
    return None
def retrieve_geno_trait_data(trait_info: Dict, conn: Any):
    """
    Retrieve trait data for `Geno` traits.

    Returns a list with one dictionary per row found, each with the keys
    "sample_name", "value", "se_error" and "id". The lookup uses
    `trait_info["trait_name"]`, `trait_info["db"]["dataset_name"]` and the
    species id retrieved for `trait_info["db"]["group"]`. The query orders
    the rows by strain name.
    """
    fields = ("sample_name", "value", "se_error", "id")
    query = (
        "SELECT Strain.Name, GenoData.value, GenoSE.error, GenoData.Id "
        "FROM (GenoData, GenoFreeze, Strain, Geno, GenoXRef) "
        "LEFT JOIN GenoSE ON "
        "(GenoSE.DataId = GenoData.Id AND GenoSE.StrainId = GenoData.StrainId) "
        "WHERE Geno.SpeciesId = %(species_id)s "
        "AND Geno.Name = %(trait_name)s AND GenoXRef.GenoId = Geno.Id "
        "AND GenoXRef.GenoFreezeId = GenoFreeze.Id "
        "AND GenoFreeze.Name = %(dataset_name)s "
        "AND GenoXRef.DataId = GenoData.Id "
        "AND GenoData.StrainId = Strain.Id "
        "ORDER BY Strain.Name")
    with conn.cursor() as cursor:
        query_params = {
            "trait_name": trait_info["trait_name"],
            "dataset_name": trait_info["db"]["dataset_name"],
            "species_id": retrieve_species_id(
                trait_info["db"]["group"], conn)}
        cursor.execute(query, query_params)
        return [dict(zip(fields, record)) for record in cursor.fetchall()]
    return []
def retrieve_publish_trait_data(trait_info: Dict, conn: Any):
    """
    Retrieve trait data for `Publish` traits.

    Returns a list with one dictionary per row found, each with the keys
    "sample_name", "value", "se_error", "nstrain" and "id". The lookup uses
    `trait_info["trait_name"]` and `trait_info["db"]["dataset_id"]`. The
    query orders the rows by strain name.
    """
    fields = ("sample_name", "value", "se_error", "nstrain", "id")
    query = (
        "SELECT "
        "Strain.Name, PublishData.value, PublishSE.error, NStrain.count, "
        "PublishData.Id "
        "FROM (PublishData, Strain, PublishXRef, PublishFreeze) "
        "LEFT JOIN PublishSE ON "
        "(PublishSE.DataId = PublishData.Id "
        "AND PublishSE.StrainId = PublishData.StrainId) "
        "LEFT JOIN NStrain ON "
        "(NStrain.DataId = PublishData.Id "
        "AND NStrain.StrainId = PublishData.StrainId) "
        "WHERE PublishXRef.InbredSetId = PublishFreeze.InbredSetId "
        "AND PublishData.Id = PublishXRef.DataId "
        "AND PublishXRef.Id = %(trait_name)s "
        "AND PublishFreeze.Id = %(dataset_id)s "
        "AND PublishData.StrainId = Strain.Id "
        "ORDER BY Strain.Name")
    with conn.cursor() as cursor:
        query_params = {
            "trait_name": trait_info["trait_name"],
            "dataset_id": trait_info["db"]["dataset_id"]}
        cursor.execute(query, query_params)
        return [dict(zip(fields, record)) for record in cursor.fetchall()]
    return []
def retrieve_cellid_trait_data(trait_info: Dict, conn: Any):
    """
    Retrieve trait data for `Probe Data` types.

    PARAMETERS
    trait_info: (dict)
        Must provide the "cellid" and "trait_name" keys, and a "db"
        dictionary with the "dataset_name" key.
    conn:
        A database connection object

    RETURNS:
    A list with one dictionary per row found, each with the keys
    "sample_name", "value", "se_error" and "id". The query orders the rows by
    strain name.
    """
    query = (
        "SELECT "
        "Strain.Name, ProbeData.value, ProbeSE.error, ProbeData.Id "
        "FROM (ProbeData, ProbeFreeze, ProbeSetFreeze, ProbeXRef, Strain,"
        " Probe, ProbeSet) "
        "LEFT JOIN ProbeSE ON "
        "(ProbeSE.DataId = ProbeData.Id "
        " AND ProbeSE.StrainId = ProbeData.StrainId) "
        "WHERE Probe.Name = %(cellid)s "
        "AND ProbeSet.Name = %(trait_name)s "
        "AND Probe.ProbeSetId = ProbeSet.Id "
        "AND ProbeXRef.ProbeId = Probe.Id "
        "AND ProbeXRef.ProbeFreezeId = ProbeFreeze.Id "
        "AND ProbeSetFreeze.ProbeFreezeId = ProbeFreeze.Id "
        "AND ProbeSetFreeze.Name = %(dataset_name)s "
        "AND ProbeXRef.DataId = ProbeData.Id "
        "AND ProbeData.StrainId = Strain.Id "
        "ORDER BY Strain.Name")
    with conn.cursor() as cursor:
        # The parameter names must match the placeholders of the query: it
        # filters on the dataset's *name*, so that is what is provided.
        cursor.execute(
            query,
            {"cellid": trait_info["cellid"],
             "trait_name": trait_info["trait_name"],
             "dataset_name": trait_info["db"]["dataset_name"]})
        return [
            dict(zip(
                ["sample_name", "value", "se_error", "id"], row))
            for row in cursor.fetchall()]
    return []
def retrieve_probeset_trait_data(trait_info: Dict, conn: Any):
    """
    Retrieve trait data for `ProbeSet` traits.

    Returns a list with one dictionary per row found, each with the keys
    "sample_name", "value", "se_error" and "id". The lookup uses
    `trait_info["trait_name"]` and `trait_info["db"]["dataset_name"]`. The
    query orders the rows by strain name.
    """
    fields = ("sample_name", "value", "se_error", "id")
    query = (
        "SELECT Strain.Name, ProbeSetData.value, ProbeSetSE.error, "
        "ProbeSetData.Id "
        "FROM (ProbeSetData, ProbeSetFreeze, Strain, ProbeSet, ProbeSetXRef) "
        "LEFT JOIN ProbeSetSE ON "
        "(ProbeSetSE.DataId = ProbeSetData.Id "
        "AND ProbeSetSE.StrainId = ProbeSetData.StrainId) "
        "WHERE ProbeSet.Name = %(trait_name)s "
        "AND ProbeSetXRef.ProbeSetId = ProbeSet.Id "
        "AND ProbeSetXRef.ProbeSetFreezeId = ProbeSetFreeze.Id "
        "AND ProbeSetFreeze.Name = %(dataset_name)s "
        "AND ProbeSetXRef.DataId = ProbeSetData.Id "
        "AND ProbeSetData.StrainId = Strain.Id "
        "ORDER BY Strain.Name")
    with conn.cursor() as cursor:
        query_params = {
            "trait_name": trait_info["trait_name"],
            "dataset_name": trait_info["db"]["dataset_name"]}
        cursor.execute(query, query_params)
        return [dict(zip(fields, record)) for record in cursor.fetchall()]
    return []
def with_samplelist_data_setup(samplelist: Sequence[str]):
    """
    Build function that computes the trait data from provided list of samples.

    PARAMETERS
    samplelist: (list)
        A list of sample names

    RETURNS:
        Returns a function that given some data from the database, computes the
        sample's value, variance and ndata values, only if the sample is present
        in the provided `samplelist` variable.

        The returned function gives `None` for samples that are not in
        `samplelist`, and for samples whose "value" is `None`.
    """
    # The returned function is applied to every row retrieved from the
    # database: build the lookup set once rather than scanning the list of
    # samples for each row.
    wanted_samples = frozenset(samplelist)

    def setup_fn(tdata):
        if tdata["sample_name"] not in wanted_samples:
            return None
        val = tdata["value"]
        if val is None:
            return None
        return {
            "sample_name": tdata["sample_name"],
            "value": val,
            "variance": tdata["se_error"],
            "ndata": tdata.get("nstrain", None)
        }
    return setup_fn
def without_samplelist_data_setup():
    """
    Build function that computes the trait data.

    RETURNS:
        Returns a function that given some data from the database, computes the
        sample's value, variance and ndata values. The returned function gives
        `None` for data whose "value" is `None`.
    """
    def setup_fn(tdata):
        sample_value = tdata["value"]
        if sample_value is None:
            return None
        return dict(
            sample_name=tdata["sample_name"],
            value=sample_value,
            variance=tdata["se_error"],
            ndata=tdata.get("nstrain", None))
    return setup_fn
def retrieve_trait_data(trait: dict, conn: Any, samplelist: Sequence[str] = tuple()):
    """
    Retrieve trait data

    DESCRIPTION
    Retrieve trait data as is done in
    https://github.com/genenetwork/genenetwork1/blob/master/web/webqtl/base/webqtlTrait.py#L258-L386

    The rows are retrieved with the function matching the trait: "Temp" and
    "Publish" datasets are checked first, then traits with a truthy "cellid",
    then "ProbeSet" datasets; anything else is retrieved as a "Geno" trait.

    RETURNS:
    An empty dictionary if no rows were retrieved. Otherwise, a dictionary
    with the keys "mysqlid" (the "id" of the first row retrieved) and "data"
    (a dictionary from each sample's name to that sample's "sample_name",
    "value", "variance" and "ndata"). Rows with a `None` value are left out,
    as are samples missing from a non-empty `samplelist`.
    """
    # I do not like this section, but it retains the flow in the old codebase
    dataset_type = trait["db"]["dataset_type"]
    if dataset_type == "Temp":
        rows = retrieve_temp_trait_data(trait, conn)
    elif dataset_type == "Publish":
        rows = retrieve_publish_trait_data(trait, conn)
    elif trait["cellid"]:
        rows = retrieve_cellid_trait_data(trait, conn)
    elif dataset_type == "ProbeSet":
        rows = retrieve_probeset_trait_data(trait, conn)
    else:
        rows = retrieve_geno_trait_data(trait, conn)

    if not rows:
        return {}

    # do something with mysqlid
    mysqlid = rows[0]["id"]
    if samplelist:
        setup_fn = with_samplelist_data_setup(samplelist)
    else:
        setup_fn = without_samplelist_data_setup()
    samples = [
        sample for sample in (setup_fn(row) for row in rows)
        if sample is not None]
    # NOTE: every per-sample dictionary keeps its "sample_name" entry.
    # `export_informative` reads that entry, so it must not be dropped here.
    return {
        "mysqlid": mysqlid,
        "data": {sample["sample_name"]: dict(sample) for sample in samples}}
def generate_traits_filename(base_path: str = TMPDIR):
    """Generate a unique filename for use with generated traits files.

    The name is placed under the absolute form of `base_path`, and carries
    the result of `random_string(10)` between the "traits_test_file_" prefix
    and the ".txt" extension."""
    directory = os.path.abspath(base_path)
    return f"{directory}/traits_test_file_{random_string(10)}.txt"
def export_informative(trait_data: dict, inc_var: bool = False) -> tuple:
    """
    Export informative strain

    This is a migration of the `exportInformative` function in
    web/webqtl/base/webqtlTrait.py module in GeneNetwork1.

    Items of `trait_data["data"]` whose value is `None` are skipped. When
    `inc_var` is set, items whose variance is `None` are skipped as well.

    RETURNS:
    A tuple of three tuples holding, in matching order, the sample names, the
    values and the variances of the items that were kept.

    There is a chance that the original implementation has a bug, especially
    dealing with the `inc_var` value. If the `inc_var` value is meant to
    control the inclusion of the `variance` value, then the current
    implementation, and that one in GN1 have a bug.
    """
    sample_names, values, variances = [], [], []
    for data_item in trait_data["data"].values():
        if data_item["value"] is None:
            continue
        if inc_var and data_item["variance"] is None:
            continue
        sample_names.append(data_item["sample_name"])
        values.append(data_item["value"])
        variances.append(data_item["variance"])
    return (tuple(sample_names), tuple(values), tuple(variances))