about summary refs log tree commit diff
path: root/gn3/db/traits.py
diff options
context:
space:
mode:
Diffstat (limited to 'gn3/db/traits.py')
-rw-r--r--gn3/db/traits.py101
1 files changed, 50 insertions, 51 deletions
diff --git a/gn3/db/traits.py b/gn3/db/traits.py
index a7e7e7c..90d1e9d 100644
--- a/gn3/db/traits.py
+++ b/gn3/db/traits.py
@@ -69,7 +69,7 @@ def export_trait_data(
                 return accumulator + (trait_data["data"][sample]["ndata"], )
             if dtype == "all":
                 return accumulator + __export_all_types(trait_data["data"], sample)
-            raise KeyError("Type `%s` is incorrect" % dtype)
+            raise KeyError(f"Type `{dtype}` is incorrect")
         if var_exists and n_exists:
             return accumulator + (None, None, None)
         if var_exists or n_exists:
@@ -125,9 +125,8 @@ def update_sample_data(conn: Any, #pylint: disable=[R0913]
              "PublishXRef.Id = %s AND "
              "PublishXRef.PhenotypeId = %s "
              "AND PublishData.StrainId = Strain.Id "
-             "AND Strain.Name = \"%s\"") % (trait_name,
-                                            phenotype_id,
-                                            str(strain_name)))
+             "AND Strain.Name = %s"),
+            (trait_name, phenotype_id, str(strain_name)))
         strain_id, data_id = cursor.fetchone()
     updated_published_data: int = 0
     updated_se_data: int = 0
@@ -137,8 +136,8 @@ def update_sample_data(conn: Any, #pylint: disable=[R0913]
         # Update the PublishData table
         if value == "x":
             cursor.execute(("DELETE FROM PublishData "
-                            "WHERE StrainId = %s AND Id = %s")
-                           % (strain_id, data_id))
+                            "WHERE StrainId = %s AND Id = %s"),
+                           (strain_id, data_id))
             updated_published_data = cursor.rowcount
         else:
             cursor.execute(("UPDATE PublishData SET value = %s "
@@ -148,19 +147,20 @@ def update_sample_data(conn: Any, #pylint: disable=[R0913]
 
             if not updated_published_data:
                 cursor.execute(
-                    "SELECT * FROM "
-                    "PublishData WHERE StrainId = "
-                    "%s AND Id = %s" % (strain_id, data_id))
+                    ("SELECT * FROM "
+                     "PublishData WHERE StrainId = "
+                     "%s AND Id = %s"),
+                    (strain_id, data_id))
                 if not cursor.fetchone():
                     cursor.execute(("INSERT INTO PublishData (Id, StrainId, "
-                                    " value) VALUES (%s, %s, %s)") %
+                                    " value) VALUES (%s, %s, %s)"),
                                    (data_id, strain_id, value))
                     updated_published_data = cursor.rowcount
 
         # Update the PublishSE table
         if error == "x":
             cursor.execute(("DELETE FROM PublishSE "
-                            "WHERE StrainId = %s AND DataId = %s") %
+                            "WHERE StrainId = %s AND DataId = %s"),
                            (strain_id, data_id))
             updated_se_data = cursor.rowcount
         else:
@@ -171,21 +171,22 @@ def update_sample_data(conn: Any, #pylint: disable=[R0913]
             updated_se_data = cursor.rowcount
             if not updated_se_data:
                 cursor.execute(
-                        "SELECT * FROM "
-                        "PublishSE WHERE StrainId = "
-                        "%s AND DataId = %s" % (strain_id, data_id))
+                    ("SELECT * FROM "
+                     "PublishSE WHERE StrainId = "
+                     "%s AND DataId = %s"),
+                    (strain_id, data_id))
                 if not cursor.fetchone():
