From 30c31955df6a9d20c28e0dcd9f99f06cbfd51a77 Mon Sep 17 00:00:00 2001 From: Munyoki Kilyungi Date: Sun, 9 Oct 2022 14:04:29 +0300 Subject: Use the Maybe Monad when fetching the accession_id of a dataset * wqflask/base/data_set/dataset.py: Import itertools, DictCursor, MonadicDict, Maybe, Nothing and query_sql. (DataSet.__init__): Initialize accession_id to Nothing. (DataSet.as_dict): Rename this to ... (DataSet.as_monadic_dict): ... this which returns a monadic dictionary. (DataSet.get_accession_id): Query for the accession_id and return it as a maybe monad. --- wqflask/base/data_set/dataset.py | 90 ++++++++++++++++++++++------------------ 1 file changed, 50 insertions(+), 40 deletions(-) (limited to 'wqflask/base/data_set/dataset.py') diff --git a/wqflask/base/data_set/dataset.py b/wqflask/base/data_set/dataset.py index b6899278..dfe09921 100644 --- a/wqflask/base/data_set/dataset.py +++ b/wqflask/base/data_set/dataset.py @@ -2,13 +2,15 @@ import math import collections - +import itertools from redis import Redis - +from MySQLdb.cursors import DictCursor from base import species from utility import chunks +from gn3.monads import MonadicDict, query_sql +from pymonad.maybe import Maybe, Nothing from .datasetgroup import DatasetGroup from wqflask.database import database_connection from utility.db_tools import escape, mescape, create_in_clause @@ -30,7 +32,7 @@ class DataSet: self.fullname = None self.type = None self.data_scale = None # ZS: For example log2 - self.accession_id = None + self.accession_id = Nothing self.setup() @@ -47,50 +49,58 @@ class DataSet: self.group.get_samplelist(redis_conn) self.species = species.TheSpecies(dataset=self) - def as_dict(self): - return { + def as_monadic_dict(self): + _result = MonadicDict({ 'name': self.name, 'shortname': self.shortname, 'fullname': self.fullname, 'type': self.type, 'data_scale': self.data_scale, - 'group': self.group.name, - 'accession_id': self.accession_id - } - - def get_accession_id(self): - results = 
None - with database_connection() as conn, conn.cursor() as cursor: + 'group': self.group.name + }) + _result["accession_id"] = self.accession_id + return _result + + def get_accession_id(self) -> Maybe[str]: + """Get the accession_id of this dataset depending on the + dataset type.""" + __accession_id_dict = MonadicDict() + with database_connection() as conn: if self.type == "Publish": - cursor.execute( - "SELECT InfoFiles.GN_AccesionId FROM " - "InfoFiles, PublishFreeze, InbredSet " - "WHERE InbredSet.Name = %s AND " - "PublishFreeze.InbredSetId = InbredSet.Id " - "AND InfoFiles.InfoPageName = PublishFreeze.Name " - "AND PublishFreeze.public > 0 AND " - "PublishFreeze.confidentiality < 1 " - "ORDER BY PublishFreeze.CreateTime DESC", - (self.group.name,) - ) - results = cursor.fetchone() + __accession_id_dict, = itertools.islice( + query_sql(conn, + ("SELECT InfoFiles.GN_AccesionId AS accession_id FROM " + "InfoFiles, PublishFreeze, InbredSet " + f"WHERE InbredSet.Name = '{conn.escape_string(self.group.name).decode()}' " + "AND PublishFreeze.InbredSetId = InbredSet.Id " + "AND InfoFiles.InfoPageName = PublishFreeze.Name " + "AND PublishFreeze.public > 0 AND " + "PublishFreeze.confidentiality < 1 " + "ORDER BY PublishFreeze.CreateTime DESC") + ), 1) elif self.type == "Geno": - cursor.execute( - "SELECT InfoFiles.GN_AccesionId FROM " - "InfoFiles, GenoFreeze, InbredSet " - "WHERE InbredSet.Name = %s AND " - "GenoFreeze.InbredSetId = InbredSet.Id " - "AND InfoFiles.InfoPageName = GenoFreeze.ShortName " - "AND GenoFreeze.public > 0 AND " - "GenoFreeze.confidentiality < 1 " - "ORDER BY GenoFreeze.CreateTime DESC", - (self.group.name,) - ) - results = cursor.fetchone() - - # Returns None by default if this is not executed - if results: - return str(results[0]) + __accession_id_dict, = itertools.islice( + query_sql(conn, + ("SELECT InfoFiles.GN_AccesionId AS accession_id FROM " + "InfoFiles, GenoFreeze, InbredSet " + f"WHERE InbredSet.Name = 
'{conn.escape_string(self.group.name).decode()}' AND " + "GenoFreeze.InbredSetId = InbredSet.Id " + "AND InfoFiles.InfoPageName = GenoFreeze.ShortName " + "AND GenoFreeze.public > 0 AND " + "GenoFreeze.confidentiality < 1 " + "ORDER BY GenoFreeze.CreateTime DESC") + ), 1) + elif self.type == "ProbeSet": + __accession_id_dict, = itertools.islice( + query_sql(conn, + ("SELECT InfoFiles.GN_AccesionId AS accession_id " + f"FROM InfoFiles WHERE InfoFiles.InfoPageName = '{conn.escape_string(self.name).decode()}' " + f"AND InfoFiles.DB_Name = '{conn.escape_string(self.fullname).decode()}' " + f"OR InfoFiles.DB_Name = '{conn.escape_string(self.shortname).decode()}'") + ), 1) + else: # The Value passed is not present + raise LookupError + return __accession_id_dict["accession_id"] def retrieve_other_names(self): """This method fetches the the dataset names in search_result. -- cgit v1.2.3