about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--gn3/db/sample_data.py447
1 files changed, 118 insertions, 329 deletions
diff --git a/gn3/db/sample_data.py b/gn3/db/sample_data.py
index 57966f1..07810e0 100644
--- a/gn3/db/sample_data.py
+++ b/gn3/db/sample_data.py
@@ -6,7 +6,6 @@ import MySQLdb
 from gn3.csvcmp import extract_strain_name
 from gn3.csvcmp import parse_csv_column
 
-
 _MAP = {
     "ProbeSetData": ("StrainId", "Id", "value"),
     "PublishData": ("StrainId", "Id", "value"),
@@ -237,14 +236,14 @@ def get_pheno_sample_data_ids(
             strain_id = cursor.fetchone()[0]
     return (strain_id, publishdata_id, inbredset_id)
 
+
 # pylint: disable=[R0913, R0914]
-def update_mrna_sample_data(
+def update_sample_data(
     conn: Any,
     original_data: str,
     updated_data: str,
     csv_header: str,
-    probeset_id: int,
-    dataset_name: str
+    trait_info: dict
 ) -> int:
     """Given the right parameters, update sample-data from the relevant
     table."""
@@ -290,19 +289,40 @@ def update_mrna_sample_data(
                 return cursor.rowcount
         return 0
 
-    strain_id, data_id, inbredset_id = get_mrna_sample_data_ids(
-        conn=conn,
-        probeset_id=int(probeset_id),
-        dataset_name=dataset_name,
-        strain_name=extract_strain_name(csv_header, original_data),
-    )
+    if 'probeset_id' in trait_info and 'dataset_name' in trait_info: # If ProbeSet/mRNA Assay trait
+        probeset_id, dataset_name = trait_info['probeset_id'], trait_info['dataset_name']
+        data_type = "mrna"
+    else: # If Publish/phenotype trait
+        trait_name, phenotype_id = trait_info['trait_name'], trait_info['phenotype_id']
+        data_type = "pheno"
+
+    if data_type == "mrna":
+        strain_id, data_id, inbredset_id = get_mrna_sample_data_ids(
+            conn=conn,
+            probeset_id=int(probeset_id),
+            dataset_name=dataset_name,
+            strain_name=extract_strain_name(csv_header, original_data),
+        )
+        none_case_attrs: Dict[str, Callable] = {
+            "Strain Name": lambda x: 0,
+            "Value": lambda x: __update_data(conn, "ProbeSetData", x),
+            "SE": lambda x: __update_data(conn, "ProbeSetSE", x),
+            "Count": lambda x: __update_data(conn, "NStrain", x),
+        }
+    else:
+        strain_id, data_id, inbredset_id = get_pheno_sample_data_ids(
+            conn=conn,
+            publishxref_id=int(trait_name),
+            phenotype_id=phenotype_id,
+            strain_name=extract_strain_name(csv_header, original_data),
+        )
+        none_case_attrs: Dict[str, Callable] = {
+            "Strain Name": lambda x: 0,
+            "Value": lambda x: __update_data(conn, "PublishData", x),
+            "SE": lambda x: __update_data(conn, "PublishSE", x),
+            "Count": lambda x: __update_data(conn, "NStrain", x),
+        }
 
-    none_case_attrs: Dict[str, Callable] = {
-        "Strain Name": lambda x: 0,
-        "Value": lambda x: __update_data(conn, "ProbeSetData", x),
-        "SE": lambda x: __update_data(conn, "ProbeSetSE", x),
-        "Count": lambda x: __update_data(conn, "NStrain", x),
-    }
     count = 0
     # try:
     __actions = __extract_actions(
@@ -328,23 +348,22 @@ def update_mrna_sample_data(
                     case_attr=header,
                     inbredset_id=inbredset_id,
                 )
+
     if __actions.get("delete"):
-        _rowcount = delete_mrna_sample_data(
+        _rowcount = delete_sample_data(
             conn=conn,
             data=__actions["delete"]["data"],
             csv_header=__actions["delete"]["csv_header"],
-            probeset_id=probeset_id,
-            dataset_name=dataset_name
+            trait_info=trait_info
         )
         if _rowcount:
             count += 1
     if __actions.get("insert"):
-        _rowcount = insert_mrna_sample_data(
+        _rowcount = insert_sample_data(
             conn=conn,
             data=__actions["insert"]["data"],
             csv_header=__actions["insert"]["csv_header"],
-            probeset_id=probeset_id,
-            dataset_name=dataset_name
+            trait_info=trait_info
         )
         if _rowcount:
             count += 1
@@ -352,123 +371,8 @@ def update_mrna_sample_data(
     #     raise MySQLdb.Error(_e) from _e
     return count
 
-# pylint: disable=[R0913, R0914]
-def update_pheno_sample_data(
-    conn: Any,
-    trait_name: str,
-    original_data: str,
-    updated_data: str,
-    csv_header: str,
-    phenotype_id: int,
-) -> int:
-    """Given the right parameters, update sample-data from the relevant
-    table."""
-
-    def __update_data(conn, table, value):
-        if value and value != "x":
-            with conn.cursor() as cursor:
-                sub_query = " = %s AND ".join(_MAP.get(table)[:2]) + " = %s"
-                _val = _MAP.get(table)[-1]
-                cursor.execute(
-                    (f"UPDATE {table} SET {_val} = %s " f"WHERE {sub_query}"),
-                    (value, strain_id, data_id),
-                )
-                conn.commit()
-                return cursor.rowcount
-        return 0
-
-    def __update_case_attribute(
-        conn, value, strain_id, case_attr, inbredset_id
-    ):
-        if value != "x":
-            (id_, name) = parse_csv_column(case_attr)
-            with conn.cursor() as cursor:
-                if id_:
-                    cursor.execute(
-                        "UPDATE CaseAttributeXRefNew "
-                        "SET Value = %s "
-                        "WHERE StrainId = %s AND CaseAttributeId = %s "
-                        "AND InbredSetId = %s",
-                        (value, strain_id, id_, inbredset_id),
-                    )
-                else:
-                    cursor.execute(
-                        "UPDATE CaseAttributeXRefNew "
-                        "SET Value = %s "
-                        "WHERE StrainId = %s AND CaseAttributeId = "
-                        "(SELECT CaseAttributeId FROM "
-                        "CaseAttribute WHERE Name = %s) "
-                        "AND InbredSetId = %s",
-                        (value, strain_id, name, inbredset_id),
-                    )
-                conn.commit()
-                return cursor.rowcount
-        return 0
-
-    strain_id, data_id, inbredset_id = get_pheno_sample_data_ids(
-        conn=conn,
-        publishxref_id=int(trait_name),
-        phenotype_id=phenotype_id,
-        strain_name=extract_strain_name(csv_header, original_data),
-    )
-
-    none_case_attrs: Dict[str, Callable] = {
-        "Strain Name": lambda x: 0,
-        "Value": lambda x: __update_data(conn, "PublishData", x),
-        "SE": lambda x: __update_data(conn, "PublishSE", x),
-        "Count": lambda x: __update_data(conn, "NStrain", x),
-    }
-    count = 0
-    try:
-        __actions = __extract_actions(
-            original_data=original_data,
-            updated_data=updated_data,
-            csv_header=csv_header,
-        )
-
-        if __actions.get("update"):
-            _csv_header = __actions["update"]["csv_header"]
-            _data = __actions["update"]["data"]
-            # pylint: disable=[E1101]
-            for header, value in zip(_csv_header.split(","), _data.split(",")):
-                header = header.strip()
-                value = value.strip()
-                if header in none_case_attrs:
-                    count += none_case_attrs[header](value)
-                else:
-                    count += __update_case_attribute(
-                        conn=conn,
-                        value=value,
-                        strain_id=strain_id,
-                        case_attr=header,
-                        inbredset_id=inbredset_id,
-                    )
-        if __actions.get("delete"):
-            _rowcount = delete_sample_data(
-                conn=conn,
-                trait_name=trait_name,
-                data=__actions["delete"]["data"],
-                csv_header=__actions["delete"]["csv_header"],
-                phenotype_id=phenotype_id,
-            )
-            if _rowcount:
-                count += 1
-        if __actions.get("insert"):
-            _rowcount = insert_pheno_sample_data(
-                conn=conn,
-                trait_name=trait_name,
-                data=__actions["insert"]["data"],
-                csv_header=__actions["insert"]["csv_header"],
-                phenotype_id=phenotype_id,
-            )
-            if _rowcount:
-                count += 1
-    except Exception as _e:
-        raise MySQLdb.Error(_e) from _e
-    return count
-
-def delete_mrna_sample_data(
-    conn: Any, data: str, csv_header: str, probeset_id: int, dataset_name: str
+def delete_sample_data(
+    conn: Any, data: str, csv_header: str, trait_info: dict
 ) -> int:
     """Given the right parameters, delete sample-data from the relevant
     tables."""
@@ -505,88 +409,40 @@ def delete_mrna_sample_data(
             conn.commit()
             return cursor.rowcount
 
-    strain_id, data_id, inbredset_id = get_mrna_sample_data_ids(
-        conn=conn,
-        probeset_id=int(probeset_id),
-        dataset_name=dataset_name,
-        strain_name=extract_strain_name(csv_header, original_data),
-    )
-
-    none_case_attrs: Dict[str, Any] = {
-        "Strain Name": lambda: 0,
-        "Value": lambda: __delete_data(conn, "ProbeSetData"),
-        "SE": lambda: __delete_data(conn, "ProbeSetSE"),
-        "Count": lambda: __delete_data(conn, "NStrain"),
-    }
-    count = 0
-
-    try:
-        for header in csv_header.split(","):
-            header = header.strip()
-            if header in none_case_attrs:
-                count += none_case_attrs[header]()
-            else:
-                count += __delete_case_attribute(
-                    conn=conn,
-                    strain_id=strain_id,
-                    case_attr=header,
-                    inbredset_id=inbredset_id,
-                )
-    except Exception as _e:
-        raise MySQLdb.Error(_e) from _e
-    return count
-
-def delete_pheno_sample_data(
-    conn: Any, trait_name: str, data: str, csv_header: str, phenotype_id: int
-) -> int:
-    """Given the right parameters, delete sample-data from the relevant
-    tables."""
-
-    def __delete_data(conn, table):
-        sub_query = " = %s AND ".join(_MAP.get(table)[:2]) + " = %s"
-        with conn.cursor() as cursor:
-            cursor.execute(
-                (f"DELETE FROM {table} " f"WHERE {sub_query}"),
-                (strain_id, data_id),
-            )
-            conn.commit()
-            return cursor.rowcount
+    if 'probeset_id' in trait_info and 'dataset_name' in trait_info: # If ProbeSet/mRNA Assay trait
+        probeset_id, dataset_name = trait_info['probeset_id'], trait_info['dataset_name']
+        data_type = "mrna"
+    else: # If Publish/phenotype trait
+        trait_name, phenotype_id = trait_info['trait_name'], trait_info['phenotype_id']
+        data_type = "pheno"
 
-    def __delete_case_attribute(conn, strain_id, case_attr, inbredset_id):
-        with conn.cursor() as cursor:
-            (id_, name) = parse_csv_column(case_attr)
-            if id_:
-                cursor.execute(
-                    "DELETE FROM CaseAttributeXRefNew "
-                    "WHERE StrainId = %s AND CaseAttributeId = %s "
-                    "AND InbredSetId = %s",
-                    (strain_id, id_, inbredset_id),
-                )
-            else:
-                cursor.execute(
-                    "DELETE FROM CaseAttributeXRefNew "
-                    "WHERE StrainId = %s AND CaseAttributeId = "
-                    "(SELECT CaseAttributeId FROM "
-                    "CaseAttribute WHERE Name = %s) "
-                    "AND InbredSetId = %s",
-                    (strain_id, name, inbredset_id),
-                )
-            conn.commit()
-            return cursor.rowcount
-
-    strain_id, data_id, inbredset_id = get_pheno_sample_data_ids(
-        conn=conn,
-        publishxref_id=int(trait_name),
-        phenotype_id=phenotype_id,
-        strain_name=extract_strain_name(csv_header, data),
-    )
+    if data_type == "mrna":
+        strain_id, data_id, inbredset_id = get_mrna_sample_data_ids(
+            conn=conn,
+            probeset_id=int(probeset_id),
+            dataset_name=dataset_name,
+            strain_name=extract_strain_name(csv_header, original_data),
+        )
+        none_case_attrs: Dict[str, Any] = {
+            "Strain Name": lambda: 0,
+            "Value": lambda: __delete_data(conn, "ProbeSetData"),
+            "SE": lambda: __delete_data(conn, "ProbeSetSE"),
+            "Count": lambda: __delete_data(conn, "NStrain"),
+        }
+    else:
+        strain_id, data_id, inbredset_id = get_pheno_sample_data_ids(
+            conn=conn,
+            publishxref_id=int(trait_name),
+            phenotype_id=phenotype_id,
+            strain_name=extract_strain_name(csv_header, original_data),
+        )
+        none_case_attrs: Dict[str, Any] = {
+            "Strain Name": lambda: 0,
+            "Value": lambda: __delete_data(conn, "PublishData"),
+            "SE": lambda: __delete_data(conn, "PublishSE"),
+            "Count": lambda: __delete_data(conn, "NStrain"),
+        }
 
-    none_case_attrs: Dict[str, Any] = {
-        "Strain Name": lambda: 0,
-        "Value": lambda: __delete_data(conn, "PublishData"),
-        "SE": lambda: __delete_data(conn, "PublishSE"),
-        "Count": lambda: __delete_data(conn, "NStrain"),
-    }
     count = 0
 
     try:
@@ -606,8 +462,8 @@ def delete_pheno_sample_data(
     return count
 
 # pylint: disable=[R0913, R0914]
-def insert_mrna_sample_data(
-    conn: Any, data: str, csv_header: str, probeset_id: int, dataset_name: str
+def insert_sample_data(
+    conn: Any, data: str, csv_header: str, trait_info: dict
 ) -> int:
     """Given the right parameters, insert sample-data to the relevant table."""
 
@@ -659,124 +515,57 @@ def insert_mrna_sample_data(
                 conn.commit()
         return 0
 
-    strain_id, data_id, inbredset_id = get_mrna_sample_data_ids(
-        conn=conn,
-        probeset_id=int(probeset_id),
-        dataset_name=dataset_name,
-        strain_name=extract_strain_name(csv_header, data),
-    )
+    if 'probeset_id' in trait_info and 'dataset_name' in trait_info: # If ProbeSet/mRNA Assay trait
+        probeset_id, dataset_name = trait_info['probeset_id'], trait_info['dataset_name']
+        data_type = "mrna"
+    else: # If Publish/phenotype trait
+        trait_name, phenotype_id = trait_info['trait_name'], trait_info['phenotype_id']
+        data_type = "pheno"
 
-    none_case_attrs: Dict[str, Any] = {
-        "Strain Name": lambda _: 0,
-        "Value": lambda x: __insert_data(conn, "ProbeSetData", x),
-        "SE": lambda x: __insert_data(conn, "ProbeSetSE", x),
-        "Count": lambda x: __insert_data(conn, "NStrain", x),
-    }
+    if data_type == "mrna":
+        strain_id, data_id, inbredset_id = get_mrna_sample_data_ids(
+            conn=conn,
+            probeset_id=int(probeset_id),
+            dataset_name=dataset_name,
+            strain_name=extract_strain_name(csv_header, original_data),
+        )
+        none_case_attrs: Dict[str, Any] = {
+            "Strain Name": lambda _: 0,
+            "Value": lambda x: __insert_data(conn, "ProbeSetData", x),
+            "SE": lambda x: __insert_data(conn, "ProbeSetSE", x),
+            "Count": lambda x: __insert_data(conn, "NStrain", x),
+        }
+    else:
+        strain_id, data_id, inbredset_id = get_pheno_sample_data_ids(
+            conn=conn,
+            publishxref_id=int(trait_name),
+            phenotype_id=phenotype_id,
+            strain_name=extract_strain_name(csv_header, original_data),
+        )
+        none_case_attrs: Dict[str, Any] = {
+            "Strain Name": lambda _: 0,
+            "Value": lambda: __insert_data(conn, "PublishData", x),
+            "SE": lambda: __insert_data(conn, "PublishSE", x),
+            "Count": lambda: __insert_data(conn, "NStrain", x),
+        }
 
     try:
         count = 0
 
         # Check if the data already exists:
         with conn.cursor() as cursor:
-            cursor.execute(
-                "SELECT Id FROM ProbeSetData where Id = %s "
-                "AND StrainId = %s",
-                (data_id, strain_id))
-            data_exists = cursor.fetchone()
-        if data_exists:  # Data already exists
-            return count
-
-        for header, value in zip(csv_header.split(","), data.split(",")):
-            header = header.strip()
-            value = value.strip()
-            if header in none_case_attrs:
-                count += none_case_attrs[header](value)
-            else:
-                count += __insert_case_attribute(
-                    conn=conn, case_attr=header, value=value
-                )
-        return count
-    except Exception as _e:
-        raise MySQLdb.Error(_e) from _e
-
-# pylint: disable=[R0913, R0914]
-def insert_pheno_sample_data(
-    conn: Any, trait_name: str, data: str, csv_header: str, phenotype_id: int
-) -> int:
-    """Given the right parameters, insert sample-data to the relevant table."""
-
-    def __insert_data(conn, table, value):
-        if value and value != "x":
-            with conn.cursor() as cursor:
-                columns = ", ".join(_MAP.get(table))
+            if data_type == "mrna":
                 cursor.execute(
-                    (
-                        f"INSERT INTO {table} "
-                        f"({columns}) "
-                        f"VALUES (%s, %s, %s)"
-                    ),
-                    (strain_id, data_id, value),
-                )
-                conn.commit()
-                return cursor.rowcount
-        return 0
-
-    def __insert_case_attribute(conn, case_attr, value):
-        if value != "x":
-            with conn.cursor() as cursor:
-                (id_, name) = parse_csv_column(case_attr)
-                if not id_:
-                    cursor.execute(
-                        "SELECT Id FROM CaseAttribute WHERE Name = %s",
-                        (name,),
-                    )
-                    if case_attr_id := cursor.fetchone():
-                        id_ = case_attr_id[0]
-
+                    "SELECT Id FROM ProbeSetData where Id = %s "
+                    "AND StrainId = %s",
+                    (data_id, strain_id))
+            else:
                 cursor.execute(
-                    "SELECT StrainId FROM "
-                    "CaseAttributeXRefNew WHERE StrainId = %s "
-                    "AND CaseAttributeId = %s "
-                    "AND InbredSetId = %s",
-                    (strain_id, id_, inbredset_id),
-                )
-                if (not cursor.fetchone()) and id_:
-                    cursor.execute(
-                        "INSERT INTO CaseAttributeXRefNew "
-                        "(StrainId, CaseAttributeId, Value, InbredSetId) "
-                        "VALUES (%s, %s, %s, %s)",
-                        (strain_id, id_, value, inbredset_id),
-                    )
-                    row_count = cursor.rowcount
-                    conn.commit()
-                    return row_count
-                conn.commit()
-        return 0
-
-    strain_id, data_id, inbredset_id = get_pheno_sample_data_ids(
-        conn=conn,
-        publishxref_id=int(trait_name),
-        phenotype_id=phenotype_id,
-        strain_name=extract_strain_name(csv_header, data),
-    )
-
-    none_case_attrs: Dict[str, Any] = {
-        "Strain Name": lambda _: 0,
-        "Value": lambda x: __insert_data(conn, "PublishData", x),
-        "SE": lambda x: __insert_data(conn, "PublishSE", x),
-        "Count": lambda x: __insert_data(conn, "NStrain", x),
-    }
-
-    try:
-        count = 0
-
-        # Check if the data already exists:
-        with conn.cursor() as cursor:
-            cursor.execute(
-                "SELECT Id FROM PublishData where Id = %s "
-                "AND StrainId = %s",
-                (data_id, strain_id))
+                    "SELECT Id FROM PublishData where Id = %s "
+                    "AND StrainId = %s",
+                    (data_id, strain_id))
             data_exists = cursor.fetchone()
+
         if data_exists:  # Data already exists
             return count