aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--wqflask/scripts/textfiles_generator.py304
1 files changed, 304 insertions, 0 deletions
diff --git a/wqflask/scripts/textfiles_generator.py b/wqflask/scripts/textfiles_generator.py
new file mode 100644
index 00000000..2f35d6f8
--- /dev/null
+++ b/wqflask/scripts/textfiles_generator.py
@@ -0,0 +1,304 @@
+
+# database connection
+import contextlib
+import pickle
+import datetime
+from argparse import ArgumentParser
+from typing import Any, Iterator, Protocol, Tuple
+from pathlib import Path
+from urllib.parse import urlparse
+import MySQLdb as mdb
+import lmdb
+import os
+
+
+"""
+*script generate both metadata and probeset textfiles
+*manually commonly used datasets
+
+** USage:
+python3 file_name sql_uri_path tmp_path
+flags:
+
+ --metadata to generate metadata files
+
+ -- textfile to generate the probeset strain data
+
+
+# example python3
+
+ python3 meta_data_script.py "mysql://kabui:1234@localhost/db_webqtl" /tmp --textfile
+
+
+"""
+
+DATASET_NAMES = [
+ ("ProbeSet", "HC_M2_0606_P", "mouse"),
+ ("ProbeSet", "UMUTAffyExon_0209_RMA","mouse")
+]
+
+
+def get_probes_meta():
+
+ # if you need to generate for all probes use this note 1000+
+ query = "SELECT Id,NAME FROM ProbeSetFreeze"
+ cursor.execute(query)
+ return cursor.fetchall()
+
+
+def parse_db_url(sql_uri: str) -> Tuple:
+ """function to parse SQL_URI env variable note:there\
+ is a default value for SQL_URI so a tuple result is\
+ always expected"""
+ parsed_db = urlparse(sql_uri)
+ return (
+ parsed_db.hostname, parsed_db.username, parsed_db.password,
+ parsed_db.path[1:], parsed_db.port)
+
+
+class Connection(Protocol):
+
+ def cursor(self, *args) -> Any:
+ """A cursor in which queries may be performed"""
+ ...
+
+
+@contextlib.contextmanager
+def database_connection(sql_uri: str = "") -> Iterator[Connection]:
+ """Connect to MySQL database."""
+ host, user, passwd, db_name, port = parse_db_url(
+ sql_uri)
+
+ connection = mdb.connect(db=db_name,
+ user=user,
+ passwd=passwd or '',
+ host=host,
+ port=port or 3306)
+ try:
+ yield connection
+ finally:
+ connection.close()
+
+
+def query_probes_metadata(dataset_type, dataset_name, species, sql_uri):
+ """query traits metadata in bulk for probeset"""
+
+ if dataset_type.lower() != "probeset":
+ return []
+ with database_connection(sql_uri) as conn:
+ with conn.cursor() as cursor:
+ query = """
+ SELECT ProbeSet.Name,ProbeSet.Chr,ProbeSet.Mb,
+ ProbeSet.Symbol,ProbeSetXRef.mean,
+ CONCAT_WS('; ', ProbeSet.description, ProbeSet.Probe_Target_Description) AS description,
+ ProbeSetXRef.additive,ProbeSetXRef.LRS,Geno.Chr, Geno.Mb
+ FROM ProbeSet INNER JOIN ProbeSetXRef
+ ON ProbeSet.Id=ProbeSetXRef.ProbeSetId
+ INNER JOIN Geno
+ ON ProbeSetXRef.Locus = Geno.Name
+ INNER JOIN Species
+ ON Geno.SpeciesId = Species.Id
+ WHERE Species.Name = %s AND
+ ProbeSetXRef.ProbeSetFreezeId IN (
+ SELECT ProbeSetFreeze.Id
+ FROM ProbeSetFreeze WHERE ProbeSetFreeze.Name = %s)
+ """
+ cursor.execute(query, (species,) + (dataset_name,))
+ return cursor.fetchall()
+
+
+def get_metadata(dataset_type, dataset_name, species, sql_uri):
+ """Retrieve the metadata"""
+ def __location__(probe_chr, probe_mb):
+ if probe_mb:
+ return f"Chr{probe_chr}: {probe_mb:.6f}"
+ return f"Chr{probe_chr}: ???"
+ return {trait_name: {
+ "name": trait_name,
+ "view": True,
+ "symbol": symbol,
+ "dataset": dataset_name,
+ "dataset_name": dataset_name,
+ "mean": mean,
+ "description": description,
+ "additive": additive,
+ "lrs_score": f"{lrs:3.1f}" if lrs else "",
+ "location": __location__(probe_chr, probe_mb),
+ "chr": probe_chr,
+ "mb": probe_mb,
+ "lrs_location": f'Chr{chr_score}: {mb:{".6f" if mb else ""}}',
+ "lrs_chr": chr_score,
+ "lrs_mb": mb
+
+ } for trait_name, probe_chr, probe_mb, symbol, mean, description,
+ additive, lrs, chr_score, mb
+ in query_probes_metadata(dataset_type, dataset_name, species, sql_uri)}
+
+
+def cache_trait_metadata(dataset_name, data):
+ if not data:
+ return
+ try:
+
+ path = os.path.join(TMPDIR, "metadata")
+ Path(path).mkdir(parents=True, exist_ok=True)
+ db_path = os.path.join(path, f"metadata_{dataset_name}")
+ if not check_file_expiry(db_path):
+ return
+ with lmdb.open(db_path, map_size=80971520) as env:
+ with env.begin(write=True) as txn:
+ data_bytes = pickle.dumps(data)
+ txn.put(f"{dataset_name}".encode(), data_bytes)
+ current_date = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+ txn.put(b"creation_date", current_date.encode())
+ except lmdb.Error as error:
+ raise error
+
+
+def __sanitise_filename__(filename):
+ ttable = str.maketrans({" ": "_", "/": "_", "\\": "_"})
+ return str.translate(filename, ttable)
+
+def __generate_file_name__(db_name,sql_uri):
+ # todo add expiry time and checker
+
+
+ with database_connection(sql_uri) as conn:
+ with conn.cursor() as cursor:
+ cursor.execute(
+ 'SELECT Id, FullName FROM ProbeSetFreeze WHERE Name = %s', (db_name,))
+ results = cursor.fetchone()
+ if (results):
+ return __sanitise_filename__(
+ f"ProbeSetFreezeId_{results[0]}_{results[1]}")
+
+
+def write_strains_data(sql_uri, dataset_name: str, col_names: list[str], data):
+
+ def __sanitise_filename__(filename):
+ ttable = str.maketrans({" ": "_", "/": "_", "\\": "_"})
+ return str.translate(filename, ttable)
+
+ if not data:
+ return
+ try:
+
+ db_path = os.path.join(TMPDIR, __generate_file_name__(dataset_name,sql_uri))
+ breakpoint()
+
+ with lmdb.open(db_path, map_size=80971520) as env:
+ with env.begin(write=True) as txn:
+
+ txn.put(f"strain_names".encode(), pickle.dumps(col_names))
+
+ txn.put(f"data".encode(), pickle.dumps(data))
+ current_date = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+ txn.put(b"creation_date", current_date.encode())
+ except lmdb.Error as error:
+ raise error
+
+
+
+def generate_probes_textfiles(db_name, db_type, sql_uri):
+
+ def __parse_to_dict__(results):
+ ids = ["ID"]
+ data = {}
+ for (trait, strain, val) in results:
+ if strain not in ids:
+ ids.append(strain)
+ if trait in data:
+ data[trait].append(val)
+ else:
+ data[trait] = [trait, val]
+ return (data, ids)
+ with database_connection(sql_uri) as conn:
+ with conn.cursor() as cursor:
+
+ cursor.execute(
+ "SELECT ProbeSet.Name, Strain.Name, ProbeSetData.value "
+ "FROM Strain LEFT JOIN ProbeSetData "
+ "ON Strain.Id = ProbeSetData.StrainId "
+ "LEFT JOIN ProbeSetXRef ON ProbeSetData.Id = ProbeSetXRef.DataId "
+ "LEFT JOIN ProbeSet ON ProbeSetXRef.ProbeSetId = ProbeSet.Id "
+ "WHERE ProbeSetXRef.ProbeSetFreezeId IN "
+ "(SELECT Id FROM ProbeSetFreeze WHERE Name = %s) "
+ "ORDER BY Strain.Name",
+ (db_name,))
+ return __parse_to_dict__(cursor.fetchall())
+
+
+def argument_parser():
+ parser = ArgumentParser()
+
+ # add maybe dataset,species as args
+ parser.add_argument(
+ "SQL_URI",
+ help="The uri to use to connect to the database",
+ type=str)
+
+ parser.add_argument(
+ "TMPDIR",
+ help="tmpdir to write the metadata to",
+ type=str)
+ parser.add_argument('--metadata', dest="metadata", action="store_true")
+ parser.add_argument('--textfiles', dest='textfiles', action='store_true')
+
+ parser.set_defaults(textfiles=False)
+ parser.set_defaults(metadata=False)
+ return parser.parse_args()
+
+
+def run_textfiles_generator(args):
+ try:
+ for (d_type, dataset_name, _species) in DATASET_NAMES:
+ if not check_file_expiry(os.path.join(args.TMPDIR,__generate_file_name__(dataset_name,args.SQL_URI))):
+ return
+
+ breakpoint()
+ write_strains_data(
+ args.SQL_URI, dataset_name, *generate_probes_textfiles(dataset_name, d_type, args.SQL_URI))
+ except Exception as error:
+ raise error
+
+
+def run_metadata_files_generator(args):
+ for (dataset_type, dataset_name, species) in DATASET_NAMES:
+ try:
+ cache_trait_metadata(dataset_name, get_metadata(
+ dataset_type, dataset_name, species, args.SQL_URI))
+ except Exception as error:
+ raise error
+
+def read_trait_metadata(dataset_name):
+ try:
+ with lmdb.open(os.path.join(TMPDIR,f"metadata_{dataset_name}"),
+ readonly=True, lock=False) as env:
+ with env.begin() as txn:
+ db_name = txn.get(dataset_name.encode())
+ return (pickle.loads(db_name) if db_name else {})
+ except lmdb.Error as error:
+ return {}
+
+
+
+def check_file_expiry(target_file_path,max_days=20):
+
+ # return true if file has expired
+ try:
+ with lmdb.open(target_file_path,readonly=True, lock=False) as env:
+ with env.begin() as txn:
+
+ creation_date = datetime.datetime.strptime(txn.get(b"creation_date").decode(), '%Y-%m-%d %H:%M:%S')
+ return ((datetime.datetime.now() - creation_date).days > max_days)
+ except lmdb.Error as error:
+ return True
+
+
+if __name__ == '__main__':
+ args = argument_parser()
+ TMPDIR = args.TMPDIR
+ if args.metadata:
+ run_metadata_files_generator(args)
+ if args.textfiles:
+ run_textfiles_generator(args)