diff options
author | zsloan | 2022-01-14 18:22:32 +0000 |
---|---|---|
committer | zsloan | 2022-01-14 18:22:32 +0000 |
commit | 68ac19153b128f60b660e11365e5fd4304c95300 (patch) | |
tree | 198e03522af43a2d41f3c02cf3785bcfd4635fc4 /wqflask/base/data_set.py | |
parent | f588ad96ae5045499860fa6e2740e101ad4410d7 (diff) | |
parent | 9ab0c3b6cc146e1711f1478242d4198eed720e4c (diff) | |
download | genenetwork2-68ac19153b128f60b660e11365e5fd4304c95300.tar.gz |
Merge branch 'testing' of github.com:genenetwork/genenetwork2 into feature/add_rqtl_pairscan
Diffstat (limited to 'wqflask/base/data_set.py')
-rw-r--r-- | wqflask/base/data_set.py | 187 |
1 files changed, 130 insertions, 57 deletions
diff --git a/wqflask/base/data_set.py b/wqflask/base/data_set.py index 8906ab69..af248659 100644 --- a/wqflask/base/data_set.py +++ b/wqflask/base/data_set.py @@ -20,7 +20,7 @@ from dataclasses import dataclass from dataclasses import field from dataclasses import InitVar -from typing import Optional, Dict +from typing import Optional, Dict, List from db.call import fetchall, fetchone, fetch1 from utility.logger import getLogger from utility.tools import USE_GN_SERVER, USE_REDIS, flat_files, flat_file_exists, GN2_BASE_URL @@ -39,6 +39,9 @@ from db import webqtlDatabaseFunction from base import species from base import webqtlConfig from flask import Flask, g +from base.webqtlConfig import TMPDIR +from urllib.parse import urlparse +from utility.tools import SQL_URI import os import math import string @@ -50,6 +53,8 @@ import requests import gzip import pickle as pickle import itertools +import hashlib +import datetime from redis import Redis @@ -397,7 +402,8 @@ class DatasetGroup: self.parlist = [maternal, paternal] def get_study_samplelists(self): - study_sample_file = locate_ignore_error(self.name + ".json", 'study_sample_lists') + study_sample_file = locate_ignore_error( + self.name + ".json", 'study_sample_lists') try: f = open(study_sample_file) except: @@ -423,8 +429,6 @@ class DatasetGroup: if result is not None: self.samplelist = json.loads(result) else: - logger.debug("Cache not hit") - genotype_fn = locate_ignore_error(self.name + ".geno", 'genotype') if genotype_fn: self.samplelist = get_group_samplelists.get_samplelist( @@ -447,7 +451,6 @@ class DatasetGroup: # genotype_1 is Dataset Object without parents and f1 # genotype_2 is Dataset Object with parents and f1 (not for intercross) - # reaper barfs on unicode filenames, so here we ensure it's a string if self.genofile: if "RData" in self.genofile: # ZS: This is a temporary fix; I need to change the way the JSON files that point to multiple genotype files are structured to point to other file types like RData @@ -726,7 +729,6 @@ class DataSet: data_results = self.chunk_dataset(query_results, len(sample_ids)) self.samplelist = sorted_samplelist self.trait_data = data_results - def get_trait_data(self, sample_list=None): if sample_list: @@ -745,66 +747,75 @@ class DataSet: and Species.name = '{}' """.format(create_in_clause(self.samplelist), *mescape(self.group.species)) results = dict(g.db.execute(query).fetchall()) - sample_ids = [results[item] for item in self.samplelist] + sample_ids = [results.get(item) + for item in self.samplelist if item is not None] # MySQL limits the number of tables that can be used in a join to 61, # so we break the sample ids into smaller chunks # Postgres doesn't have that limit, so we can get rid of this after we transition chunk_size = 50 number_chunks = int(math.ceil(len(sample_ids) / chunk_size)) - trait_sample_data = [] - for sample_ids_step in chunks.divide_into_chunks(sample_ids, number_chunks): - if self.type == "Publish": - dataset_type = "Phenotype" - else: - dataset_type = self.type - temp = ['T%s.value' % item for item in sample_ids_step] - if self.type == "Publish": - query = "SELECT {}XRef.Id,".format(escape(self.type)) - else: - query = "SELECT {}.Name,".format(escape(dataset_type)) - data_start_pos = 1 - query += ', '.join(temp) - query += ' FROM ({}, {}XRef, {}Freeze) '.format(*mescape(dataset_type, - self.type, - self.type)) - - for item in sample_ids_step: - query += """ - left join {}Data as T{} on T{}.Id = {}XRef.DataId - and T{}.StrainId={}\n - """.format(*mescape(self.type, item, item, self.type, item, item)) - - if self.type == "Publish": - query += """ - WHERE {}XRef.InbredSetId = {}Freeze.InbredSetId - and {}Freeze.Name = '{}' - and {}.Id = {}XRef.{}Id - order by {}.Id - """.format(*mescape(self.type, self.type, self.type, self.name, - dataset_type, self.type, dataset_type, dataset_type)) - else: - query += """ - WHERE {}XRef.{}FreezeId = {}Freeze.Id - and {}Freeze.Name = '{}' - and {}.Id = {}XRef.{}Id - order by {}.Id - """.format(*mescape(self.type, self.type, self.type, self.type, - self.name, dataset_type, self.type, self.type, dataset_type)) - results = g.db.execute(query).fetchall() - trait_sample_data.append(results) + cached_results = fetch_cached_results(self.name, self.type) + if cached_results is None: + trait_sample_data = [] + for sample_ids_step in chunks.divide_into_chunks(sample_ids, number_chunks): + if self.type == "Publish": + dataset_type = "Phenotype" + else: + dataset_type = self.type + temp = ['T%s.value' % item for item in sample_ids_step] + if self.type == "Publish": + query = "SELECT {}XRef.Id,".format(escape(self.type)) + else: + query = "SELECT {}.Name,".format(escape(dataset_type)) + data_start_pos = 1 + query += ', '.join(temp) + query += ' FROM ({}, {}XRef, {}Freeze) '.format(*mescape(dataset_type, + self.type, + self.type)) + + for item in sample_ids_step: + query += """ + left join {}Data as T{} on T{}.Id = {}XRef.DataId + and T{}.StrainId={}\n + """.format(*mescape(self.type, item, item, self.type, item, item)) + + if self.type == "Publish": + query += """ + WHERE {}XRef.InbredSetId = {}Freeze.InbredSetId + and {}Freeze.Name = '{}' + and {}.Id = {}XRef.{}Id + order by {}.Id + """.format(*mescape(self.type, self.type, self.type, self.name, + dataset_type, self.type, dataset_type, dataset_type)) + else: + query += """ + WHERE {}XRef.{}FreezeId = {}Freeze.Id + and {}Freeze.Name = '{}' + and {}.Id = {}XRef.{}Id + order by {}.Id + """.format(*mescape(self.type, self.type, self.type, self.type, + self.name, dataset_type, self.type, self.type, dataset_type)) - trait_count = len(trait_sample_data[0]) - self.trait_data = collections.defaultdict(list) + results = g.db.execute(query).fetchall() + trait_sample_data.append([list(result) for result in results]) - # put all of the separate data together into a dictionary where the keys are - # trait names and values are lists of sample values - for trait_counter in range(trait_count): - trait_name = trait_sample_data[0][trait_counter][0] - for chunk_counter in range(int(number_chunks)): - self.trait_data[trait_name] += ( - trait_sample_data[chunk_counter][trait_counter][data_start_pos:]) + trait_count = len(trait_sample_data[0]) + self.trait_data = collections.defaultdict(list) + + data_start_pos = 1 + for trait_counter in range(trait_count): + trait_name = trait_sample_data[0][trait_counter][0] + for chunk_counter in range(int(number_chunks)): + self.trait_data[trait_name] += ( + trait_sample_data[chunk_counter][trait_counter][data_start_pos:]) + + cache_dataset_results( + self.name, self.type, self.trait_data) + else: + + self.trait_data = cached_results class PhenotypeDataSet(DataSet): @@ -1242,3 +1253,65 @@ def geno_mrna_confidentiality(ob): if confidential: return True + + +def parse_db_url(): + parsed_db = urlparse(SQL_URI) + + return (parsed_db.hostname, parsed_db.username, + parsed_db.password, parsed_db.path[1:]) + + +def query_table_timestamp(dataset_type: str): + """function to query the update timestamp of a given dataset_type""" + + # computation data and actions + + fetch_db_name = parse_db_url() + query_update_time = f""" + SELECT UPDATE_TIME FROM information_schema.tables + WHERE TABLE_SCHEMA = '{fetch_db_name[-1]}' + AND TABLE_NAME = '{dataset_type}Data' + """ + + date_time_obj = g.db.execute(query_update_time).fetchone()[0] + return date_time_obj.strftime("%Y-%m-%d %H:%M:%S") + + +def generate_hash_file(dataset_name: str, dataset_type: str, dataset_timestamp: str): + """given the trait_name generate a unique name for this""" + string_unicode = f"{dataset_name}{dataset_timestamp}".encode() + md5hash = hashlib.md5(string_unicode) + return md5hash.hexdigest() + + +def cache_dataset_results(dataset_name: str, dataset_type: str, query_results: List): + """function to cache dataset query results to file + input dataset_name and type query_results(already processed in default dict format) + """ + # data computations actions + # store the file path on redis + + table_timestamp = query_table_timestamp(dataset_type) + + + file_name = generate_hash_file(dataset_name, dataset_type, table_timestamp) + file_path = os.path.join(TMPDIR, f"{file_name}.json") + + with open(file_path, "w") as file_handler: + json.dump(query_results, file_handler) + + +def fetch_cached_results(dataset_name: str, dataset_type: str): + """function to fetch the cached results""" + + table_timestamp = query_table_timestamp(dataset_type) + + file_name = generate_hash_file(dataset_name, dataset_type, table_timestamp) + file_path = os.path.join(TMPDIR, f"{file_name}.json") + try: + with open(file_path, "r") as file_handler: + + return json.load(file_handler) + except FileNotFoundError: + pass |