about summary refs log tree commit diff
path: root/gn3/db
diff options
context:
space:
mode:
authorBonfaceKilz2022-03-12 17:36:15 +0300
committerBonfaceKilz2022-03-12 17:38:12 +0300
commit81cfd736b5263e3f95eda41cda41a78aaf361851 (patch)
tree7eae3fc1eaec06fc0f268da73c3dda127270c950 /gn3/db
parentf27f8470e79857c9c088e230a141995c3127640b (diff)
downloadgenenetwork3-81cfd736b5263e3f95eda41cda41a78aaf361851.tar.gz
Fix mypy issues
Diffstat (limited to 'gn3/db')
-rw-r--r--gn3/db/sample_data.py33
1 files changed, 16 insertions, 17 deletions
diff --git a/gn3/db/sample_data.py b/gn3/db/sample_data.py
index e3daa21..2018587 100644
--- a/gn3/db/sample_data.py
+++ b/gn3/db/sample_data.py
@@ -14,21 +14,20 @@ _MAP = {
 
 def __extract_actions(original_data: str,
                       updated_data: str,
-                      csv_header: str) -> dict:
+                      csv_header: str) -> Dict:
     """Return a dictionary containing elements that need to be deleted, inserted,
 or updated.
 
     """
-    original_data = original_data.strip().split(",")
-    updated_data = updated_data.strip().split(",")
-    csv_header = csv_header.strip().split(",")
-    result = {
+    result: Dict[str, Any] = {
         "delete": {"data": [], "csv_header": []},
         "insert": {"data": [], "csv_header": []},
         "update": {"data": [], "csv_header": []},
     }
     strain_name = ""
-    for _o, _u, _h in zip(original_data, updated_data, csv_header):
+    for _o, _u, _h in zip(original_data.strip().split(","),
+                          updated_data.strip().split(","),
+                          csv_header.strip().split(",")):
         if _h == "Strain Name":
             strain_name = _o
         if _o == _u:  # No change
@@ -74,7 +73,7 @@ def get_trait_csv_sample_data(conn: Any,
                "LEFT JOIN CaseAttribute ca ON ca.Id = cxref.CaseAttributeId "
                "WHERE px.Id = %s AND px.PhenotypeId = %s ORDER BY st.Name")
     case_attr_columns = set()
-    csv_data = {}
+    csv_data: Dict = {}
     with conn.cursor() as cursor:
         cursor.execute(__query, (trait_name, phenotype_id))
         for data in cursor.fetchall():
@@ -167,11 +166,11 @@ def update_sample_data(conn: Any,
         return 0
 
     strain_id, data_id, inbredset_id = get_sample_data_ids(
-        conn=conn, publishxref_id=trait_name,
+        conn=conn, publishxref_id=int(trait_name),
         phenotype_id=phenotype_id,
         strain_name=extract_strain_name(csv_header, original_data))
 
-    none_case_attrs = {
+    none_case_attrs: Dict[str, Callable] = {
         "Strain Name": lambda x: 0,
         "Value": lambda x: __update_data(conn, "PublishData", x),
         "SE": lambda x: __update_data(conn, "PublishSE", x),
@@ -191,11 +190,11 @@ def update_sample_data(conn: Any,
                 header = header.strip()
                 value = value.strip()
                 if header in none_case_attrs:
-                    count += none_case_attrs.get(header)(value)
+                    count += none_case_attrs[header](value)
                 else:
                     count += __update_case_attribute(
                         conn=conn,
-                        value=none_case_attrs.get(header)(value),
+                        value=none_case_attrs[header](value),
                         strain_id=strain_id,
                         case_attr=header,
                         inbredset_id=inbredset_id)
@@ -252,11 +251,11 @@ def delete_sample_data(conn: Any,
             return cursor.rowcount
 
     strain_id, data_id, inbredset_id = get_sample_data_ids(
-        conn=conn, publishxref_id=trait_name,
+        conn=conn, publishxref_id=int(trait_name),
         phenotype_id=phenotype_id,
         strain_name=extract_strain_name(csv_header, data))
 
-    none_case_attrs = {
+    none_case_attrs: Dict[str, Any] = {
         "Strain Name": lambda: 0,
         "Value": lambda: __delete_data(conn, "PublishData"),
         "SE": lambda: __delete_data(conn, "PublishSE"),
@@ -268,7 +267,7 @@ def delete_sample_data(conn: Any,
         for header in csv_header.split(","):
             header = header.strip()
             if header in none_case_attrs:
-                count += none_case_attrs.get(header)()
+                count += none_case_attrs[header]()
             else:
                 count += __delete_case_attribute(
                     conn=conn,
@@ -326,11 +325,11 @@ def insert_sample_data(conn: Any,
         return 0
 
     strain_id, data_id, inbredset_id = get_sample_data_ids(
-        conn=conn, publishxref_id=trait_name,
+        conn=conn, publishxref_id=int(trait_name),
         phenotype_id=phenotype_id,
         strain_name=extract_strain_name(csv_header, data))
 
-    none_case_attrs = {
+    none_case_attrs: Dict[str, Any] = {
         "Strain Name": lambda _: 0,
         "Value": lambda x: __insert_data(conn, "PublishData", x),
         "SE": lambda x: __insert_data(conn, "PublishSE", x),
@@ -353,7 +352,7 @@ def insert_sample_data(conn: Any,
             header = header.strip()
             value = value.strip()
             if header in none_case_attrs:
-                count += none_case_attrs.get(header)(value)
+                count += none_case_attrs[header](value)
             else:
                 count += __insert_case_attribute(
                     conn=conn,