Diffstat (limited to 'uploader')
-rw-r--r--  uploader/phenotypes/models.py | 103
-rw-r--r--  uploader/phenotypes/views.py  |  78
-rw-r--r--  uploader/samples/models.py    |  10
-rw-r--r--  uploader/static/js/debug.js   |  40
4 files changed, 191 insertions, 40 deletions
diff --git a/uploader/phenotypes/models.py b/uploader/phenotypes/models.py
index 48e64da..20b8e77 100644
--- a/uploader/phenotypes/models.py
+++ b/uploader/phenotypes/models.py
@@ -1,5 +1,7 @@
 """Database and utility functions for phenotypes."""
 import logging
+import tempfile
+from pathlib import Path
 from functools import reduce
 from datetime import datetime
 from typing import Optional, Iterable
@@ -13,6 +15,16 @@ from gn_libs.mysqldb import debug_query
 
 logger = logging.getLogger(__name__)
 
+__PHENO_DATA_TABLES__ = {
+    "PublishData": {
+        "table": "PublishData", "valueCol": "value", "DataIdCol": "Id"},
+    "PublishSE": {
+        "table": "PublishSE", "valueCol": "error", "DataIdCol": "DataId"},
+    "NStrain": {
+        "table": "NStrain", "valueCol": "count", "DataIdCol": "DataId"}
+}
+
+
 def datasets_by_population(
         conn: mdb.Connection,
         species_id: int,
@@ -36,10 +48,10 @@ def dataset_by_id(conn: mdb.Connection,
     """Fetch dataset details by identifier"""
     with conn.cursor(cursorclass=DictCursor) as cursor:
         cursor.execute(
-            "SELECT s.SpeciesId, pf.* FROM Species AS s "
-            "INNER JOIN InbredSet AS iset ON s.Id=iset.SpeciesId "
-            "INNER JOIN PublishFreeze AS pf ON iset.Id=pf.InbredSetId "
-            "WHERE s.Id=%s AND iset.Id=%s AND pf.Id=%s",
+            "SELECT Species.SpeciesId, PublishFreeze.* FROM Species "
+            "INNER JOIN InbredSet ON Species.Id=InbredSet.SpeciesId "
+            "INNER JOIN PublishFreeze ON InbredSet.Id=PublishFreeze.InbredSetId "
+            "WHERE Species.Id=%s AND InbredSet.Id=%s AND PublishFreeze.Id=%s",
             (species_id, population_id, dataset_id))
         return dict(cursor.fetchone())
 
@@ -295,13 +307,21 @@ def create_new_phenotypes(conn: mdb.Connection,
     """Add entirely new phenotypes to the database."""
     _phenos = tuple()
     with conn.cursor(cursorclass=DictCursor) as cursor:
-        for batch in take(phenotypes, 1000):
+        while True:
+            batch = take(phenotypes, 1000)
+            if len(batch) == 0:
+                break
+
             cursor.executemany(
                 ("INSERT INTO "
-                 "Phenotypes(Pre_publication_description, Original_description, Units, Authorized_Users) "
-                 "VALUES (%(id)s, %(description)s, %(units)s, 'robwilliams') "
-                 "RETURNING *"),
+                 "Phenotype(Pre_publication_description, Original_description, Units, Authorized_Users) "
+                 "VALUES (%(id)s, %(description)s, %(units)s, 'robwilliams')"),
                 tuple(batch))
+            paramstr = ", ".join(["%s"] * len(batch))
+            cursor.execute(
+                "SELECT * FROM Phenotype WHERE Pre_publication_description IN "
+                f"({paramstr})",
+                tuple(item["id"] for item in batch))
             _phenos = _phenos + tuple({
                 "phenotype_id": row["Id"],
                 "id": row["Pre_publication_description"],
@@ -316,23 +336,60 @@ def save_phenotypes_data(
         conn: mdb.Connection,
         table: str,
         data: Iterable[dict]
-) -> tuple[dict, ...]:
+) -> int:
     """Save new phenotypes data into the database."""
-    _table_details = {
-        "PublishData": {"table": "PublishData", "valueCol": "value"},
-        "PublishSE": {"table": "PublishSE", "valueCol": "error"},
-        "NStrain": {"table": "PublishData", "valueCol": "count"}
-    }[table]
-    saved_data = tuple()
+    _table_details = __PHENO_DATA_TABLES__[table]
    with conn.cursor(cursorclass=DictCursor) as cursor:
-        for batch in take(data, 5000):
+        _count = 0
+        while True:
+            batch = take(data, 100000)
+            if len(batch) == 0:
+                logger.warning("Got an empty batch. This needs investigation.")
+                break
+
+            logger.debug("Saving batch of %s items.", len(batch))
             cursor.executemany(
-                (f"INSERT INTO {_table_details['table']}"
-                 f"(Id, StrainId, {_table_details['valueCol']}) "
-                 "VALUES "
-                 f"(%(data_id)s, %(sample_id)s, %({_table_details['valueCol']})s) "
-                 "RETURNING *"),
+                (f"INSERT INTO {_table_details['table']}"
+                 f"({_table_details['DataIdCol']}, StrainId, {_table_details['valueCol']}) "
+                 "VALUES "
+                 f"(%(data_id)s, %(sample_id)s, %(value)s) "),
                 tuple(batch))
-            _data = data + tuple(cursor.fetchall())
+            debug_query(cursor, logger)
+            _count = _count + len(batch)
 
-    return saved_data
+
+    logger.debug("Saved a total of %s data rows", _count)
+    return _count
+
+
+def quick_save_phenotypes_data(
+        conn: mdb.Connection,
+        table: str,
+        dataitems: Iterable[dict],
+        tmpdir: Path
+) -> int:
+    """Save data items to the database using 'LOAD DATA LOCAL INFILE'."""
+    _table_details = __PHENO_DATA_TABLES__[table]
+    with (tempfile.NamedTemporaryFile(
+            prefix=f"{table}_data", mode="wt", dir=tmpdir) as tmpfile,
+          conn.cursor(cursorclass=DictCursor) as cursor):
+        _count = 0
+        logger.debug("Write data rows to text file.")
+        for row in dataitems:
+            tmpfile.write(
+                f'{row["data_id"]}\t{row["sample_id"]}\t{row["value"]}\n')
+            _count = _count + 1
+        tmpfile.flush()
+
+        logger.debug("Load text file into database (table: %s)",
+                     _table_details["table"])
+        cursor.execute(
+            f"LOAD DATA LOCAL INFILE '{tmpfile.name}' "
+            f"INTO TABLE {_table_details['table']} "
+            "("
+            f"{_table_details['DataIdCol']}, "
+            "StrainId, "
+            f"{_table_details['valueCol']}"
+            ")")
+        debug_query(cursor, logger)
+        return _count
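A note on the batching pattern above: both `create_new_phenotypes` and `save_phenotypes_data` now pull fixed-size batches off their input inside a `while True` loop, breaking on the first empty batch. That pattern terminates only if `take` consumes a shared iterator and eventually returns an empty sequence. The uploader defines `take` elsewhere in the codebase; the sketch below shows the assumed behaviour and is illustrative, not the project's actual implementation.

    from itertools import islice
    from typing import Iterable

    def take(iterable: Iterable, num: int) -> list:
        """Consume and return at most `num` items from `iterable`.

        Minimal sketch, assuming the real helper behaves like
        `itertools.islice` over a shared iterator: once the iterator
        is exhausted it returns an empty sequence, which is what
        breaks the `while True` loops above.
        """
        return list(islice(iter(iterable), num))

    # The pattern terminates only when callers pass a true iterator (which
    # `iter()` returns unchanged); a plain list would restart from the top
    # on every call and loop forever.
    items = iter(range(250))
    while True:
        batch = take(items, 100)
        if len(batch) == 0:
            break
        print(len(batch))  # prints 100, 100, 50

Note also that `quick_save_phenotypes_data` relies on `LOAD DATA LOCAL INFILE`, which requires `local_infile` to be enabled on both the MySQL server and the client connection, and which is generally much faster than row-by-row INSERTs for bulk loads.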
diff --git a/uploader/phenotypes/views.py b/uploader/phenotypes/views.py
index 4d733bc..49c12b5 100644
--- a/uploader/phenotypes/views.py
+++ b/uploader/phenotypes/views.py
@@ -4,14 +4,17 @@ import csv
 import uuid
 import json
 import logging
-import datetime
 import tempfile
 from typing import Any
 from pathlib import Path
 from zipfile import ZipFile
+from urllib.parse import urljoin
 from functools import wraps, reduce
 from logging import INFO, ERROR, DEBUG, FATAL, CRITICAL, WARNING
 
+import datetime
+from datetime import timedelta
+
 from redis import Redis
 from pymonad.either import Left
 from requests.models import Response
@@ -21,7 +24,9 @@ from werkzeug.utils import secure_filename
 from gn_libs import sqlite3
 from gn_libs import jobs as gnlibs_jobs
 from gn_libs.mysqldb import database_connection
+from gn_libs import monadic_requests as mrequests
 
+from authlib.jose import jwt
 from flask import (flash,
                    request,
                    url_for,
@@ -35,11 +40,14 @@ from flask import (flash,
 from r_qtl import r_qtl2_qc as rqc
 from r_qtl import exceptions as rqe
 
+
 from uploader import jobs
+from uploader import session
 from uploader.files import save_file#, fullpath
 from uploader.ui import make_template_renderer
 from uploader.oauth2.client import oauth2_post
 from uploader.authorisation import require_login
+from uploader.oauth2 import jwks, client as oauth2client
 from uploader.route_utils import generic_select_population
 from uploader.datautils import safe_int, enumerate_sequence
 from uploader.species.models import all_species, species_by_id
@@ -625,6 +633,7 @@ def load_data_to_database(
     with (Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn,
           sqlite3.connection(jobs_db) as conn):
         qc_job = jobs.job(rconn, jobs.jobsnamespace(), request.form["data-qc-job-id"])
+        _meta = json.loads(qc_job["job-metadata"])
         load_job_id = uuid.uuid4()
         command = [
             sys.executable,
@@ -639,20 +648,65 @@
                 app.logger.getEffectiveLevel()
             ).lower()
         ]
-        load_job = gnlibs_jobs.launch_job(
-            gnlibs_jobs.initialise_job(conn,
-                                       load_job_id,
-                                       command,
-                                       "load-new-phenotypes-data",
-                                       extra_meta={
-                                           "species_id": species["SpeciesId"],
-                                           "population_id": population["Id"],
-                                           "dataset_id": dataset["Id"]
-                                       }),
+
+        def __handle_error__(resp):
+            raise Exception(resp)
+
+        def __handle_success__(load_job):
+            app.logger.debug("The phenotypes loading job: %s", load_job)
+            return str(load_job)
+
+        issued = datetime.datetime.now()
+        jwtkey = jwks.newest_jwk_with_rotation(
+            jwks.jwks_directory(app, "UPLOADER_SECRETS"),
+            int(app.config["JWKS_ROTATION_AGE_DAYS"]))
+
+        return mrequests.post(
+            urljoin(oauth2client.authserver_uri(), "auth/token"),
+            json={
+                "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
+                "scope": oauth2client.SCOPE,
+                "assertion": jwt.encode(
+                    header={
+                        "alg": "RS256",
+                        "typ": "JWT",
+                        "kid": jwtkey.as_dict()["kid"]
+                    },
+                    payload={
+                        "iss": str(oauth2client.oauth2_clientid()),
+                        "sub": str(session.user_details()["user_id"]),
+                        "aud": urljoin(oauth2client.authserver_uri(),
+                                       "auth/token"),
+                        # TODO: Update expiry time once fix is implemented in
+                        #       auth server.
+                        "exp": (issued + timedelta(minutes=5)).timestamp(),
+                        "nbf": int(issued.timestamp()),
+                        "iat": int(issued.timestamp()),
+                        "jti": str(uuid.uuid4())
+                    },
+                    key=jwtkey).decode("utf8"),
+                "client_id": oauth2client.oauth2_clientid()
+            }
+        ).then(
+            lambda token: gnlibs_jobs.initialise_job(
+                conn,
+                load_job_id,
+                command,
+                "load-new-phenotypes-data",
+                extra_meta={
+                    "species_id": species["SpeciesId"],
+                    "population_id": population["Id"],
+                    "dataset_id": dataset["Id"],
+                    "bundle_file": _meta["bundle"],
+                    "authserver": oauth2client.authserver_uri(),
+                    "token": token["access_token"]
+                })
+        ).then(
+            lambda job: gnlibs_jobs.launch_job(
+                job,
                 jobs_db,
                 f"{app.config['UPLOAD_FOLDER']}/job_errors",
                 worker_manager="gn_libs.jobs.launcher")
-        return str(load_job)
+        ).either(__handle_error__, __handle_success__)
 
 
 def update_phenotype_metadata(conn, metadata: dict):
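The rewritten `load_data_to_database` above authenticates the background job with the OAuth2 JWT-bearer assertion grant (RFC 7523): the uploader signs a short-lived JWT with its newest JWK, exchanges it for an access token at the auth server's `auth/token` endpoint, and stores that token in the job's metadata so the loader script can act for the logged-in user without a browser session. Below is a standalone sketch of the same exchange; `AUTH_SERVER`, `CLIENT_ID` and the use of plain `requests` in place of `monadic_requests` are hypothetical stand-ins for the uploader's own helpers, not the project's actual values.

    import uuid
    import datetime
    from datetime import timedelta
    from urllib.parse import urljoin

    import requests
    from authlib.jose import jwt

    AUTH_SERVER = "https://auth.example.org/"  # stand-in: oauth2client.authserver_uri()
    CLIENT_ID = "uploader-client-id"           # stand-in: oauth2client.oauth2_clientid()

    def bearer_assertion(signing_key, user_id: str) -> str:
        """Sign a short-lived JWT asserting this client acts for `user_id`."""
        now = datetime.datetime.now()
        return jwt.encode(
            header={"alg": "RS256", "typ": "JWT"},
            payload={
                "iss": CLIENT_ID,                           # the client is the issuer
                "sub": user_id,                             # the user being acted for
                "aud": urljoin(AUTH_SERVER, "auth/token"),  # the token endpoint
                "exp": (now + timedelta(minutes=5)).timestamp(),
                "nbf": int(now.timestamp()),
                "iat": int(now.timestamp()),
                "jti": str(uuid.uuid4()),                   # unique id, prevents replay
            },
            key=signing_key).decode("utf8")

    def fetch_access_token(signing_key, user_id: str) -> str:
        """Exchange the signed assertion for an access token."""
        response = requests.post(
            urljoin(AUTH_SERVER, "auth/token"),
            json={
                "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
                "assertion": bearer_assertion(signing_key, user_id),
                "client_id": CLIENT_ID,
            })
        response.raise_for_status()
        return response.json()["access_token"]

The `jti` claim gives every assertion a unique identity, and the five-minute `exp` keeps a leaked assertion from being useful for long; both follow directly from the payload built in the view above.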
diff --git a/uploader/samples/models.py b/uploader/samples/models.py
index d7d5384..b419d61 100644
--- a/uploader/samples/models.py
+++ b/uploader/samples/models.py
@@ -15,11 +15,11 @@ def samples_by_species_and_population(
     """Fetch the samples by their species and population."""
     with conn.cursor(cursorclass=DictCursor) as cursor:
         cursor.execute(
-            "SELECT iset.InbredSetId, s.* FROM InbredSet AS iset "
-            "INNER JOIN StrainXRef AS sxr ON iset.InbredSetId=sxr.InbredSetId "
-            "INNER JOIN Strain AS s ON sxr.StrainId=s.Id "
-            "WHERE s.SpeciesId=%(species_id)s "
-            "AND iset.InbredSetId=%(population_id)s",
+            "SELECT InbredSet.InbredSetId, Strain.* FROM InbredSet "
+            "INNER JOIN StrainXRef ON InbredSet.InbredSetId=StrainXRef.InbredSetId "
+            "INNER JOIN Strain ON StrainXRef.StrainId=Strain.Id "
+            "WHERE Strain.SpeciesId=%(species_id)s "
+            "AND InbredSet.InbredSetId=%(population_id)s",
             {"species_id": species_id, "population_id": population_id})
         return tuple(cursor.fetchall())
diff --git a/uploader/static/js/debug.js b/uploader/static/js/debug.js
new file mode 100644
index 0000000..eb01209
--- /dev/null
+++ b/uploader/static/js/debug.js
@@ -0,0 +1,40 @@
+/**
+ * The entire purpose of this function is to debug values inline
+ * without changing the flow of the code too much.
+ *
+ * This **MUST** be a non-arrow function to allow access to the `arguments`
+ * object.
+ *
+ * This function expects at least one argument.
+ *
+ * If more than one argument is provided, then:
+ * a) the last argument is considered the value, and will be returned
+ * b) all other arguments will be converted to string and output
+ *
+ * If only one argument is provided, it is considered the value, and will be
+ * returned.
+ *
+ * Zero arguments is an error condition.
+ **/
+function __pk__(val) {
+    /* Handle zero arguments */
+    if (arguments.length < 1) {
+        throw new Error("Invalid arguments: Expected at least one argument.");
+    }
+
+    let msg = "DEBUG";
+    if (arguments.length > 1) {
+        msg = Array.from(
+            arguments
+        ).slice(
+            0,
+            arguments.length - 1
+        ).map((arg) => {
+            return String(arg);
+        }).join("; ");
+    }
+
+    const value = arguments[arguments.length - 1];
+    console.debug("/********** " + msg + " **********/", value);
+    return value;
+}
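Because `__pk__` returns its final argument, it can wrap any expression in place. A hypothetical usage sketch (not part of the commit):

    // Hypothetical usage: wrap the expression you want to inspect; the
    // surrounding computation is unchanged because the value is returned.
    var items = [{price: 3}, {price: 4}];
    var total = items.reduce(
        (acc, item) => acc + __pk__("item price", item.price),
        0);
    // Logs "/********** item price **********/ 3", then the same for 4;
    // `total` is still 7, exactly as without the wrapper.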