From c1c5ca6e994ff8afbfa7a4270d210fd80d118f98 Mon Sep 17 00:00:00 2001
From: Alexander_Kabui
Date: Wed, 19 Apr 2023 02:58:19 +0300
Subject: metadata & textfile generator script

---
 wqflask/scripts/textfiles_generator.py | 304 +++++++++++++++++++++++++++++++++
 1 file changed, 304 insertions(+)
 create mode 100644 wqflask/scripts/textfiles_generator.py

diff --git a/wqflask/scripts/textfiles_generator.py b/wqflask/scripts/textfiles_generator.py
new file mode 100644
index 00000000..2f35d6f8
--- /dev/null
+++ b/wqflask/scripts/textfiles_generator.py
@@ -0,0 +1,304 @@
"""Generate metadata and probeset strain-data textfiles for a set of
commonly used datasets, caching the results in LMDB.

Usage:
    python3 textfiles_generator.py SQL_URI TMPDIR [--metadata] [--textfiles]

Flags:
    --metadata   generate the trait metadata files
    --textfiles  generate the probeset strain-data files

Example:
    python3 textfiles_generator.py "mysql://kabui:1234@localhost/db_webqtl" /tmp --textfiles
"""
import contextlib
import datetime
import os
import pickle
from argparse import ArgumentParser
from pathlib import Path
from typing import Any, Iterator, Protocol, Tuple
from urllib.parse import urlparse

import lmdb
import MySQLdb as mdb

# The (dataset_type, dataset_name, species) triples to generate files for.
DATASET_NAMES = [
    ("ProbeSet", "HC_M2_0606_P", "mouse"),
    ("ProbeSet", "UMUTAffyExon_0209_RMA", "mouse"),
]


def get_probes_meta(cursor):
    """Fetch the Id and Name of every ProbeSetFreeze; use this instead of
    DATASET_NAMES if you need to generate files for all (1000+) probesets."""
    cursor.execute("SELECT Id, Name FROM ProbeSetFreeze")
    return cursor.fetchall()


def parse_db_url(sql_uri: str) -> Tuple:
    """Parse the SQL_URI into (host, user, password, db_name, port).
    Note: there is a default value for SQL_URI, so a tuple result is
    always expected."""
    parsed_db = urlparse(sql_uri)
    return (
        parsed_db.hostname, parsed_db.username, parsed_db.password,
        parsed_db.path[1:], parsed_db.port)


class Connection(Protocol):
    """The subset of the DB-API connection interface this script relies on."""

    def cursor(self, *args) -> Any:
        """A cursor in which queries may be performed"""
        ...


@contextlib.contextmanager
def database_connection(sql_uri: str = "") -> Iterator[Connection]:
    """Connect to the MySQL database, closing the connection on exit."""
    host, user, passwd, db_name, port = parse_db_url(sql_uri)
    connection = mdb.connect(db=db_name,
                             user=user,
                             passwd=passwd or '',
                             host=host,
                             port=port or 3306)
    try:
        yield connection
    finally:
        connection.close()


def query_probes_metadata(dataset_type, dataset_name, species, sql_uri):
    """Query traits metadata in bulk for a probeset dataset."""
    if dataset_type.lower() != "probeset":
        return []
    with database_connection(sql_uri) as conn:
        with conn.cursor() as cursor:
            query = """
                SELECT ProbeSet.Name, ProbeSet.Chr, ProbeSet.Mb,
                ProbeSet.Symbol, ProbeSetXRef.mean,
                CONCAT_WS('; ', ProbeSet.description, ProbeSet.Probe_Target_Description) AS description,
                ProbeSetXRef.additive, ProbeSetXRef.LRS, Geno.Chr, Geno.Mb
                FROM ProbeSet INNER JOIN ProbeSetXRef
                ON ProbeSet.Id = ProbeSetXRef.ProbeSetId
                INNER JOIN Geno
                ON ProbeSetXRef.Locus = Geno.Name
                INNER JOIN Species
                ON Geno.SpeciesId = Species.Id
                WHERE Species.Name = %s AND
                ProbeSetXRef.ProbeSetFreezeId IN (
                    SELECT ProbeSetFreeze.Id
                    FROM ProbeSetFreeze WHERE ProbeSetFreeze.Name = %s)
                """
            cursor.execute(query, (species, dataset_name))
            return cursor.fetchall()
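

# A minimal usage sketch for the helpers above (illustrative only: the
# URI below is a made-up credential and not real configuration):
#
#     rows = query_probes_metadata(
#         "ProbeSet", "HC_M2_0606_P", "mouse",
#         "mysql://user:password@localhost/db_webqtl")
#     # each returned row unpacks as:
#     # (trait_name, probe_chr, probe_mb, symbol, mean, description,
#     #  additive, lrs, chr_score, mb)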


def get_metadata(dataset_type, dataset_name, species, sql_uri):
    """Retrieve the metadata for a dataset, keyed by trait name."""
    def __location__(probe_chr, probe_mb):
        if probe_mb:
            return f"Chr{probe_chr}: {probe_mb:.6f}"
        return f"Chr{probe_chr}: ???"

    return {trait_name: {
        "name": trait_name,
        "view": True,
        "symbol": symbol,
        "dataset": dataset_name,
        "dataset_name": dataset_name,
        "mean": mean,
        "description": description,
        "additive": additive,
        "lrs_score": f"{lrs:3.1f}" if lrs else "",
        "location": __location__(probe_chr, probe_mb),
        "chr": probe_chr,
        "mb": probe_mb,
        "lrs_location": f'Chr{chr_score}: {mb:{".6f" if mb else ""}}',
        "lrs_chr": chr_score,
        "lrs_mb": mb
    } for trait_name, probe_chr, probe_mb, symbol, mean, description,
        additive, lrs, chr_score, mb
        in query_probes_metadata(dataset_type, dataset_name, species, sql_uri)}


def cache_trait_metadata(dataset_name, data):
    """Pickle the metadata dict into an LMDB file under TMPDIR/metadata,
    skipping the write when a fresh (unexpired) cache already exists."""
    if not data:
        return
    path = os.path.join(TMPDIR, "metadata")
    Path(path).mkdir(parents=True, exist_ok=True)
    db_path = os.path.join(path, f"metadata_{dataset_name}")
    if not check_file_expiry(db_path):
        return
    with lmdb.open(db_path, map_size=80971520) as env:
        with env.begin(write=True) as txn:
            txn.put(dataset_name.encode(), pickle.dumps(data))
            current_date = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            txn.put(b"creation_date", current_date.encode())


def __sanitise_filename__(filename):
    """Replace path-hostile characters in a filename with underscores."""
    ttable = str.maketrans({" ": "_", "/": "_", "\\": "_"})
    return str.translate(filename, ttable)


def __generate_file_name__(db_name, sql_uri):
    """Build the textfile cache name for a dataset, i.e.
    ProbeSetFreezeId_<Id>_<FullName>; returns None for unknown datasets."""
    with database_connection(sql_uri) as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                'SELECT Id, FullName FROM ProbeSetFreeze WHERE Name = %s',
                (db_name,))
            results = cursor.fetchone()
            if results:
                return __sanitise_filename__(
                    f"ProbeSetFreezeId_{results[0]}_{results[1]}")


def write_strains_data(sql_uri, dataset_name: str, col_names: list[str], data):
    """Pickle the strain names and per-trait strain data into an LMDB file."""
    if not data:
        return
    db_path = os.path.join(
        TMPDIR, __generate_file_name__(dataset_name, sql_uri))
    with lmdb.open(db_path, map_size=80971520) as env:
        with env.begin(write=True) as txn:
            txn.put(b"strain_names", pickle.dumps(col_names))
            txn.put(b"data", pickle.dumps(data))
            current_date = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            txn.put(b"creation_date", current_date.encode())


def generate_probes_textfiles(db_name, db_type, sql_uri):
    """Fetch all strain values for a dataset and shape them for writing."""

    def __parse_to_dict__(results):
        # ids is the header row; each data row starts with the trait name
        # so a row can be written out directly as a textfile line.
        ids = ["ID"]
        data = {}
        for (trait, strain, val) in results:
            if strain not in ids:
                ids.append(strain)
            if trait in data:
                data[trait].append(val)
            else:
                data[trait] = [trait, val]
        return (ids, data)

    with database_connection(sql_uri) as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                "SELECT ProbeSet.Name, Strain.Name, ProbeSetData.value "
                "FROM Strain LEFT JOIN ProbeSetData "
                "ON Strain.Id = ProbeSetData.StrainId "
                "LEFT JOIN ProbeSetXRef ON ProbeSetData.Id = ProbeSetXRef.DataId "
                "LEFT JOIN ProbeSet ON ProbeSetXRef.ProbeSetId = ProbeSet.Id "
                "WHERE ProbeSetXRef.ProbeSetFreezeId IN "
                "(SELECT Id FROM ProbeSetFreeze WHERE Name = %s) "
                "ORDER BY Strain.Name",
                (db_name,))
            return __parse_to_dict__(cursor.fetchall())
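

# Illustrative shape of the (col_names, data) pair returned above; the
# strain names and values here are made up for the example:
#
#     col_names == ["ID", "BXD1", "BXD2"]
#     data == {"1445618_at": ["1445618_at", 10.3, 9.8]}
#
# col_names is the header row, and each data row already begins with the
# trait name, matching the layout of a strain-data textfile.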


def argument_parser():
    """Parse the SQL_URI and TMPDIR positional arguments plus the
    --metadata/--textfiles flags; maybe add dataset and species as
    arguments later."""
    parser = ArgumentParser()
    parser.add_argument(
        "SQL_URI",
        help="The URI to use to connect to the database",
        type=str)
    parser.add_argument(
        "TMPDIR",
        help="Directory to write the metadata and textfiles to",
        type=str)
    parser.add_argument('--metadata', dest="metadata", action="store_true")
    parser.add_argument('--textfiles', dest='textfiles', action='store_true')
    parser.set_defaults(textfiles=False)
    parser.set_defaults(metadata=False)
    return parser.parse_args()


def run_textfiles_generator(args):
    """Write the strain-data textfiles for each dataset in DATASET_NAMES,
    skipping datasets whose cached file has not yet expired."""
    for (d_type, dataset_name, _species) in DATASET_NAMES:
        file_name = __generate_file_name__(dataset_name, args.SQL_URI)
        if file_name is None:
            continue
        if not check_file_expiry(os.path.join(args.TMPDIR, file_name)):
            continue
        write_strains_data(
            args.SQL_URI, dataset_name,
            *generate_probes_textfiles(dataset_name, d_type, args.SQL_URI))


def run_metadata_files_generator(args):
    """Generate and cache the metadata for each dataset in DATASET_NAMES."""
    for (dataset_type, dataset_name, species) in DATASET_NAMES:
        cache_trait_metadata(dataset_name, get_metadata(
            dataset_type, dataset_name, species, args.SQL_URI))


def read_trait_metadata(dataset_name):
    """Read a dataset's cached metadata back from LMDB; returns an empty
    dict when the cache is missing or unreadable."""
    try:
        with lmdb.open(os.path.join(TMPDIR, "metadata",
                                    f"metadata_{dataset_name}"),
                       readonly=True, lock=False) as env:
            with env.begin() as txn:
                metadata = txn.get(dataset_name.encode())
                return pickle.loads(metadata) if metadata else {}
    except lmdb.Error:
        return {}


def check_file_expiry(target_file_path, max_days=20):
    """Return True if the cached file is older than max_days or cannot
    be read, i.e. if it should be regenerated."""
    try:
        with lmdb.open(target_file_path, readonly=True, lock=False) as env:
            with env.begin() as txn:
                creation_date = datetime.datetime.strptime(
                    txn.get(b"creation_date").decode(), '%Y-%m-%d %H:%M:%S')
                return (datetime.datetime.now() - creation_date).days > max_days
    except (lmdb.Error, AttributeError):
        # AttributeError covers a missing creation_date key.
        return True


if __name__ == '__main__':
    args = argument_parser()
    # Module-level TMPDIR is read by the cache read/write helpers above.
    TMPDIR = args.TMPDIR
    if args.metadata:
        run_metadata_files_generator(args)
    if args.textfiles:
        run_textfiles_generator(args)
--
cgit v1.2.3
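
The script ships a reader for the cached metadata (read_trait_metadata) but
none for the strain-data files. A minimal consumer sketch: the function name
read_strains_data and the db_path parameter are illustrative, while the
"strain_names" and "data" keys come from write_strains_data in the patch:

    import pickle

    import lmdb


    def read_strains_data(db_path):
        # Open the LMDB environment read-only and unpickle the two
        # values stored by write_strains_data; db_path would be the
        # file named by __generate_file_name__ under TMPDIR.
        with lmdb.open(db_path, readonly=True, lock=False) as env:
            with env.begin() as txn:
                col_names = pickle.loads(txn.get(b"strain_names"))
                data = pickle.loads(txn.get(b"data"))
        return col_names, data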