aboutsummaryrefslogtreecommitdiff
path: root/gn3/db
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2022-02-21 16:23:06 +0300
committerFrederick Muriuki Muriithi2022-02-21 16:23:06 +0300
commita35fce27875d9db80dce1976b6f8ee8c00ecfe0a (patch)
tree8e8f815a6e3d37348bdb8f253f5ec53f72dc2dbc /gn3/db
parentc84b07b8c5ac0a42c0fab929c75823b30b548191 (diff)
downloadgenenetwork3-a35fce27875d9db80dce1976b6f8ee8c00ecfe0a.tar.gz
Fix a myriad of linter issues
* Use `with` in place of plain `open` * Use f-strings in place of `str.format()` * Remove string interpolation from queries - provide data as query parameters * other minor fixes
Diffstat (limited to 'gn3/db')
-rw-r--r--gn3/db/correlations.py20
-rw-r--r--gn3/db/datasets.py10
-rw-r--r--gn3/db/genotypes.py44
-rw-r--r--gn3/db/partial_correlations.py145
-rw-r--r--gn3/db/traits.py101
5 files changed, 143 insertions, 177 deletions
diff --git a/gn3/db/correlations.py b/gn3/db/correlations.py
index d372607..3ae66ca 100644
--- a/gn3/db/correlations.py
+++ b/gn3/db/correlations.py
@@ -26,9 +26,9 @@ def get_filename(conn: Any, target_db_name: str, text_files_dir: str) -> Union[
(target_db_name,))
result = cursor.fetchone()
if result:
- filename = "ProbeSetFreezeId_{tid}_FullName_{fname}.txt".format(
- tid=result[0],
- fname=result[1].replace(' ', '_').replace('/', '_'))
+ filename = (
+ f"ProbeSetFreezeId_{result[0]}_FullName_"
+ f"{result[1].replace(' ', '_').replace('/', '_')}.txt")
full_filename = f"{text_files_dir}/{filename}"
return (
os.path.exists(full_filename) and
@@ -53,7 +53,7 @@ def build_temporary_literature_table(
query = {
"rat": "SELECT rat FROM GeneIDXRef WHERE mouse=%s",
"human": "SELECT human FROM GeneIDXRef WHERE mouse=%d"}
- if species in query.keys():
+ if species in query:
cursor.execute(query[species], row[1])
record = cursor.fetchone()
if record:
@@ -160,8 +160,10 @@ def fetch_symbol_value_pair_dict(
symbol: data_id_dict.get(symbol) for symbol in symbol_list
if data_id_dict.get(symbol) is not None
}
- query = "SELECT Id, value FROM TissueProbeSetData WHERE Id IN ({})".format(
- ",".join(f"%(id{i})s" for i in range(len(data_ids.values()))))
+ data_ids_fields = (f"%(id{i})s" for i in range(len(data_ids.values())))
+ query = (
+ "SELECT Id, value FROM TissueProbeSetData "
+ f"WHERE Id IN ({','.join(data_ids_fields)})")
with conn.cursor() as cursor:
cursor.execute(
query,
@@ -408,12 +410,12 @@ def fetch_sample_ids(
`web.webqtl.correlation.CorrelationPage.fetchAllDatabaseData` function in
GeneNetwork1.
"""
+ samples_fields = (f"%(s{i})s" for i in range(len(sample_names)))
query = (
"SELECT Strain.Id FROM Strain, Species "
- "WHERE Strain.Name IN ({}) "
+ f"WHERE Strain.Name IN ({','.join(samples_fields)}) "
"AND Strain.SpeciesId=Species.Id "
- "AND Species.name=%(species_name)s").format(
- ",".join(f"%(s{i})s" for i in range(len(sample_names))))
+ "AND Species.name=%(species_name)s")
with conn.cursor() as cursor:
cursor.execute(
query,
diff --git a/gn3/db/datasets.py b/gn3/db/datasets.py
index 1d6cdf8..b19db53 100644
--- a/gn3/db/datasets.py
+++ b/gn3/db/datasets.py
@@ -83,8 +83,7 @@ def retrieve_geno_trait_dataset_name(
cursor.fetchone()))
def retrieve_dataset_name(
- trait_type: str, threshold: int, trait_name: str, dataset_name: str,
- conn: Any):
+ trait_type: str, threshold: int, dataset_name: str, conn: Any):
"""
Retrieve the name of a trait given the trait's name
@@ -184,7 +183,6 @@ def retrieve_temp_trait_dataset():
"""
Retrieve the dataset that relates to `Temp` traits
"""
- # pylint: disable=[C0330]
return {
"searchfield": ["name", "description"],
"disfield": ["name", "description"],
@@ -198,7 +196,6 @@ def retrieve_geno_trait_dataset():
"""
Retrieve the dataset that relates to `Geno` traits
"""
- # pylint: disable=[C0330]
return {
"searchfield": ["name", "chr"],
"disfield": ["name", "chr", "mb", "source2", "sequence"],
@@ -209,7 +206,6 @@ def retrieve_publish_trait_dataset():
"""
Retrieve the dataset that relates to `Publish` traits
"""
- # pylint: disable=[C0330]
return {
"searchfield": [
"name", "post_publication_description", "abstract", "title",
@@ -228,7 +224,6 @@ def retrieve_probeset_trait_dataset():
"""
Retrieve the dataset that relates to `ProbeSet` traits
"""
- # pylint: disable=[C0330]
return {
"searchfield": [
"name", "description", "probe_target_description", "symbol",
@@ -259,8 +254,7 @@ def retrieve_trait_dataset(trait_type, trait, threshold, conn):
"dataset_id": None,
"dataset_name": trait["db"]["dataset_name"],
**retrieve_dataset_name(
- trait_type, threshold, trait["trait_name"],
- trait["db"]["dataset_name"], conn)
+ trait_type, threshold, trait["db"]["dataset_name"], conn)
}
group = retrieve_group_fields(
trait_type, trait["trait_name"], dataset_name_info, conn)
diff --git a/gn3/db/genotypes.py b/gn3/db/genotypes.py
index 8f18cac..0e19a5f 100644
--- a/gn3/db/genotypes.py
+++ b/gn3/db/genotypes.py
@@ -2,7 +2,6 @@
import os
import gzip
-from typing import Union, TextIO
from gn3.settings import GENOTYPE_FILES
@@ -10,7 +9,7 @@ def build_genotype_file(
geno_name: str, base_dir: str = GENOTYPE_FILES,
extension: str = "geno"):
"""Build the absolute path for the genotype file."""
- return "{}/{}.{}".format(os.path.abspath(base_dir), geno_name, extension)
+ return f"{os.path.abspath(base_dir)}/{geno_name}.{extension}"
def load_genotype_samples(genotype_filename: str, file_type: str = "geno"):
"""
@@ -44,22 +43,23 @@ def __load_genotype_samples_from_geno(genotype_filename: str):
Loads samples from '.geno' files.
"""
- gzipped_filename = "{}.gz".format(genotype_filename)
+ def __remove_comments_and_empty_lines__(rows):
+ return(
+ line for line in rows
+ if line and not line.startswith(("#", "@")))
+
+ gzipped_filename = f"{genotype_filename}.gz"
if os.path.isfile(gzipped_filename):
- genofile: Union[TextIO, gzip.GzipFile] = gzip.open(gzipped_filename)
+ with gzip.open(gzipped_filename) as genofile:
+ rows = __remove_comments_and_empty_lines__(genofile.readlines())
else:
- genofile = open(genotype_filename)
-
- for row in genofile:
- line = row.strip()
- if (not line) or (line.startswith(("#", "@"))): # type: ignore[arg-type]
- continue
- break
+ with open(genotype_filename, encoding="utf8") as genofile:
+ rows = __remove_comments_and_empty_lines__(genofile.readlines())
- headers = line.split("\t") # type: ignore[arg-type]
+ headers = next(rows).split() # type: ignore[arg-type]
if headers[3] == "Mb":
- return headers[4:]
- return headers[3:]
+ return tuple(headers[4:])
+ return tuple(headers[3:])
def __load_genotype_samples_from_plink(genotype_filename: str):
"""
@@ -67,8 +67,8 @@ def __load_genotype_samples_from_plink(genotype_filename: str):
Loads samples from '.plink' files.
"""
- genofile = open(genotype_filename)
- return [line.split(" ")[1] for line in genofile]
+ with open(genotype_filename, encoding="utf8") as genofile:
+ return tuple(line.split()[1] for line in genofile)
def parse_genotype_labels(lines: list):
"""
@@ -129,7 +129,7 @@ def parse_genotype_marker(line: str, geno_obj: dict, parlist: tuple):
alleles = marker_row[start_pos:]
genotype = tuple(
- (geno_table[allele] if allele in geno_table.keys() else "U")
+ (geno_table[allele] if allele in geno_table else "U")
for allele in alleles)
if len(parlist) > 0:
genotype = (-1, 1) + genotype
@@ -164,7 +164,7 @@ def parse_genotype_file(filename: str, parlist: tuple = tuple()):
"""
Parse the provided genotype file into a usable pytho3 data structure.
"""
- with open(filename, "r") as infile:
+ with open(filename, "r", encoding="utf8") as infile:
contents = infile.readlines()
lines = tuple(line for line in contents if
@@ -175,10 +175,10 @@ def parse_genotype_file(filename: str, parlist: tuple = tuple()):
data_lines = tuple(line for line in lines if not line.startswith("@"))
header = parse_genotype_header(data_lines[0], parlist)
geno_obj = dict(labels + header)
- markers = tuple(
- [parse_genotype_marker(line, geno_obj, parlist)
- for line in data_lines[1:]])
+ markers = (
+ parse_genotype_marker(line, geno_obj, parlist)
+ for line in data_lines[1:])
chromosomes = tuple(
dict(chromosome) for chromosome in
- build_genotype_chromosomes(geno_obj, markers))
+ build_genotype_chromosomes(geno_obj, tuple(markers)))
return {**geno_obj, "chromosomes": chromosomes}
diff --git a/gn3/db/partial_correlations.py b/gn3/db/partial_correlations.py
index 0075cad..a28b111 100644
--- a/gn3/db/partial_correlations.py
+++ b/gn3/db/partial_correlations.py
@@ -48,9 +48,8 @@ def temp_traits_data(conn, traits):
"FROM TempData, Temp, Strain "
"WHERE TempData.StrainId = Strain.Id "
"AND TempData.Id = Temp.DataId "
- "AND Temp.name IN ({}) "
- "ORDER BY Strain.Name").format(
- ", ".join(["%s"] * len(traits)))
+ "AND Temp.name IN ({', '.join(['%s'] * len(traits))}) "
+ "ORDER BY Strain.Name")
with conn.cursor(cursorclass=DictCursor) as cursor:
cursor.execute(
query,
@@ -79,12 +78,11 @@ def publish_traits_data(conn, traits):
"AND NStrain.StrainId = PublishData.StrainId) "
"WHERE PublishXRef.InbredSetId = PublishFreeze.InbredSetId "
"AND PublishData.Id = PublishXRef.DataId "
- "AND PublishXRef.Id IN ({trait_names}) "
- "AND PublishFreeze.Id IN ({dataset_ids}) "
+ f"AND PublishXRef.Id IN ({', '.join(['%s'] * len(traits))}) "
+ "AND PublishFreeze.Id IN "
+ f"({', '.join(['%s'] * len(dataset_ids))}) "
"AND PublishData.StrainId = Strain.Id "
- "ORDER BY Strain.Name").format(
- trait_names=", ".join(["%s"] * len(traits)),
- dataset_ids=", ".join(["%s"] * len(dataset_ids)))
+ "ORDER BY Strain.Name")
if len(dataset_ids) > 0:
with conn.cursor(cursorclass=DictCursor) as cursor:
cursor.execute(
@@ -109,19 +107,16 @@ def cellid_traits_data(conn, traits):
"LEFT JOIN ProbeSE "
"ON (ProbeSE.DataId = ProbeData.Id "
"AND ProbeSE.StrainId = ProbeData.StrainId) "
- "WHERE Probe.Name IN ({cellids}) "
- "AND ProbeSet.Name IN ({trait_names}) "
+ f"WHERE Probe.Name IN ({', '.join(['%s'] * len(cellids))}) "
+ f"AND ProbeSet.Name IN ({', '.join(['%s'] * len(traits))}) "
"AND Probe.ProbeSetId = ProbeSet.Id "
"AND ProbeXRef.ProbeId = Probe.Id "
"AND ProbeXRef.ProbeFreezeId = ProbeFreeze.Id "
"AND ProbeSetFreeze.ProbeFreezeId = ProbeFreeze.Id "
- "AND ProbeSetFreeze.Name IN ({dataset_names}) "
+ f"AND ProbeSetFreeze.Name IN ({', '.join(['%s'] * len(dataset_names))}) "
"AND ProbeXRef.DataId = ProbeData.Id "
"AND ProbeData.StrainId = Strain.Id "
- "ORDER BY Strain.Name").format(
- cellids=", ".join(["%s"] * len(cellids)),
- trait_names=", ".join(["%s"] * len(traits)),
- dataset_names=", ".join(["%s"] * len(dataset_names)))
+ "ORDER BY Strain.Name")
with conn.cursor(cursorclass=DictCursor) as cursor:
cursor.execute(
query,
@@ -143,15 +138,13 @@ def probeset_traits_data(conn, traits):
"LEFT JOIN ProbeSetSE ON "
"(ProbeSetSE.DataId = ProbeSetData.Id "
"AND ProbeSetSE.StrainId = ProbeSetData.StrainId) "
- "WHERE ProbeSet.Name IN ({trait_names}) "
+ f"WHERE ProbeSet.Name IN ({', '.join(['%s'] * len(traits))})"
"AND ProbeSetXRef.ProbeSetId = ProbeSet.Id "
"AND ProbeSetXRef.ProbeSetFreezeId = ProbeSetFreeze.Id "
- "AND ProbeSetFreeze.Name IN ({dataset_names}) "
+ f"AND ProbeSetFreeze.Name IN ({', '.join(['%s']*len(dataset_names))}) "
"AND ProbeSetXRef.DataId = ProbeSetData.Id "
"AND ProbeSetData.StrainId = Strain.Id "
- "ORDER BY Strain.Name").format(
- trait_names=", ".join(["%s"] * len(traits)),
- dataset_names=", ".join(["%s"] * len(dataset_names)))
+ "ORDER BY Strain.Name")
with conn.cursor(cursorclass=DictCursor) as cursor:
cursor.execute(
query,
@@ -170,8 +163,7 @@ def species_ids(conn, traits):
query = (
"SELECT Name AS `group`, SpeciesId AS species_id "
"FROM InbredSet "
- "WHERE Name IN ({groups})").format(
- groups=", ".join(["%s"] * len(groups)))
+ f"WHERE Name IN ({', '.join(['%s'] * len(groups))})")
if len(groups) > 0:
with conn.cursor(cursorclass=DictCursor) as cursor:
cursor.execute(query, groups)
@@ -190,16 +182,14 @@ def geno_traits_data(conn, traits):
"FROM (GenoData, GenoFreeze, Strain, Geno, GenoXRef) "
"LEFT JOIN GenoSE ON "
"(GenoSE.DataId = GenoData.Id AND GenoSE.StrainId = GenoData.StrainId) "
- "WHERE Geno.SpeciesId IN ({species_ids}) "
- "AND Geno.Name IN ({trait_names}) AND GenoXRef.GenoId = Geno.Id "
+ f"WHERE Geno.SpeciesId IN ({', '.join(['%s'] * len(sp_ids))}) "
+ f"AND Geno.Name IN ({', '.join(['%s'] * len(traits))}) "
+ "AND GenoXRef.GenoId = Geno.Id "
"AND GenoXRef.GenoFreezeId = GenoFreeze.Id "
- "AND GenoFreeze.Name IN ({dataset_names}) "
+ f"AND GenoFreeze.Name IN ({', '.join(['%s'] * len(dataset_names))}) "
"AND GenoXRef.DataId = GenoData.Id "
"AND GenoData.StrainId = Strain.Id "
- "ORDER BY Strain.Name").format(
- species_ids=", ".join(["%s"] * len(sp_ids)),
- trait_names=", ".join(["%s"] * len(traits)),
- dataset_names=", ".join(["%s"] * len(dataset_names)))
+ "ORDER BY Strain.Name")
if len(sp_ids) > 0 and len(dataset_names) > 0:
with conn.cursor(cursorclass=DictCursor) as cursor:
cursor.execute(
@@ -309,18 +299,16 @@ def publish_traits_info(
"PublishXRef.Sequence, Phenotype.Units, PublishXRef.comments")
query = (
"SELECT "
- "PublishXRef.Id AS trait_name, {columns} "
+ f"PublishXRef.Id AS trait_name, {columns} "
"FROM "
"PublishXRef, Publication, Phenotype, PublishFreeze "
"WHERE "
- "PublishXRef.Id IN ({trait_names}) "
+ f"PublishXRef.Id IN ({', '.join(['%s'] * len(traits))}) "
"AND Phenotype.Id = PublishXRef.PhenotypeId "
"AND Publication.Id = PublishXRef.PublicationId "
"AND PublishXRef.InbredSetId = PublishFreeze.InbredSetId "
- "AND PublishFreeze.Id IN ({trait_dataset_ids})").format(
- columns=columns,
- trait_names=", ".join(["%s"] * len(traits)),
- trait_dataset_ids=", ".join(["%s"] * len(trait_dataset_ids)))
+ "AND PublishFreeze.Id IN "
+ f"({', '.join(['%s'] * len(trait_dataset_ids))})")
if trait_dataset_ids:
with conn.cursor(cursorclass=DictCursor) as cursor:
cursor.execute(
@@ -337,25 +325,24 @@ def probeset_traits_info(
Retrieve information for the probeset traits
"""
dataset_names = set(trait["db"]["dataset_name"] for trait in traits)
- keys = (
- "name", "symbol", "description", "probe_target_description", "chr",
- "mb", "alias", "geneid", "genbankid", "unigeneid", "omim",
- "refseq_transcriptid", "blatseq", "targetseq", "chipid", "comments",
- "strand_probe", "strand_gene", "probe_set_target_region", "proteinid",
- "probe_set_specificity", "probe_set_blat_score",
- "probe_set_blat_mb_start", "probe_set_blat_mb_end", "probe_set_strand",
- "probe_set_note_by_rw", "flag")
+ columns = ", ".join(
+ [f"ProbeSet.{x}" for x in
+ ("name", "symbol", "description", "probe_target_description", "chr",
+ "mb", "alias", "geneid", "genbankid", "unigeneid", "omim",
+ "refseq_transcriptid", "blatseq", "targetseq", "chipid", "comments",
+ "strand_probe", "strand_gene", "probe_set_target_region", "proteinid",
+ "probe_set_specificity", "probe_set_blat_score",
+ "probe_set_blat_mb_start", "probe_set_blat_mb_end",
+ "probe_set_strand", "probe_set_note_by_rw", "flag")])
query = (
- "SELECT ProbeSet.Name AS trait_name, {columns} "
+ f"SELECT ProbeSet.Name AS trait_name, {columns} "
"FROM ProbeSet INNER JOIN ProbeSetXRef "
"ON ProbeSetXRef.ProbeSetId = ProbeSet.Id "
"INNER JOIN ProbeSetFreeze "
"ON ProbeSetFreeze.Id = ProbeSetXRef.ProbeSetFreezeId "
- "WHERE ProbeSetFreeze.Name IN ({dataset_names}) "
- "AND ProbeSet.Name IN ({trait_names})").format(
- columns=", ".join(["ProbeSet.{}".format(x) for x in keys]),
- dataset_names=", ".join(["%s"] * len(dataset_names)),
- trait_names=", ".join(["%s"] * len(traits)))
+ "WHERE ProbeSetFreeze.Name IN "
+ f"({', '.join(['%s'] * len(dataset_names))}) "
+ f"AND ProbeSet.Name IN ({', '.join(['%s'] * len(traits))})")
with conn.cursor(cursorclass=DictCursor) as cursor:
cursor.execute(
query,
@@ -372,18 +359,16 @@ def geno_traits_info(
This is a rework of the `gn3.db.traits.retrieve_geno_trait_info` function.
"""
dataset_names = set(trait["db"]["dataset_name"] for trait in traits)
- keys = ("name", "chr", "mb", "source2", "sequence")
+ columns = ", ".join([
+ f"Geno.{x}" for x in ("name", "chr", "mb", "source2", "sequence")])
query = (
"SELECT "
- "Geno.Name AS trait_name, {columns} "
+ f"Geno.Name AS trait_name, {columns} "
"FROM "
"Geno INNER JOIN GenoXRef ON GenoXRef.GenoId = Geno.Id "
"INNER JOIN GenoFreeze ON GenoFreeze.Id = GenoXRef.GenoFreezeId "
- "WHERE GenoFreeze.Name IN ({dataset_names}) "
- "AND Geno.Name IN ({trait_names})").format(
- columns=", ".join(["Geno.{}".format(x) for x in keys]),
- dataset_names=", ".join(["%s"] * len(dataset_names)),
- trait_names=", ".join(["%s"] * len(traits)))
+ f"WHERE GenoFreeze.Name IN ({', '.join(['%s'] * len(dataset_names))}) "
+ f"AND Geno.Name IN ({', '.join(['%s'] * len(traits))})")
with conn.cursor(cursorclass=DictCursor) as cursor:
cursor.execute(
query,
@@ -399,12 +384,9 @@ def temp_traits_info(
A rework of the `gn3.db.traits.retrieve_temp_trait_info` function.
"""
- keys = ("name", "description")
query = (
- "SELECT Name as trait_name, {columns} FROM Temp "
- "WHERE Name = ({trait_names})").format(
- columns=", ".join(keys),
- trait_names=", ".join(["%s"] * len(traits)))
+ "SELECT Name as trait_name, name, description FROM Temp "
+ "WHERE Name IN ({', '.join(['%s'] * len(traits))})")
with conn.cursor(cursorclass=DictCursor) as cursor:
cursor.execute(
query,
@@ -468,8 +450,7 @@ def publish_datasets_groups(conn: Any, dataset_names: Tuple[str]):
"InbredSet.Id "
"FROM InbredSet, PublishFreeze "
"WHERE PublishFreeze.InbredSetId = InbredSet.Id "
- "AND PublishFreeze.Name IN ({dataset_names})").format(
- dataset_names=", ".join(["%s"] * len(dataset_names)))
+ "AND PublishFreeze.Name IN ({', '.join(['%s'] * len(dataset_names))})")
with conn.cursor(cursorclass=DictCursor) as cursor:
cursor.execute(query, tuple(dataset_names))
return organise_groups_by_dataset(cursor.fetchall())
@@ -519,8 +500,7 @@ def probeset_datasets_groups(conn, dataset_names):
"FROM InbredSet, ProbeSetFreeze, ProbeFreeze "
"WHERE ProbeFreeze.InbredSetId = InbredSet.Id "
"AND ProbeFreeze.Id = ProbeSetFreeze.ProbeFreezeId "
- "AND ProbeSetFreeze.Name IN ({names})").format(
- names=", ".join(["%s"] * len(dataset_names)))
+ "AND ProbeSetFreeze.Name IN ({', '.join(['%s'] * len(dataset_names))})")
with conn.cursor(cursorclass=DictCursor) as cursor:
cursor.execute(query, tuple(dataset_names))
return organise_groups_by_dataset(cursor.fetchall())
@@ -567,8 +547,7 @@ def geno_datasets_groups(conn, dataset_names):
"SELECT GenoFreeze.Name AS dataset_name, InbredSet.Name, InbredSet.Id "
"FROM InbredSet, GenoFreeze "
"WHERE GenoFreeze.InbredSetId = InbredSet.Id "
- "AND GenoFreeze.Name IN ({names})").format(
- names=", ".join(["%s"] * len(dataset_names)))
+ "AND GenoFreeze.Name IN ({', '.join(['%s'] * len(dataset_names))})")
with conn.cursor(cursorclass=DictCursor) as cursor:
cursor.execute(query, tuple(dataset_names))
return organise_groups_by_dataset(cursor.fetchall())
@@ -596,14 +575,13 @@ def temp_datasets_groups(conn, dataset_names):
"SELECT Temp.Name AS dataset_name, InbredSet.Name, InbredSet.Id "
"FROM InbredSet, Temp "
"WHERE Temp.InbredSetId = InbredSet.Id "
- "AND Temp.Name IN ({names})").format(
- names=", ".join(["%s"] * len(dataset_names)))
+ "AND Temp.Name IN ({', '.join(['%s'] * len(dataset_names))})")
with conn.cursor(cursorclass=DictCursor) as cursor:
cursor.execute(query, tuple(dataset_names))
return organise_groups_by_dataset(cursor.fetchall())
return {}
-def temp_traits_datasets(conn: Any, threshold: int, traits: Tuple[Dict]):
+def temp_traits_datasets(conn: Any, threshold: int, traits: Tuple[Dict]): #pylint: disable=[W0613]
"""
Retrieve datasets for 'Temp' traits.
"""
@@ -657,11 +635,9 @@ def set_publish_qtl_info(conn, qtl, traits):
"SELECT PublishXRef.Id AS trait_name, PublishXRef.Locus, "
"PublishXRef.LRS, PublishXRef.additive "
"FROM PublishXRef, PublishFreeze "
- "WHERE PublishXRef.Id IN ({trait_names}) "
+ f"WHERE PublishXRef.Id IN ({', '.join(['%s'] * len(traits))}) "
"AND PublishXRef.InbredSetId = PublishFreeze.InbredSetId "
- "AND PublishFreeze.Id IN ({dataset_ids})").format(
- trait_names=", ".join(["%s"] * len(traits)),
- dataset_ids=", ".join(["%s"] * len(dataset_ids)))
+ f"AND PublishFreeze.Id IN ({', '.join(['%s'] * len(dataset_ids))})")
return query_qtl_info(conn, query, traits, tuple(dataset_ids))
return traits
@@ -677,10 +653,9 @@ def set_probeset_qtl_info(conn, qtl, traits):
"ProbeSetXRef.mean, ProbeSetXRef.additive "
"FROM ProbeSetXRef, ProbeSet "
"WHERE ProbeSetXRef.ProbeSetId = ProbeSet.Id "
- " AND ProbeSet.Name IN ({trait_names}) "
- "AND ProbeSetXRef.ProbeSetFreezeId IN ({dataset_ids})").format(
- trait_names=", ".join(["%s"] * len(traits)),
- dataset_ids=", ".join(["%s"] * len(dataset_ids)))
+ f"AND ProbeSet.Name IN ({', '.join(['%s'] * len(traits))}) "
+ "AND ProbeSetXRef.ProbeSetFreezeId IN "
+ f"({', '.join(['%s'] * len(dataset_ids))})")
return query_qtl_info(conn, query, traits, tuple(dataset_ids))
return traits
@@ -694,10 +669,8 @@ def set_sequence(conn, traits):
"FROM ProbeSet, ProbeSetFreeze, ProbeSetXRef "
"WHERE ProbeSet.Id=ProbeSetXRef.ProbeSetId "
"AND ProbeSetFreeze.Id = ProbeSetXRef.ProbeSetFreezeId "
- "AND ProbeSet.Name IN ({trait_names}) "
- "AND ProbeSetFreeze.Name IN ({dataset_names})").format(
- trait_names=", ".join(["%s"] * len(traits)),
- dataset_names=", ".join(["%s"] * len(dataset_names)))
+ f"AND ProbeSet.Name IN ({', '.join(['%s'] * len(traits))}) "
+ f"AND ProbeSetFreeze.Name IN ({', '.join(['%s'] * len(dataset_names))})")
with conn.cursor(cursorclass=DictCursor) as cursor:
cursor.execute(
query,
@@ -727,12 +700,10 @@ def set_homologene_id(conn, traits):
"SELECT InbredSet.Name AS `group`, Homologene.GeneId AS geneid, "
"HomologeneId "
"FROM Homologene, Species, InbredSet "
- "WHERE Homologene.GeneId IN ({geneids}) "
- "AND InbredSet.Name IN ({groups}) "
+ f"WHERE Homologene.GeneId IN ({', '.join(['%s'] * len(geneids))}) "
+ f"AND InbredSet.Name IN ({', '.join(['%s'] * len(groups))}) "
"AND InbredSet.SpeciesId = Species.Id "
- "AND Species.TaxonomyId = Homologene.TaxonomyId").format(
- geneids=", ".join(["%s"] * len(geneids)),
- groups=", ".join(["%s"] * len(groups)))
+ "AND Species.TaxonomyId = Homologene.TaxonomyId")
with conn.cursor(cursorclass=DictCursor) as cursor:
cursor.execute(query, (tuple(geneids) + tuple(groups)))
results = {
diff --git a/gn3/db/traits.py b/gn3/db/traits.py
index a7e7e7c..90d1e9d 100644
--- a/gn3/db/traits.py
+++ b/gn3/db/traits.py
@@ -69,7 +69,7 @@ def export_trait_data(
return accumulator + (trait_data["data"][sample]["ndata"], )
if dtype == "all":
return accumulator + __export_all_types(trait_data["data"], sample)
- raise KeyError("Type `%s` is incorrect" % dtype)
+ raise KeyError(f"Type `{dtype}` is incorrect")
if var_exists and n_exists:
return accumulator + (None, None, None)
if var_exists or n_exists:
@@ -125,9 +125,8 @@ def update_sample_data(conn: Any, #pylint: disable=[R0913]
"PublishXRef.Id = %s AND "
"PublishXRef.PhenotypeId = %s "
"AND PublishData.StrainId = Strain.Id "
- "AND Strain.Name = \"%s\"") % (trait_name,
- phenotype_id,
- str(strain_name)))
+ "AND Strain.Name = %s"),
+ (trait_name, phenotype_id, str(strain_name)))
strain_id, data_id = cursor.fetchone()
updated_published_data: int = 0
updated_se_data: int = 0
@@ -137,8 +136,8 @@ def update_sample_data(conn: Any, #pylint: disable=[R0913]
# Update the PublishData table
if value == "x":
cursor.execute(("DELETE FROM PublishData "
- "WHERE StrainId = %s AND Id = %s")
- % (strain_id, data_id))
+ "WHERE StrainId = %s AND Id = %s"),
+ (strain_id, data_id))
updated_published_data = cursor.rowcount
else:
cursor.execute(("UPDATE PublishData SET value = %s "
@@ -148,19 +147,20 @@ def update_sample_data(conn: Any, #pylint: disable=[R0913]
if not updated_published_data:
cursor.execute(
- "SELECT * FROM "
- "PublishData WHERE StrainId = "
- "%s AND Id = %s" % (strain_id, data_id))
+ ("SELECT * FROM "
+ "PublishData WHERE StrainId = "
+ "%s AND Id = %s"),
+ (strain_id, data_id))
if not cursor.fetchone():
cursor.execute(("INSERT INTO PublishData (Id, StrainId, "
- " value) VALUES (%s, %s, %s)") %
+ " value) VALUES (%s, %s, %s)"),
(data_id, strain_id, value))
updated_published_data = cursor.rowcount
# Update the PublishSE table
if error == "x":
cursor.execute(("DELETE FROM PublishSE "
- "WHERE StrainId = %s AND DataId = %s") %
+ "WHERE StrainId = %s AND DataId = %s"),
(strain_id, data_id))
updated_se_data = cursor.rowcount
else:
@@ -171,21 +171,22 @@ def update_sample_data(conn: Any, #pylint: disable=[R0913]
updated_se_data = cursor.rowcount
if not updated_se_data:
cursor.execute(
- "SELECT * FROM "
- "PublishSE WHERE StrainId = "
- "%s AND DataId = %s" % (strain_id, data_id))
+ ("SELECT * FROM "
+ "PublishSE WHERE StrainId = "
+ "%s AND DataId = %s"),
+ (strain_id, data_id))
if not cursor.fetchone():
- cursor.execute(("INSERT INTO PublishSE (StrainId, DataId, "
- " error) VALUES (%s, %s, %s)") %
- (strain_id, data_id,
- None if error == "x" else error))
+ cursor.execute(
+ ("INSERT INTO PublishSE (StrainId, DataId, "
+ " error) VALUES (%s, %s, %s)"),
+ (strain_id, data_id, None if error == "x" else error))
updated_se_data = cursor.rowcount
# Update the NStrain table
if count == "x":
cursor.execute(("DELETE FROM NStrain "
- "WHERE StrainId = %s AND DataId = %s" %
- (strain_id, data_id)))
+ "WHERE StrainId = %s AND DataId = %s"),
+ (strain_id, data_id))
updated_n_strains = cursor.rowcount
else:
cursor.execute(("UPDATE NStrain SET count = %s "
@@ -194,13 +195,14 @@ def update_sample_data(conn: Any, #pylint: disable=[R0913]
updated_n_strains = cursor.rowcount
if not updated_n_strains:
cursor.execute(
- "SELECT * FROM "
- "NStrain WHERE StrainId = "
- "%s AND DataId = %s" % (strain_id, data_id))
+ ("SELECT * FROM "
+ "NStrain WHERE StrainId = "
+ "%s AND DataId = %s"),
+ (strain_id, data_id))
if not cursor.fetchone():
cursor.execute(("INSERT INTO NStrain "
"(StrainId, DataId, count) "
- "VALUES (%s, %s, %s)") %
+ "VALUES (%s, %s, %s)"),
(strain_id, data_id, count))
updated_n_strains = cursor.rowcount
return (updated_published_data,
@@ -237,9 +239,8 @@ def delete_sample_data(conn: Any,
"PublishXRef.Id = %s AND "
"PublishXRef.PhenotypeId = %s "
"AND PublishData.StrainId = Strain.Id "
- "AND Strain.Name = \"%s\"") % (trait_name,
- phenotype_id,
- str(strain_name)))
+ "AND Strain.Name = %s"),
+ (trait_name, phenotype_id, str(strain_name)))
# Check if it exists if the data was already deleted:
if _result := cursor.fetchone():
@@ -248,20 +249,20 @@ def delete_sample_data(conn: Any,
# Only run if the strain_id and data_id exist
if strain_id and data_id:
cursor.execute(("DELETE FROM PublishData "
- "WHERE StrainId = %s AND Id = %s")
- % (strain_id, data_id))
+ "WHERE StrainId = %s AND Id = %s"),
+ (strain_id, data_id))
deleted_published_data = cursor.rowcount
# Delete the PublishSE table
cursor.execute(("DELETE FROM PublishSE "
- "WHERE StrainId = %s AND DataId = %s") %
+ "WHERE StrainId = %s AND DataId = %s"),
(strain_id, data_id))
deleted_se_data = cursor.rowcount
# Delete the NStrain table
cursor.execute(("DELETE FROM NStrain "
- "WHERE StrainId = %s AND DataId = %s" %
- (strain_id, data_id)))
+ "WHERE StrainId = %s AND DataId = %s"),
+ (strain_id, data_id))
deleted_n_strains = cursor.rowcount
except Exception as e: #pylint: disable=[C0103, W0612]
conn.rollback()
@@ -312,7 +313,7 @@ def insert_sample_data(conn: Any, #pylint: disable=[R0913]
# Insert into the PublishSE table if error is specified
if error and error != "x":
cursor.execute(("INSERT INTO PublishSE (StrainId, DataId, "
- " error) VALUES (%s, %s, %s)") %
+ " error) VALUES (%s, %s, %s)"),
(strain_id, data_id, error))
inserted_se_data = cursor.rowcount
@@ -320,7 +321,7 @@ def insert_sample_data(conn: Any, #pylint: disable=[R0913]
if count and count != "x":
cursor.execute(("INSERT INTO NStrain "
"(StrainId, DataId, count) "
- "VALUES (%s, %s, %s)") %
+ "VALUES (%s, %s, %s)"),
(strain_id, data_id, count))
inserted_n_strains = cursor.rowcount
except Exception as e: #pylint: disable=[C0103, W0612]
@@ -356,14 +357,14 @@ def retrieve_publish_trait_info(trait_data_source: Dict[str, Any], conn: Any):
"PublishXRef.comments")
query = (
"SELECT "
- "{columns} "
+ f"{columns} "
"FROM "
"PublishXRef, Publication, Phenotype "
"WHERE "
"PublishXRef.Id = %(trait_name)s AND "
"Phenotype.Id = PublishXRef.PhenotypeId AND "
"Publication.Id = PublishXRef.PublicationId AND "
- "PublishXRef.InbredSetId = %(trait_dataset_id)s").format(columns=columns)
+ "PublishXRef.InbredSetId = %(trait_dataset_id)s")
with conn.cursor() as cursor:
cursor.execute(
query,
@@ -399,17 +400,16 @@ def retrieve_probeset_trait_info(trait_data_source: Dict[str, Any], conn: Any):
"probe_set_specificity", "probe_set_blat_score",
"probe_set_blat_mb_start", "probe_set_blat_mb_end", "probe_set_strand",
"probe_set_note_by_rw", "flag")
+ columns = (f"ProbeSet.{x}" for x in keys)
query = (
- "SELECT "
- "{columns} "
+ f"SELECT {','.join(columns)} "
"FROM "
"ProbeSet, ProbeSetFreeze, ProbeSetXRef "
"WHERE "
"ProbeSetXRef.ProbeSetFreezeId = ProbeSetFreeze.Id AND "
"ProbeSetXRef.ProbeSetId = ProbeSet.Id AND "
"ProbeSetFreeze.Name = %(trait_dataset_name)s AND "
- "ProbeSet.Name = %(trait_name)s").format(
- columns=", ".join(["ProbeSet.{}".format(x) for x in keys]))
+ "ProbeSet.Name = %(trait_name)s")
with conn.cursor() as cursor:
cursor.execute(
query,
@@ -425,16 +425,15 @@ def retrieve_geno_trait_info(trait_data_source: Dict[str, Any], conn: Any):
https://github.com/genenetwork/genenetwork1/blob/master/web/webqtl/base/webqtlTrait.py#L438-L449"""
keys = ("name", "chr", "mb", "source2", "sequence")
+ columns = ", ".join(f"Geno.{x}" for x in keys)
query = (
- "SELECT "
- "{columns} "
+ f"SELECT {columns} "
"FROM "
- "Geno, GenoFreeze, GenoXRef "
+ "Geno INNER JOIN GenoXRef ON GenoXRef.GenoId = Geno.Id "
+ "INNER JOIN GenoFreeze ON GenoFreeze.Id = GenoXRef.GenoFreezeId "
"WHERE "
- "GenoXRef.GenoFreezeId = GenoFreeze.Id AND GenoXRef.GenoId = Geno.Id AND "
"GenoFreeze.Name = %(trait_dataset_name)s AND "
- "Geno.Name = %(trait_name)s").format(
- columns=", ".join(["Geno.{}".format(x) for x in keys]))
+ "Geno.Name = %(trait_name)s")
with conn.cursor() as cursor:
cursor.execute(
query,
@@ -451,8 +450,8 @@ def retrieve_temp_trait_info(trait_data_source: Dict[str, Any], conn: Any):
https://github.com/genenetwork/genenetwork1/blob/master/web/webqtl/base/webqtlTrait.py#L450-452"""
keys = ("name", "description")
query = (
- "SELECT {columns} FROM Temp "
- "WHERE Name = %(trait_name)s").format(columns=", ".join(keys))
+ f"SELECT {', '.join(keys)} FROM Temp "
+ "WHERE Name = %(trait_name)s")
with conn.cursor() as cursor:
cursor.execute(
query,
@@ -577,7 +576,7 @@ def load_qtl_info(qtl, trait_type, trait_info, conn):
"Publish": load_publish_qtl_info,
"ProbeSet": load_probeset_qtl_info
}
- if trait_info["name"] not in qtl_info_functions.keys():
+ if trait_info["name"] not in qtl_info_functions:
return trait_info
return qtl_info_functions[trait_type](trait_info, conn)
@@ -947,8 +946,8 @@ def retrieve_trait_data(trait: dict, conn: Any, samplelist: Sequence[str] = tupl
def generate_traits_filename(base_path: str = TMPDIR):
"""Generate a unique filename for use with generated traits files."""
- return "{}/traits_test_file_{}.txt".format(
- os.path.abspath(base_path), random_string(10))
+ return (
+ f"{os.path.abspath(base_path)}/traits_test_file_{random_string(10)}.txt")
def export_informative(trait_data: dict, inc_var: bool = False) -> tuple: