diff options
-rw-r--r-- | gn3/db/phenotypes.py | 61 |
1 files changed, 61 insertions, 0 deletions
diff --git a/gn3/db/phenotypes.py b/gn3/db/phenotypes.py new file mode 100644 index 0000000..46a54bd --- /dev/null +++ b/gn3/db/phenotypes.py @@ -0,0 +1,61 @@ +"""This contains all the necessary functions that access the phenotypes from +the db""" +from dataclasses import dataclass, asdict, astuple + +from typing import Any, Optional +from MySQLdb import escape_string + + +# pylint: disable=[R0902] +@dataclass(frozen=True) +class Phenotype: + """Data Type that represents a Phenotype""" + id_: Optional[int] = None + pre_pub_description: Optional[str] = None + post_pub_description: Optional[str] = None + original_description: Optional[str] = None + units: Optional[str] = None + pre_pub_abbrevition: Optional[str] = None + post_pub_abbreviation: Optional[str] = None + lab_code: Optional[str] = None + submitter: Optional[str] = None + owner: Optional[str] = None + authorized_users: Optional[str] = None + + +# Mapping from the Phenotype dataclass to the actual column names in the +# database +phenotype_column_mapping = { + "id_": "id", + "pre_pub_description": "Pre_publication_description", + "post_pub_description": "Post_publication_description", + "original_description": "Original_description", + "units": "Units", + "pre_pub_abbrevition": "Pre_publication_abbreviation", + "post_pub_abbreviation": "Post_publication_abbreviation", + "lab_code": "Lab_code", + "submitter": "Submitter", + "owner": "Owner", + "authorized_users": "Authorized_Users", +} + + +def update_phenotype(conn: Any, + data: Phenotype, + where: Phenotype) -> Optional[int]: + """Update phenotype metadata with DATA that depends on the WHERE clause""" + if not any(astuple(data) + astuple(where)): + return None + sql = "UPDATE Phenotype SET " + sql += ", ".join(f"{phenotype_column_mapping.get(k)} " + f"= '{escape_string(str(v)).decode('utf-8')}'" for + k, v in asdict(data).items() + if v is not None and k in phenotype_column_mapping) + sql += " WHERE " + sql += "AND ".join(f"{phenotype_column_mapping.get(k)} = " + f"'{escape_string(str(v)).decode('utf-8')}'" for + k, v in asdict(where).items() + if v is not None and k in phenotype_column_mapping) + with conn.cursor() as cursor: + cursor.execute(sql) + return cursor.rowcount |