import csv
import json
import os
import hashlib
import datetime
import pickle
from pathlib import Path
from json.decoder import JSONDecodeError

import lmdb

from gn2.base.data_set import query_table_timestamp
from gn2.base.webqtlConfig import TEXTDIR
from gn2.base.webqtlConfig import TMPDIR


def cache_trait_metadata(dataset_name, data):
    """Pickle trait metadata into an lmdb store keyed by the dataset name."""
    try:
        with lmdb.open(os.path.join(TMPDIR, f"metadata_{dataset_name}"),
                       map_size=20971520) as env:
            with env.begin(write=True) as txn:
                data_bytes = pickle.dumps(data)
                txn.put(dataset_name.encode(), data_bytes)
                current_date = datetime.datetime.now().strftime(
                    "%Y-%m-%d %H:%M:%S")
                txn.put(b"creation_date", current_date.encode())
                return "success"
    except lmdb.Error:
        pass


def read_trait_metadata(dataset_name):
    """Read cached trait metadata for a dataset from its lmdb store."""
    try:
        with lmdb.open(os.path.join(TMPDIR, f"metadata_{dataset_name}"),
                       readonly=True, lock=False) as env:
            with env.begin() as txn:
                metadata = txn.get(dataset_name.encode())
                return pickle.loads(metadata) if metadata else {}
    except lmdb.Error:
        return {}


def fetch_all_cached_metadata(dataset_name):
    """Fetch all cached trait metadata for a given dataset."""
    file_name = generate_filename(dataset_name, suffix="metadata")
    file_path = Path(TMPDIR, file_name)
    try:
        with open(file_path, "r") as file_handler:
            dataset_metadata = json.load(file_handler)
        return (file_path, dataset_metadata)
    except FileNotFoundError:
        pass
    except JSONDecodeError:
        # the cache file is corrupt; discard it and start afresh
        file_path.unlink()
    file_path.touch(exist_ok=True)
    return (file_path, {})


def cache_new_traits_metadata(dataset_metadata: dict,
                              new_traits_metadata,
                              file_path: str):
    """Merge newly fetched trait metadata into the cache file."""
    if dataset_metadata == {} and new_traits_metadata == {}:
        return
    dataset_metadata.update(new_traits_metadata)
    with open(file_path, "w+") as file_handler:
        json.dump(dataset_metadata, file_handler)


def generate_filename(*args, suffix="", file_ext="json"):
    """Given a list of args, generate a unique, deterministic filename."""
    string_unicode = f"{*args,}".encode()
    return f"{hashlib.md5(string_unicode).hexdigest()}_{suffix}.{file_ext}"


def fetch_text_file(dataset_name, conn, text_dir=TMPDIR):
    """Fetch the text file with strain values for a dataset, if one exists."""

    def __file_scanner__(text_dir, target_file):
        for file in os.listdir(text_dir):
            if file.startswith(f"ProbeSetFreezeId_{target_file}_"):
                return os.path.join(text_dir, file)

    with conn.cursor() as cursor:
        cursor.execute(
            "SELECT Id, FullName FROM ProbeSetFreeze WHERE Name = %s",
            (dataset_name,))
        results = cursor.fetchone()
        if results:
            try:
                # check recently generated text files first; if none exist,
                # fall back to the GN1 data matrix directory
                return (__file_scanner__(text_dir, results[0])
                        or __file_scanner__(TEXTDIR, results[0]))
            except Exception:
                pass


def read_text_file(sample_dict, file_path):
    """Read a dataset text file, keeping only the columns whose strain
    names appear in ``sample_dict``."""

    def __fetch_id_positions__(all_ids, target_ids):
        _vals = []
        _posit = [0]  # index 0 keeps the trait-name column when slicing rows
        for (idx, strain) in enumerate(all_ids, 1):
            if strain in target_ids:
                _vals.append(target_ids[strain])
                _posit.append(idx)
        return (_posit, _vals)

    with open(file_path) as csv_file:
        csv_reader = csv.reader(csv_file, delimiter=",")
        _posit, sample_vals = __fetch_id_positions__(
            next(csv_reader)[1:], sample_dict)
        return (sample_vals,
                [[line[i] for i in _posit] for line in csv_reader])
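

# A minimal usage sketch for the lmdb metadata cache above (illustrative
# only; the dataset name and payload are placeholder values, and this
# assumes TMPDIR is writable):
#
#     cache_trait_metadata("HC_M2_0606_P", {"1427571_at": {"symbol": "Shh"}})
#     metadata = read_trait_metadata("HC_M2_0606_P")
#     # -> {"1427571_at": {"symbol": "Shh"}} on a cache hit, {} otherwise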


def write_db_to_textfile(db_name, conn, text_dir=TMPDIR):
    """Dump a dataset's trait x strain matrix to a CSV text file."""

    def __sanitise_filename__(filename):
        ttable = str.maketrans({" ": "_", "/": "_", "\\": "_"})
        return str.translate(filename, ttable)

    def __generate_file_name__(db_name):
        # TODO: add expiry time and checker
        with conn.cursor() as cursor:
            cursor.execute(
                "SELECT Id, FullName FROM ProbeSetFreeze WHERE Name = %s",
                (db_name,))
            results = cursor.fetchone()
            if results:
                return __sanitise_filename__(
                    f"ProbeSetFreezeId_{results[0]}_{results[1]}")

    def __parse_to_dict__(results):
        # build one row per trait ([trait_name, val1, val2, ...]) and a
        # header of strain names prefixed with the "ID" column label
        ids = ["ID"]
        data = {}
        for (trait, strain, val) in results:
            if strain not in ids:
                ids.append(strain)
            if trait in data:
                data[trait].append(val)
            else:
                data[trait] = [trait, val]
        return (data, ids)

    def __write_to_file__(file_path, data, col_names):
        with open(file_path, "w+", encoding="UTF8") as file_handler:
            writer = csv.writer(file_handler)
            writer.writerow(col_names)
            writer.writerows(data.values())

    with conn.cursor() as cursor:
        cursor.execute(
            "SELECT ProbeSet.Name, Strain.Name, ProbeSetData.value "
            "FROM Strain LEFT JOIN ProbeSetData "
            "ON Strain.Id = ProbeSetData.StrainId "
            "LEFT JOIN ProbeSetXRef ON ProbeSetData.Id = ProbeSetXRef.DataId "
            "LEFT JOIN ProbeSet ON ProbeSetXRef.ProbeSetId = ProbeSet.Id "
            "WHERE ProbeSetXRef.ProbeSetFreezeId IN "
            "(SELECT Id FROM ProbeSetFreeze WHERE Name = %s) "
            "ORDER BY Strain.Name",
            (db_name,))
        results = cursor.fetchall()
        file_name = __generate_file_name__(db_name)
        if results and file_name:
            __write_to_file__(os.path.join(text_dir, file_name),
                              *__parse_to_dict__(results))
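

# A minimal end-to-end sketch of the text-file helpers (illustrative only;
# it assumes a DB-API connection `conn` to the GeneNetwork database, and
# the dataset name and strain values below are placeholders):
#
#     write_db_to_textfile("HC_M2_0606_P", conn)         # dump trait matrix
#     file_path = fetch_text_file("HC_M2_0606_P", conn)  # locate the dump
#     if file_path:
#         sample_vals, trait_lines = read_text_file(
#             {"BXD1": 9.2, "BXD2": 9.5}, file_path)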