-                    cursor.execute(("INSERT INTO PublishSE (StrainId, DataId, "
-                                    " error) VALUES (%s, %s, %s)") %
-                                   (strain_id, data_id,
-                                    None if error == "x" else error))
+                    cursor.execute(
+                        ("INSERT INTO PublishSE (StrainId, DataId, "
+                         " error) VALUES (%s, %s, %s)"),
+                        (strain_id, data_id, None if error == "x" else error))
                     updated_se_data = cursor.rowcount
 
         # Update the NStrain table
         if count == "x":
             cursor.execute(("DELETE FROM NStrain "
-                                "WHERE StrainId = %s AND DataId = %s" %
-                                (strain_id, data_id)))
+                            "WHERE StrainId = %s AND DataId = %s"),
+                           (strain_id, data_id))
             updated_n_strains = cursor.rowcount
         else:
             cursor.execute(("UPDATE NStrain SET count = %s "
@@ -194,13 +195,14 @@ def update_sample_data(conn: Any, #pylint: disable=[R0913]
             updated_n_strains = cursor.rowcount
             if not updated_n_strains:
                 cursor.execute(
-                        "SELECT * FROM "
-                        "NStrain WHERE StrainId = "
-                        "%s AND DataId = %s" % (strain_id, data_id))
+                    ("SELECT * FROM "
+                     "NStrain WHERE StrainId = "
+                     "%s AND DataId = %s"),
+                    (strain_id, data_id))
                 if not cursor.fetchone():
                     cursor.execute(("INSERT INTO NStrain "
                                     "(StrainId, DataId, count) "
-                                    "VALUES (%s, %s, %s)") %
+                                    "VALUES (%s, %s, %s)"),
                                    (strain_id, data_id, count))
                     updated_n_strains = cursor.rowcount
     return (updated_published_data,
@@ -237,9 +239,8 @@ def delete_sample_data(conn: Any,
                  "PublishXRef.Id = %s AND "
                  "PublishXRef.PhenotypeId = %s "
                  "AND PublishData.StrainId = Strain.Id "
-                 "AND Strain.Name = \"%s\"") % (trait_name,
-                                                phenotype_id,
-                                                str(strain_name)))
+                 "AND Strain.Name = %s"),
+                (trait_name, phenotype_id, str(strain_name)))
 
             # Check if it exists if the data was already deleted:
             if _result := cursor.fetchone():
@@ -248,20 +249,20 @@ def delete_sample_data(conn: Any,
             # Only run if the strain_id and data_id exist
             if strain_id and data_id:
                 cursor.execute(("DELETE FROM PublishData "
-                                "WHERE StrainId = %s AND Id = %s")
-                               % (strain_id, data_id))
+                                "WHERE StrainId = %s AND Id = %s"),
+                               (strain_id, data_id))
                 deleted_published_data = cursor.rowcount
 
                 # Delete the PublishSE table
                 cursor.execute(("DELETE FROM PublishSE "
-                                "WHERE StrainId = %s AND DataId = %s") %
+                                "WHERE StrainId = %s AND DataId = %s"),
                                (strain_id, data_id))
                 deleted_se_data = cursor.rowcount
 
                 # Delete the NStrain table
                 cursor.execute(("DELETE FROM NStrain "
-                                "WHERE StrainId = %s AND DataId = %s" %
-                                (strain_id, data_id)))
+                                "WHERE StrainId = %s AND DataId = %s"),
+                               (strain_id, data_id))
                 deleted_n_strains = cursor.rowcount
         except Exception as e:  #pylint: disable=[C0103, W0612]
             conn.rollback()
@@ -312,7 +313,7 @@ def insert_sample_data(conn: Any, #pylint: disable=[R0913]
             # Insert into the PublishSE table if error is specified
             if error and error != "x":
                 cursor.execute(("INSERT INTO PublishSE (StrainId, DataId, "
-                                " error) VALUES (%s, %s, %s)") %
+                                " error) VALUES (%s, %s, %s)"),
                                (strain_id, data_id, error))
             inserted_se_data = cursor.rowcount
 
@@ -320,7 +321,7 @@ def insert_sample_data(conn: Any, #pylint: disable=[R0913]
             if count and count != "x":
                 cursor.execute(("INSERT INTO NStrain "
                                 "(StrainId, DataId, count) "
-                                "VALUES (%s, %s, %s)") %
+                                "VALUES (%s, %s, %s)"),
                                (strain_id, data_id, count))
             inserted_n_strains = cursor.rowcount
         except Exception as e: #pylint: disable=[C0103, W0612]
