aboutsummaryrefslogtreecommitdiff
path: root/gn3/db
diff options
context:
space:
mode:
Diffstat (limited to 'gn3/db')
-rw-r--r--gn3/db/__init__.py2
-rw-r--r--gn3/db/traits.py165
2 files changed, 77 insertions, 90 deletions
diff --git a/gn3/db/__init__.py b/gn3/db/__init__.py
index 5ab9f3c..149a344 100644
--- a/gn3/db/__init__.py
+++ b/gn3/db/__init__.py
@@ -1,7 +1,7 @@
# pylint: disable=[R0902, R0903]
"""Module that exposes common db operations"""
from dataclasses import asdict, astuple
-from typing import Any, Dict, List, Optional, Generator, Union
+from typing import Any, Dict, List, Optional, Generator, Tuple, Union
from typing_extensions import Protocol
from gn3.db.metadata_audit import MetadataAudit
diff --git a/gn3/db/traits.py b/gn3/db/traits.py
index f18e16a..ae1939a 100644
--- a/gn3/db/traits.py
+++ b/gn3/db/traits.py
@@ -1,95 +1,81 @@
-"""This contains all the necessary functions that are required to add traits
-to the published database"""
-from dataclasses import dataclass
-from typing import Any, Dict, Optional
-
-
-@dataclass(frozen=True)
-class Riset:
- """Class for keeping track of riset. A riset is a group e.g. rat HSNIH-Palmer,
-BXD
-
- """
- name: Optional[str]
- r_id: Optional[int]
-
-
-@dataclass(frozen=True)
-class WebqtlCaseData:
- """Class for keeping track of one case data in one trait"""
- value: Optional[float] = None
- variance: Optional[float] = None
- count: Optional[int] = None # Number of Individuals
-
- def __str__(self):
- _str = ""
- if self.value:
- _str += f"value={self.value:.3f}"
- if self.variance:
- _str += f" variance={self.variance:.3f}"
- if self.count:
- _str += " n_data={self.count}"
- return _str
-
-
-def lookup_webqtldataset_name(riset_name: str, conn: Any):
- """Given a group name(riset), return it's name e.g. BXDPublish,
-HLCPublish."""
+"""This class contains functions relating to trait data manipulation"""
+from typing import Any, Dict, Union
+
+
+def get_trait_csv_sample_data(conn: Any,
+ trait_name: int, phenotype_id: int):
+ """Fetch a trait and return it as a csv string"""
+ sql = ("SELECT Strain.Id, PublishData.Id, Strain.Name, "
+ "PublishData.value, "
+ "PublishSE.error, NStrain.count FROM "
+ "(PublishData, Strain, PublishXRef, PublishFreeze) "
+ "LEFT JOIN PublishSE ON "
+ "(PublishSE.DataId = PublishData.Id AND "
+ "PublishSE.StrainId = PublishData.StrainId) "
+ "LEFT JOIN NStrain ON (NStrain.DataId = PublishData.Id AND "
+ "NStrain.StrainId = PublishData.StrainId) WHERE "
+ "PublishXRef.InbredSetId = PublishFreeze.InbredSetId AND "
+ "PublishData.Id = PublishXRef.DataId AND "
+ "PublishXRef.Id = %s AND PublishXRef.PhenotypeId = %s "
+ "AND PublishData.StrainId = Strain.Id Order BY Strain.Name")
+ csv_data = ["Strain Id,Strain Name,Value,SE,Count"]
+ publishdata_id = ""
with conn.cursor() as cursor:
- cursor.execute(
- "SELECT PublishFreeze.Name FROM "
- "PublishFreeze, InbredSet WHERE "
- "PublishFreeze.InbredSetId = InbredSet.Id "
- "AND InbredSet.Name = '%s'" % riset_name)
- _result, *_ = cursor.fetchone()
- return _result
+ cursor.execute(sql, (trait_name, phenotype_id,))
+ for record in cursor.fetchall():
+ (strain_id, publishdata_id,
+ strain_name, value, error, count) = record
+ csv_data.append(
+ ",".join([str(val) if val else "x"
+ for val in (strain_id, strain_name,
+ value, error, count)]))
+ return f"# Publish Data Id: {publishdata_id}\n\n" + "\n".join(csv_data)
+
+
+def update_sample_data(conn: Any,
+ strain_name: str,
+ strain_id: int,
+ publish_data_id: int,
+ value: Union[int, float, str],
+ error: Union[int, float, str],
+ count: Union[int, str]):
+ """Given the right parameters, update sample-data from the relevant
+ table."""
+ STRAIN_ID_SQL: str = "UPDATE Strain SET Name = %s WHERE Id = %s"
+ PUBLISH_DATA_SQL: str = ("UPDATE PublishData SET value = %s "
+ "WHERE StrainId = %s AND Id = %s")
+ PUBLISH_SE_SQL: str = ("UPDATE PublishSE SET error = %s "
+ "WHERE StrainId = %s AND DataId = %s")
+ N_STRAIN_SQL: str = ("UPDATE NStrain SET count = %s "
+ "WHERE StrainId = %s AND DataId = %s")
+
+ updated_strains: int = 0
+ updated_published_data: int = 0
+ updated_se_data: int = 0
+ updated_n_strains: int = 0
-
-def get_riset(data_type: str, name: str, conn: Any):
- """Get the groups given the data type and it's PublishFreeze or GenoFreeze
-name
-
- """
- query, _name, _id = None, None, None
- if data_type == "Publish":
- query = ("SELECT InbredSet.Name, InbredSet.Id FROM InbredSet, "
- "PublishFreeze WHERE PublishFreeze.InbredSetId = "
- "InbredSet.Id AND PublishFreeze.Name = '%s'" % name)
- elif data_type == "Geno":
- query = ("SELECT InbredSet.Name, InbredSet.Id FROM InbredSet, "
- "GenoFreeze WHERE GenoFreeze.InbredSetId = "
- "InbredSet.Id AND GenoFreeze.Name = '%s'" % name)
- elif data_type == "ProbeSet":
- query = ("SELECT InbredSet.Name, InbredSet.Id FROM "
- "InbredSet, ProbeSetFreeze, ProbeFreeze WHERE "
- "ProbeFreeze.InbredSetId = InbredSet.Id AND "
- "ProbeFreeze.Id = ProbeSetFreeze.ProbeFreezeId AND "
- "ProbeSetFreeze.Name = '%s'" % name)
- if query:
- with conn.cursor() as cursor:
- _name, _id = cursor.fetchone()
- if _name == "BXD300":
- _name = "BXD"
- return Riset(_name, _id)
-
-
-def insert_publication(pubmed_id: int, publication: Optional[Dict],
- conn: Any):
- """Creates a new publication record if it's not available"""
- sql = ("SELECT Id FROM Publication where "
- "PubMed_ID = %d" % pubmed_id)
- _id = None
with conn.cursor() as cursor:
- cursor.execute(sql)
- _id = cursor.fetchone()
- if not _id and publication:
- # The Publication contains the fields: 'authors', 'title', 'abstract',
- # 'journal','volume','pages','month','year'
- insert_query = ("INSERT into Publication (%s) Values (%s)" %
- (", ".join(publication.keys()),
- ", ".join(['%s'] * len(publication))))
- with conn.cursor() as cursor:
- cursor.execute(insert_query, tuple(publication.values()))
+ # Update the Strains table
+ cursor.execute(STRAIN_ID_SQL, (strain_name, strain_id))
+ updated_strains: int = cursor.rowcount
+ # Update the PublishData table
+ cursor.execute(PUBLISH_DATA_SQL,
+ (None if value == "x" else value,
+ strain_id, publish_data_id))
+ updated_published_data: int = cursor.rowcount
+ # Update the PublishSE table
+ cursor.execute(PUBLISH_SE_SQL,
+ (None if error == "x" else error,
+ strain_id, publish_data_id))
+ updated_se_data: int = cursor.rowcount
+ # Update the NStrain table
+ cursor.execute(N_STRAIN_SQL,
+ (None if count == "x" else count,
+ strain_id, publish_data_id))
+ updated_n_strains: int = cursor.rowcount
+ return (updated_strains, updated_published_data,
+ updated_se_data, updated_n_strains)
+
def retrieve_trait_dataset_name(
trait_type: str, threshold: int, name: str, connection: Any):
@@ -137,6 +123,7 @@ PUBLISH_TRAIT_INFO_QUERY = (
"PublishXRef.InbredSetId = PublishFreeze.InbredSetId AND "
"PublishFreeze.Id =%(trait_dataset_id)s")
+
def retrieve_publish_trait_info(trait_data_source: Dict[str, Any], conn: Any):
"""Retrieve trait information for type `Publish` traits.