diff options
-rw-r--r-- | wqflask/wqflask/correlation/pre_computes.py | 160 |
1 files changed, 36 insertions, 124 deletions
diff --git a/wqflask/wqflask/correlation/pre_computes.py b/wqflask/wqflask/correlation/pre_computes.py index f21ec06a..2831bd39 100644 --- a/wqflask/wqflask/correlation/pre_computes.py +++ b/wqflask/wqflask/correlation/pre_computes.py @@ -2,6 +2,10 @@ import csv import json import os import hashlib +import datetime + +import lmdb +import pickle from pathlib import Path from base.data_set import query_table_timestamp @@ -10,6 +14,31 @@ from base.webqtlConfig import TMPDIR from json.decoder import JSONDecodeError +def cache_trait_metadata(dataset_name, data): + + + try: + with lmdb.open(os.path.join(TMPDIR,f"metadata_{dataset_name}"),map_size=20971520) as env: + with env.begin(write=True) as txn: + data_bytes = pickle.dumps(data) + txn.put(f"{dataset_name}".encode(), data_bytes) + current_date = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + txn.put(b"creation_date", current_date.encode()) + return "success" + + except lmdb.Error as error: + pass + +def read_trait_metadata(dataset_name): + try: + with lmdb.open(os.path.join(TMPDIR,f"metadata_{dataset_name}"), + readonly=True, lock=False) as env: + with env.begin() as txn: + db_name = txn.get(dataset_name.encode()) + return (pickle.loads(db_name) if db_name else {}) + except lmdb.Error as error: + return {} + def fetch_all_cached_metadata(dataset_name): """in a gvein dataset fetch all the traits metadata""" @@ -53,132 +82,15 @@ def generate_filename(*args, suffix="", file_ext="json"): return f"{hashlib.md5(string_unicode).hexdigest()}_{suffix}.{file_ext}" -def cache_compute_results(base_dataset_type, - base_dataset_name, - target_dataset_name, - corr_method, - correlation_results, - trait_name): - """function to cache correlation results for heavy computations""" - - base_timestamp = query_table_timestamp(base_dataset_type) - - target_dataset_timestamp = base_timestamp - - file_name = generate_filename( - base_dataset_name, target_dataset_name, - base_timestamp, target_dataset_timestamp, - suffix="corr_precomputes") - - file_path = os.path.join(TMPDIR, file_name) - - try: - with open(file_path, "r+") as json_file_handler: - data = json.load(json_file_handler) - - data[trait_name] = correlation_results - - json_file_handler.seek(0) - - json.dump(data, json_file_handler) - - json_file_handler.truncate() - - except FileNotFoundError: - with open(file_path, "w+") as file_handler: - data = {} - data[trait_name] = correlation_results - - json.dump(data, file_handler) - - -def fetch_precompute_results(base_dataset_name, - target_dataset_name, - dataset_type, - trait_name): - """function to check for precomputed results""" - - base_timestamp = target_dataset_timestamp = query_table_timestamp( - dataset_type) - file_name = generate_filename( - base_dataset_name, target_dataset_name, - base_timestamp, target_dataset_timestamp, - suffix="corr_precomputes") - file_path = os.path.join(TMPDIR, file_name) - try: - with open(file_path, "r+") as json_handler: - correlation_results = json.load(json_handler) - - return correlation_results.get(trait_name) - - except FileNotFoundError: - pass - - -def pre_compute_dataset_vs_dataset(base_dataset, - target_dataset, - corr_method): - """compute sample correlation between dataset vs dataset - wn:heavy function should be invoked less frequently - input:datasets_data(two dicts),corr_method - - output:correlation results for entire dataset against entire dataset - """ - dataset_correlation_results = {} - - target_traits_data, base_traits_data = get_datasets_data( - base_dataset, target_dataset_data) - - for (primary_trait_name, strain_values) in base_traits_data: - - this_trait_data = { - "trait_sample_data": strain_values, - "trait_id": primary_trait_name - } - - trait_correlation_result = compute_all_sample_correlation( - corr_method=corr_method, - this_trait=this_trait_data, - target_dataset=target_traits_data) - - dataset_correlation_results[primary_trait_name] = trait_correlation_result - - return dataset_correlation_results - - -def get_datasets_data(base_dataset, target_dataset_data): - """required to pass data in a given format to the pre compute - function - - (works for bxd only probeset datasets) - - output:two dicts for datasets with key==trait and value==strains - """ - samples_fetched = base_dataset.group.all_samples_ordered() - target_traits_data = target_dataset.get_trait_data( - samples_fetched) - - base_traits_data = base_dataset.get_trait_data( - samples_fetched) - - target_results = map_shared_keys_to_values( - samples_fetched, target_traits_data) - base_results = map_shared_keys_to_values( - samples_fetched, base_traits_data) - - return (target_results, base_results) - - -def fetch_text_file(dataset_name, conn, text_dir=TEXTDIR): +def fetch_text_file(dataset_name, conn, text_dir=TMPDIR): """fetch textfiles with strain vals if exists""" - - def __file_scanner__(text_dir,target_file): - for file in os.listdir(text_dir): - if file.startswith(f"ProbeSetFreezeId_{results[0]}_"): - return os.path.join(text_dir,file) + def __file_scanner__(text_dir, target_file): + for file in os.listdir(text_dir): + if file.startswith(f"ProbeSetFreezeId_{target_file}_"): + return os.path.join(text_dir, file) with conn.cursor() as cursor: cursor.execute( @@ -186,9 +98,9 @@ def fetch_text_file(dataset_name, conn, text_dir=TEXTDIR): results = cursor.fetchone() if results: try: - # addition check for matrix file in gn_matrix folder + # checks first for recently generated textfiles if not use gn1 datamatrix - return __file_scanner__(text_dir,results) or __file_scanner__(TEXTDIR,results) + return __file_scanner__(text_dir, results[0]) or __file_scanner__(TEXTDIR, results[0]) except Exception: pass |