@@ -356,14 +357,14 @@ def retrieve_publish_trait_info(trait_data_source: Dict[str, Any], conn: Any):
         "PublishXRef.comments")
     query = (
         "SELECT "
-        "{columns} "
+        f"{columns} "
         "FROM "
         "PublishXRef, Publication, Phenotype "
         "WHERE "
         "PublishXRef.Id = %(trait_name)s AND "
         "Phenotype.Id = PublishXRef.PhenotypeId AND "
         "Publication.Id = PublishXRef.PublicationId AND "
-        "PublishXRef.InbredSetId = %(trait_dataset_id)s").format(columns=columns)
+        "PublishXRef.InbredSetId = %(trait_dataset_id)s")
     with conn.cursor() as cursor:
         cursor.execute(
             query,
@@ -399,17 +400,16 @@ def retrieve_probeset_trait_info(trait_data_source: Dict[str, Any], conn: Any):
         "probe_set_specificity", "probe_set_blat_score",
         "probe_set_blat_mb_start", "probe_set_blat_mb_end", "probe_set_strand",
         "probe_set_note_by_rw", "flag")
+    columns = (f"ProbeSet.{x}" for x in keys)
     query = (
-        "SELECT "
-        "{columns} "
+        f"SELECT {','.join(columns)} "
         "FROM "
         "ProbeSet, ProbeSetFreeze, ProbeSetXRef "
         "WHERE "
         "ProbeSetXRef.ProbeSetFreezeId = ProbeSetFreeze.Id AND "
         "ProbeSetXRef.ProbeSetId = ProbeSet.Id AND "
         "ProbeSetFreeze.Name = %(trait_dataset_name)s AND "
-        "ProbeSet.Name = %(trait_name)s").format(
-            columns=", ".join(["ProbeSet.{}".format(x) for x in keys]))
+        "ProbeSet.Name = %(trait_name)s")
     with conn.cursor() as cursor:
         cursor.execute(
             query,
@@ -425,16 +425,15 @@ def retrieve_geno_trait_info(trait_data_source: Dict[str, Any], conn: Any):
 
     https://github.com/genenetwork/genenetwork1/blob/master/web/webqtl/base/webqtlTrait.py#L438-L449"""
     keys = ("name", "chr", "mb", "source2", "sequence")
+    columns = ", ".join(f"Geno.{x}" for x in keys)
     query = (
-        "SELECT "
-        "{columns} "
+        f"SELECT {columns} "
         "FROM "
-        "Geno, GenoFreeze, GenoXRef "
+        "Geno INNER JOIN GenoXRef ON GenoXRef.GenoId = Geno.Id "
+        "INNER JOIN GenoFreeze ON GenoFreeze.Id = GenoXRef.GenoFreezeId "
         "WHERE "
-        "GenoXRef.GenoFreezeId = GenoFreeze.Id AND GenoXRef.GenoId = Geno.Id AND "
         "GenoFreeze.Name = %(trait_dataset_name)s AND "
-        "Geno.Name = %(trait_name)s").format(
-            columns=", ".join(["Geno.{}".format(x) for x in keys]))
+        "Geno.Name = %(trait_name)s")
     with conn.cursor() as cursor:
         cursor.execute(
             query,
@@ -451,8 +450,8 @@ def retrieve_temp_trait_info(trait_data_source: Dict[str, Any], conn: Any):
     https://github.com/genenetwork/genenetwork1/blob/master/web/webqtl/base/webqtlTrait.py#L450-452"""
     keys = ("name", "description")
     query = (
-        "SELECT {columns} FROM Temp "
-        "WHERE Name = %(trait_name)s").format(columns=", ".join(keys))
+        f"SELECT {', '.join(keys)} FROM Temp "
+        "WHERE Name = %(trait_name)s")
     with conn.cursor() as cursor:
         cursor.execute(
             query,
@@ -577,7 +576,7 @@ def load_qtl_info(qtl, trait_type, trait_info, conn):
         "Publish": load_publish_qtl_info,
         "ProbeSet": load_probeset_qtl_info
     }
-    if trait_info["name"] not in qtl_info_functions.keys():
+    if trait_info["name"] not in qtl_info_functions:
         return trait_info
 
     return qtl_info_functions[trait_type](trait_info, conn)
@@ -947,8 +946,8 @@ def retrieve_trait_data(trait: dict, conn: Any, samplelist: Sequence[str] = tupl
 
 def generate_traits_filename(base_path: str = TMPDIR):
     """Generate a unique filename for use with generated traits files."""
-    return "{}/traits_test_file_{}.txt".format(
-        os.path.abspath(base_path), random_string(10))
+    return (
+        f"{os.path.abspath(base_path)}/traits_test_file_{random_string(10)}.txt")
 
 
 def export_informative(trait_data: dict, inc_var: bool = False) -> tuple: