aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gn3/db/correlations.py139
-rw-r--r--gn3/db/species.py31
-rw-r--r--gn3/partial_correlations.py38
-rw-r--r--tests/unit/test_partial_correlations.py64
4 files changed, 269 insertions, 3 deletions
diff --git a/gn3/db/correlations.py b/gn3/db/correlations.py
new file mode 100644
index 0000000..67cfef9
--- /dev/null
+++ b/gn3/db/correlations.py
@@ -0,0 +1,139 @@
+"""
+This module will hold functions that are used in the (partial) correlations
+feature to access the database to retrieve data needed for computations.
+"""
+
+from typing import Any
+
+from gn3.random import random_string
+from gn3.db.species import translate_to_mouse_gene_id
+
+def get_filename(target_db_name: str, conn: Any) -> str:
+ """
+ Retrieve the name of the reference database file with which correlations are
+ computed.
+
+ This is a migration of the
+ `web.webqtl.correlation.CorrelationPage.getFileName` function in
+ GeneNetwork1.
+ """
+ with conn.cursor() as cursor:
+ cursor.execute(
+ "SELECT Id, FullName from ProbeSetFreeze WHERE Name-%s",
+ target_db_name)
+ result = cursor.fetchone()
+ if result:
+ return "ProbeSetFreezeId_{tid}_FullName_{fname}.txt".format(
+ tid=result[0],
+ fname=result[1].replace(' ', '_').replace('/', '_'))
+
+ return ""
+
+def build_temporary_literature_table(
+ species: str, gene_id: int, return_number: int, conn: Any) -> str:
+ """
+ Build and populate a temporary table to hold the literature correlation data
+ to be used in computations.
+
+ "This is a migration of the
+ `web.webqtl.correlation.CorrelationPage.getTempLiteratureTable` function in
+ GeneNetwork1.
+ """
+ def __translated_species_id(row, cursor):
+ if species == "mouse":
+ return row[1]
+ query = {
+ "rat": "SELECT rat FROM GeneIDXRef WHERE mouse=%s",
+ "human": "SELECT human FROM GeneIDXRef WHERE mouse=%d"}
+ if species in query.keys():
+ cursor.execute(query[species], row[1])
+ record = cursor.fetchone()
+ if record:
+ return record[0]
+ return None
+ return None
+
+ temp_table_name = f"TOPLITERATURE{random_string(8)}"
+ with conn.cursor as cursor:
+ mouse_geneid = translate_to_mouse_gene_id(species, gene_id, conn)
+ data_query = (
+ "SELECT GeneId1, GeneId2, value FROM LCorrRamin3 "
+ "WHERE GeneId1 = %(mouse_gene_id)s "
+ "UNION ALL "
+ "SELECT GeneId2, GeneId1, value FROM LCorrRamin3 "
+ "WHERE GeneId2 = %(mouse_gene_id)s "
+ "AND GeneId1 != %(mouse_gene_id)s")
+ cursor.execute(
+ (f"CREATE TEMPORARY TABLE {temp_table_name} ("
+ "GeneId1 int(12) unsigned, "
+ "GeneId2 int(12) unsigned PRIMARY KEY, "
+ "value double)"))
+ cursor.execute(data_query, mouse_gene_id=mouse_geneid)
+ literature_data = [
+ {"GeneId1": row[0], "GeneId2": row[1], "value": row[2]}
+ for row in cursor.fetchall()
+ if __translated_species_id(row, cursor)]
+
+ cursor.execute(
+ (f"INSERT INTO {temp_table_name} "
+ "VALUES (%(GeneId1)s, %(GeneId2)s, %(value)s)"),
+ literature_data[0:(2 * return_number)])
+
+ return temp_table_name
+
+def fetch_geno_literature_correlations(temp_table: str) -> str:
+ """
+ Helper function for `fetch_literature_correlations` below, to build query
+ for `Geno*` tables.
+ """
+ return (
+ f"SELECT Geno.Name, {temp_table}.value "
+ "FROM Geno, GenoXRef, GenoFreeze "
+ f"LEFT JOIN {temp_table} ON {temp_table}.GeneId2=ProbeSet.GeneId "
+ "WHERE ProbeSet.GeneId IS NOT NULL "
+ f"AND {temp_table}.value IS NOT NULL "
+ "AND GenoXRef.GenoFreezeId = GenoFreeze.Id "
+ "AND GenoFreeze.Name = %(db_name)s "
+ "AND Geno.Id=GenoXRef.GenoId "
+ "ORDER BY Geno.Id")
+
+def fetch_probeset_literature_correlations(temp_table: str) -> str:
+ """
+ Helper function for `fetch_literature_correlations` below, to build query
+ for `ProbeSet*` tables.
+ """
+ return (
+ f"SELECT ProbeSet.Name, {temp_table}.value "
+ "FROM ProbeSet, ProbeSetXRef, ProbeSetFreeze "
+ "LEFT JOIN {temp_table} ON {temp_table}.GeneId2=ProbeSet.GeneId "
+ "WHERE ProbeSet.GeneId IS NOT NULL "
+ "AND {temp_table}.value IS NOT NULL "
+ "AND ProbeSetXRef.ProbeSetFreezeId = ProbeSetFreeze.Id "
+ "AND ProbeSetFreeze.Name = %(db_name)s "
+ "AND ProbeSet.Id=ProbeSetXRef.ProbeSetId "
+ "ORDER BY ProbeSet.Id")
+
+def fetch_literature_correlations(
+ species: str, gene_id: int, dataset: dict, return_number: int,
+ conn: Any) -> dict:
+ """
+ Gather the literature correlation data and pair it with trait id string(s).
+
+ This is a migration of the
+ `web.webqtl.correlation.CorrelationPage.fetchLitCorrelations` function in
+ GeneNetwork1.
+ """
+ temp_table = build_temporary_literature_table(
+ species, gene_id, return_number, conn)
+ query_fns = {
+ "Geno": fetch_geno_literature_correlations,
+ # "Temp": fetch_temp_literature_correlations,
+ # "Publish": fetch_publish_literature_correlations,
+ "ProbeSet": fetch_probeset_literature_correlations}
+ with conn.cursor as cursor:
+ cursor.execute(
+ query_fns[dataset["dataset_type"]](temp_table),
+ db_name=dataset["dataset_name"])
+ results = cursor.fetchall()
+ cursor.execute("DROP TEMPORARY TABLE %s", temp_table)
+ return dict(results) # {trait_name: lit_corr for trait_name, lit_corr in results}
diff --git a/gn3/db/species.py b/gn3/db/species.py
index 0deae4e..1e5015f 100644
--- a/gn3/db/species.py
+++ b/gn3/db/species.py
@@ -30,3 +30,34 @@ def get_chromosome(name: str, is_species: bool, conn: Any) -> Optional[Tuple]:
with conn.cursor() as cursor:
cursor.execute(_sql)
return cursor.fetchall()
+
+def translate_to_mouse_gene_id(species: str, geneid: int, conn: Any) -> int:
+ """
+ Translate rat or human geneid to mouse geneid
+
+ This is a migration of the
+ `web.webqtl.correlation/CorrelationPage.translateToMouseGeneID` function in
+ GN1
+ """
+ assert species in ("rat", "mouse", "human"), "Invalid species"
+ if geneid is None:
+ return 0
+
+ if species == "mouse":
+ return geneid
+
+ with conn.cursor as cursor:
+ if species == "rat":
+ cursor.execute(
+ "SELECT mouse FROM GeneIDXRef WHERE rat = %s", geneid)
+ rat_geneid = cursor.fetchone()
+ if rat_geneid:
+ return rat_geneid[0]
+
+ cursor.execute(
+ "SELECT mouse FROM GeneIDXRef WHERE human = %s", geneid)
+ human_geneid = cursor.fetchone()
+ if human_geneid:
+ return human_geneid[0]
+
+ return 0 # default if all else fails
diff --git a/gn3/partial_correlations.py b/gn3/partial_correlations.py
index c556d10..1fb0ccc 100644
--- a/gn3/partial_correlations.py
+++ b/gn3/partial_correlations.py
@@ -6,7 +6,7 @@ GeneNetwork1.
"""
from functools import reduce
-from typing import Any, Sequence
+from typing import Any, Tuple, Sequence
def control_samples(controls: Sequence[dict], sampleslist: Sequence[str]):
"""
@@ -86,3 +86,39 @@ def fix_samples(primary_trait: dict, control_traits: Sequence[dict]) -> Sequence
control_vals_vars[0],
tuple(primary_trait[sample]["variance"] for sample in primary_samples),
control_vals_vars[1])
+
+def find_identical_traits(
+ primary_name: str, primary_value: float, control_names: Tuple[str, ...],
+ control_values: Tuple[float, ...]) -> Tuple[str, ...]:
+ """
+ Find traits that have the same value when the values are considered to
+ 3 decimal places.
+
+ This is a migration of the
+ `web.webqtl.correlation.correlationFunction.findIdenticalTraits` function in
+ GN1.
+ """
+ def __merge_identicals__(
+ acc: Tuple[str, ...],
+ ident: Tuple[str, Tuple[str, ...]]) -> Tuple[str, ...]:
+ return acc + ident[1]
+
+ def __dictify_controls__(acc, control_item):
+ ckey = "{:.3f}".format(control_item[0])
+ return {**acc, ckey: acc.get(ckey, tuple()) + (control_item[1],)}
+
+ return (reduce(## for identical control traits
+ __merge_identicals__,
+ (item for item in reduce(# type: ignore[var-annotated]
+ __dictify_controls__, zip(control_values, control_names),
+ {}).items() if len(item[1]) > 1),
+ tuple())
+ or
+ reduce(## If no identical control traits, try primary and controls
+ __merge_identicals__,
+ (item for item in reduce(# type: ignore[var-annotated]
+ __dictify_controls__,
+ zip((primary_value,) + control_values,
+ (primary_name,) + control_names), {}).items()
+ if len(item[1]) > 1),
+ tuple()))
diff --git a/tests/unit/test_partial_correlations.py b/tests/unit/test_partial_correlations.py
index 7631a71..60e54c1 100644
--- a/tests/unit/test_partial_correlations.py
+++ b/tests/unit/test_partial_correlations.py
@@ -4,7 +4,8 @@ from unittest import TestCase
from gn3.partial_correlations import (
fix_samples,
control_samples,
- dictify_by_samples)
+ dictify_by_samples,
+ find_identical_traits)
sampleslist = ["B6cC3-1", "BXD1", "BXD12", "BXD16", "BXD19", "BXD2"]
control_traits = (
@@ -106,6 +107,8 @@ class TestPartialCorrelations(TestCase):
def test_dictify_by_samples(self):
"""
+ Test that `dictify_by_samples` generates the appropriate dict
+
Given:
a sequence of sequences with sample names, values and variances, as
in the output of `gn3.partial_correlations.control_samples` or
@@ -133,7 +136,34 @@ class TestPartialCorrelations(TestCase):
dictified_control_samples)
def test_fix_samples(self):
- """Test that fix_samples fixes the values"""
+ """
+ Test that `fix_samples` returns only the common samples
+
+ Given:
+ - A primary trait
+ - A sequence of control samples
+ When:
+ - The two arguments are passed to `fix_samples`
+ Then:
+ - Only the names of the samples present in the primary trait that
+ are also present in ALL the control traits are present in the
+ return value
+ - Only the values of the samples present in the primary trait that
+ are also present in ALL the control traits are present in the
+ return value
+ - ALL the values for ALL the control traits are present in the
+ return value
+ - Only the variances of the samples present in the primary trait
+ that are also present in ALL the control traits are present in the
+ return value
+ - ALL the variances for ALL the control traits are present in the
+ return value
+ - The return value is a tuple of the above items, in the following
+ order:
+ ((sample_names, ...), (primary_trait_values, ...),
+ (control_traits_values, ...), (primary_trait_variances, ...)
+ (control_traits_variances, ...))
+ """
self.assertEqual(
fix_samples(
{"B6cC3-1": {"sample_name": "B6cC3-1", "value": 7.51879,
@@ -149,3 +179,33 @@ class TestPartialCorrelations(TestCase):
(None,),
(None, None, None, None, None, None, None, None, None, None, None,
None, None)))
+
+ def test_find_identical_traits(self):
+ """
+ Test `gn3.partial_correlations.find_identical_traits`.
+
+ Given:
+ - the name of a primary trait
+ - the value of a primary trait
+ - a sequence of names of control traits
+ - a sequence of values of control traits
+ When:
+ - the arguments above are passed to the `find_identical_traits`
+ function
+ Then:
+ - Return ALL trait names that have the same value when up to three
+ decimal places are considered
+ """
+ for primn, primv, contn, contv, expected in (
+ ("pt", 12.98395, ("ct0", "ct1", "ct2"),
+ (0.1234, 2.3456, 3.4567), tuple()),
+ ("pt", 12.98395, ("ct0", "ct1", "ct2"),
+ (12.98354, 2.3456, 3.4567), ("pt", "ct0")),
+ ("pt", 12.98395, ("ct0", "ct1", "ct2", "ct3"),
+ (0.1234, 2.3456, 0.1233, 4.5678), ("ct0", "ct2"))
+ ):
+ with self.subTest(
+ primary_name=primn, primary_value=primv,
+ control_names=contn, control_values=contv):
+ self.assertEqual(
+ find_identical_traits(primn, primv, contn, contv), expected)