aboutsummaryrefslogtreecommitdiff
path: root/gn3/db/traits.py
diff options
context:
space:
mode:
Diffstat (limited to 'gn3/db/traits.py')
-rw-r--r--gn3/db/traits.py101
1 files changed, 50 insertions, 51 deletions
diff --git a/gn3/db/traits.py b/gn3/db/traits.py
index a7e7e7c..90d1e9d 100644
--- a/gn3/db/traits.py
+++ b/gn3/db/traits.py
@@ -69,7 +69,7 @@ def export_trait_data(
return accumulator + (trait_data["data"][sample]["ndata"], )
if dtype == "all":
return accumulator + __export_all_types(trait_data["data"], sample)
- raise KeyError("Type `%s` is incorrect" % dtype)
+ raise KeyError(f"Type `{dtype}` is incorrect")
if var_exists and n_exists:
return accumulator + (None, None, None)
if var_exists or n_exists:
@@ -125,9 +125,8 @@ def update_sample_data(conn: Any, #pylint: disable=[R0913]
"PublishXRef.Id = %s AND "
"PublishXRef.PhenotypeId = %s "
"AND PublishData.StrainId = Strain.Id "
- "AND Strain.Name = \"%s\"") % (trait_name,
- phenotype_id,
- str(strain_name)))
+ "AND Strain.Name = %s"),
+ (trait_name, phenotype_id, str(strain_name)))
strain_id, data_id = cursor.fetchone()
updated_published_data: int = 0
updated_se_data: int = 0
@@ -137,8 +136,8 @@ def update_sample_data(conn: Any, #pylint: disable=[R0913]
# Update the PublishData table
if value == "x":
cursor.execute(("DELETE FROM PublishData "
- "WHERE StrainId = %s AND Id = %s")
- % (strain_id, data_id))
+ "WHERE StrainId = %s AND Id = %s"),
+ (strain_id, data_id))
updated_published_data = cursor.rowcount
else:
cursor.execute(("UPDATE PublishData SET value = %s "
@@ -148,19 +147,20 @@ def update_sample_data(conn: Any, #pylint: disable=[R0913]
if not updated_published_data:
cursor.execute(
- "SELECT * FROM "
- "PublishData WHERE StrainId = "
- "%s AND Id = %s" % (strain_id, data_id))
+ ("SELECT * FROM "
+ "PublishData WHERE StrainId = "
+ "%s AND Id = %s"),
+ (strain_id, data_id))
if not cursor.fetchone():
cursor.execute(("INSERT INTO PublishData (Id, StrainId, "
- " value) VALUES (%s, %s, %s)") %
+ " value) VALUES (%s, %s, %s)"),
(data_id, strain_id, value))
updated_published_data = cursor.rowcount
# Update the PublishSE table
if error == "x":
cursor.execute(("DELETE FROM PublishSE "
- "WHERE StrainId = %s AND DataId = %s") %
+ "WHERE StrainId = %s AND DataId = %s"),
(strain_id, data_id))
updated_se_data = cursor.rowcount
else:
@@ -171,21 +171,22 @@ def update_sample_data(conn: Any, #pylint: disable=[R0913]
updated_se_data = cursor.rowcount
if not updated_se_data:
cursor.execute(
- "SELECT * FROM "
- "PublishSE WHERE StrainId = "
- "%s AND DataId = %s" % (strain_id, data_id))
+ ("SELECT * FROM "
+ "PublishSE WHERE StrainId = "
+ "%s AND DataId = %s"),
+ (strain_id, data_id))
if not cursor.fetchone():
- cursor.execute(("INSERT INTO PublishSE (StrainId, DataId, "
- " error) VALUES (%s, %s, %s)") %
- (strain_id, data_id,
- None if error == "x" else error))
+ cursor.execute(
+ ("INSERT INTO PublishSE (StrainId, DataId, "
+ " error) VALUES (%s, %s, %s)"),
+ (strain_id, data_id, None if error == "x" else error))
updated_se_data = cursor.rowcount
# Update the NStrain table
if count == "x":
cursor.execute(("DELETE FROM NStrain "
- "WHERE StrainId = %s AND DataId = %s" %
- (strain_id, data_id)))
+ "WHERE StrainId = %s AND DataId = %s"),
+ (strain_id, data_id))
updated_n_strains = cursor.rowcount
else:
cursor.execute(("UPDATE NStrain SET count = %s "
@@ -194,13 +195,14 @@ def update_sample_data(conn: Any, #pylint: disable=[R0913]
updated_n_strains = cursor.rowcount
if not updated_n_strains:
cursor.execute(
- "SELECT * FROM "
- "NStrain WHERE StrainId = "
- "%s AND DataId = %s" % (strain_id, data_id))
+ ("SELECT * FROM "
+ "NStrain WHERE StrainId = "
+ "%s AND DataId = %s"),
+ (strain_id, data_id))
if not cursor.fetchone():
cursor.execute(("INSERT INTO NStrain "
"(StrainId, DataId, count) "
- "VALUES (%s, %s, %s)") %
+ "VALUES (%s, %s, %s)"),
(strain_id, data_id, count))
updated_n_strains = cursor.rowcount
return (updated_published_data,
@@ -237,9 +239,8 @@ def delete_sample_data(conn: Any,
"PublishXRef.Id = %s AND "
"PublishXRef.PhenotypeId = %s "
"AND PublishData.StrainId = Strain.Id "
- "AND Strain.Name = \"%s\"") % (trait_name,
- phenotype_id,
- str(strain_name)))
+ "AND Strain.Name = %s"),
+ (trait_name, phenotype_id, str(strain_name)))
# Check if it exists if the data was already deleted:
if _result := cursor.fetchone():
@@ -248,20 +249,20 @@ def delete_sample_data(conn: Any,
# Only run if the strain_id and data_id exist
if strain_id and data_id:
cursor.execute(("DELETE FROM PublishData "
- "WHERE StrainId = %s AND Id = %s")
- % (strain_id, data_id))
+ "WHERE StrainId = %s AND Id = %s"),
+ (strain_id, data_id))
deleted_published_data = cursor.rowcount
# Delete the PublishSE table
cursor.execute(("DELETE FROM PublishSE "
- "WHERE StrainId = %s AND DataId = %s") %
+ "WHERE StrainId = %s AND DataId = %s"),
(strain_id, data_id))
deleted_se_data = cursor.rowcount
# Delete the NStrain table
cursor.execute(("DELETE FROM NStrain "
- "WHERE StrainId = %s AND DataId = %s" %
- (strain_id, data_id)))
+ "WHERE StrainId = %s AND DataId = %s"),
+ (strain_id, data_id))
deleted_n_strains = cursor.rowcount
except Exception as e: #pylint: disable=[C0103, W0612]
conn.rollback()
@@ -312,7 +313,7 @@ def insert_sample_data(conn: Any, #pylint: disable=[R0913]
# Insert into the PublishSE table if error is specified
if error and error != "x":
cursor.execute(("INSERT INTO PublishSE (StrainId, DataId, "
- " error) VALUES (%s, %s, %s)") %
+ " error) VALUES (%s, %s, %s)"),
(strain_id, data_id, error))
inserted_se_data = cursor.rowcount
@@ -320,7 +321,7 @@ def insert_sample_data(conn: Any, #pylint: disable=[R0913]
if count and count != "x":
cursor.execute(("INSERT INTO NStrain "
"(StrainId, DataId, count) "
- "VALUES (%s, %s, %s)") %
+ "VALUES (%s, %s, %s)"),
(strain_id, data_id, count))
inserted_n_strains = cursor.rowcount
except Exception as e: #pylint: disable=[C0103, W0612]
@@ -356,14 +357,14 @@ def retrieve_publish_trait_info(trait_data_source: Dict[str, Any], conn: Any):
"PublishXRef.comments")
query = (
"SELECT "
- "{columns} "
+ f"{columns} "
"FROM "
"PublishXRef, Publication, Phenotype "
"WHERE "
"PublishXRef.Id = %(trait_name)s AND "
"Phenotype.Id = PublishXRef.PhenotypeId AND "
"Publication.Id = PublishXRef.PublicationId AND "
- "PublishXRef.InbredSetId = %(trait_dataset_id)s").format(columns=columns)
+ "PublishXRef.InbredSetId = %(trait_dataset_id)s")
with conn.cursor() as cursor:
cursor.execute(
query,
@@ -399,17 +400,16 @@ def retrieve_probeset_trait_info(trait_data_source: Dict[str, Any], conn: Any):
"probe_set_specificity", "probe_set_blat_score",
"probe_set_blat_mb_start", "probe_set_blat_mb_end", "probe_set_strand",
"probe_set_note_by_rw", "flag")
+ columns = (f"ProbeSet.{x}" for x in keys)
query = (
- "SELECT "
- "{columns} "
+ f"SELECT {','.join(columns)} "
"FROM "
"ProbeSet, ProbeSetFreeze, ProbeSetXRef "
"WHERE "
"ProbeSetXRef.ProbeSetFreezeId = ProbeSetFreeze.Id AND "
"ProbeSetXRef.ProbeSetId = ProbeSet.Id AND "
"ProbeSetFreeze.Name = %(trait_dataset_name)s AND "
- "ProbeSet.Name = %(trait_name)s").format(
- columns=", ".join(["ProbeSet.{}".format(x) for x in keys]))
+ "ProbeSet.Name = %(trait_name)s")
with conn.cursor() as cursor:
cursor.execute(
query,
@@ -425,16 +425,15 @@ def retrieve_geno_trait_info(trait_data_source: Dict[str, Any], conn: Any):
https://github.com/genenetwork/genenetwork1/blob/master/web/webqtl/base/webqtlTrait.py#L438-L449"""
keys = ("name", "chr", "mb", "source2", "sequence")
+ columns = ", ".join(f"Geno.{x}" for x in keys)
query = (
- "SELECT "
- "{columns} "
+ f"SELECT {columns} "
"FROM "
- "Geno, GenoFreeze, GenoXRef "
+ "Geno INNER JOIN GenoXRef ON GenoXRef.GenoId = Geno.Id "
+ "INNER JOIN GenoFreeze ON GenoFreeze.Id = GenoXRef.GenoFreezeId "
"WHERE "
- "GenoXRef.GenoFreezeId = GenoFreeze.Id AND GenoXRef.GenoId = Geno.Id AND "
"GenoFreeze.Name = %(trait_dataset_name)s AND "
- "Geno.Name = %(trait_name)s").format(
- columns=", ".join(["Geno.{}".format(x) for x in keys]))
+ "Geno.Name = %(trait_name)s")
with conn.cursor() as cursor:
cursor.execute(
query,
@@ -451,8 +450,8 @@ def retrieve_temp_trait_info(trait_data_source: Dict[str, Any], conn: Any):
https://github.com/genenetwork/genenetwork1/blob/master/web/webqtl/base/webqtlTrait.py#L450-452"""
keys = ("name", "description")
query = (
- "SELECT {columns} FROM Temp "
- "WHERE Name = %(trait_name)s").format(columns=", ".join(keys))
+ f"SELECT {', '.join(keys)} FROM Temp "
+ "WHERE Name = %(trait_name)s")
with conn.cursor() as cursor:
cursor.execute(
query,
@@ -577,7 +576,7 @@ def load_qtl_info(qtl, trait_type, trait_info, conn):
"Publish": load_publish_qtl_info,
"ProbeSet": load_probeset_qtl_info
}
- if trait_info["name"] not in qtl_info_functions.keys():
+ if trait_info["name"] not in qtl_info_functions:
return trait_info
return qtl_info_functions[trait_type](trait_info, conn)
@@ -947,8 +946,8 @@ def retrieve_trait_data(trait: dict, conn: Any, samplelist: Sequence[str] = tupl
def generate_traits_filename(base_path: str = TMPDIR):
"""Generate a unique filename for use with generated traits files."""
- return "{}/traits_test_file_{}.txt".format(
- os.path.abspath(base_path), random_string(10))
+ return (
+ f"{os.path.abspath(base_path)}/traits_test_file_{random_string(10)}.txt")
def export_informative(trait_data: dict, inc_var: bool = False) -> tuple: