Diffstat (limited to 'uploader')
-rw-r--r--  uploader/phenotypes/models.py   103
-rw-r--r--  uploader/phenotypes/views.py     78
-rw-r--r--  uploader/samples/models.py       10
-rw-r--r--  uploader/static/js/debug.js      40
4 files changed, 191 insertions(+), 40 deletions(-)
diff --git a/uploader/phenotypes/models.py b/uploader/phenotypes/models.py
index 48e64da..20b8e77 100644
--- a/uploader/phenotypes/models.py
+++ b/uploader/phenotypes/models.py
@@ -1,5 +1,7 @@
"""Database and utility functions for phenotypes."""
import logging
+import tempfile
+from pathlib import Path
from functools import reduce
from datetime import datetime
from typing import Optional, Iterable
@@ -13,6 +15,16 @@ from gn_libs.mysqldb import debug_query
logger = logging.getLogger(__name__)
+__PHENO_DATA_TABLES__ = {
+ "PublishData": {
+ "table": "PublishData", "valueCol": "value", "DataIdCol": "Id"},
+ "PublishSE": {
+ "table": "PublishSE", "valueCol": "error", "DataIdCol": "DataId"},
+ "NStrain": {
+ "table": "NStrain", "valueCol": "count", "DataIdCol": "DataId"}
+}
+
+
def datasets_by_population(
conn: mdb.Connection,
species_id: int,
@@ -36,10 +48,10 @@ def dataset_by_id(conn: mdb.Connection,
"""Fetch dataset details by identifier"""
with conn.cursor(cursorclass=DictCursor) as cursor:
cursor.execute(
- "SELECT s.SpeciesId, pf.* FROM Species AS s "
- "INNER JOIN InbredSet AS iset ON s.Id=iset.SpeciesId "
- "INNER JOIN PublishFreeze AS pf ON iset.Id=pf.InbredSetId "
- "WHERE s.Id=%s AND iset.Id=%s AND pf.Id=%s",
+ "SELECT Species.SpeciesId, PublishFreeze.* FROM Species "
+ "INNER JOIN InbredSet ON Species.Id=InbredSet.SpeciesId "
+ "INNER JOIN PublishFreeze ON InbredSet.Id=PublishFreeze.InbredSetId "
+ "WHERE Species.Id=%s AND InbredSet.Id=%s AND PublishFreeze.Id=%s",
(species_id, population_id, dataset_id))
return dict(cursor.fetchone())
@@ -295,13 +307,21 @@ def create_new_phenotypes(conn: mdb.Connection,
"""Add entirely new phenotypes to the database."""
_phenos = tuple()
with conn.cursor(cursorclass=DictCursor) as cursor:
- for batch in take(phenotypes, 1000):
+ while True:
+ batch = take(phenotypes, 1000)
+ if len(batch) == 0:
+ break
+
cursor.executemany(
("INSERT INTO "
- "Phenotypes(Pre_publication_description, Original_description, Units, Authorized_Users) "
- "VALUES (%(id)s, %(description)s, %(units)s, 'robwilliams') "
- "RETURNING *"),
+ "Phenotype(Pre_publication_description, Original_description, Units, Authorized_Users) "
+ "VALUES (%(id)s, %(description)s, %(units)s, 'robwilliams')"),
tuple(batch))
+ paramstr = ", ".join(["%s"] * len(batch))
+ cursor.execute(
+ "SELECT * FROM Phenotype WHERE Pre_publication_description IN "
+ f"({paramstr})",
+ tuple(item["id"] for item in batch))
_phenos = _phenos + tuple({
"phenotype_id": row["Id"],
"id": row["Pre_publication_description"],
@@ -316,23 +336,60 @@ def save_phenotypes_data(
conn: mdb.Connection,
table: str,
data: Iterable[dict]
-) -> tuple[dict, ...]:
+) -> int:
"""Save new phenotypes data into the database."""
- _table_details = {
- "PublishData": {"table": "PublishData", "valueCol": "value"},
- "PublishSE": {"table": "PublishSE", "valueCol": "error"},
- "NStrain": {"table": "PublishData", "valueCol": "count"}
- }[table]
- saved_data = tuple()
+ _table_details = __PHENO_DATA_TABLES__[table]
with conn.cursor(cursorclass=DictCursor) as cursor:
- for batch in take(data, 5000):
+ _count = 0
+ while True:
+ batch = take(data, 100000)
+ if len(batch) == 0:
+ logger.debug("Got an empty batch; the data iterator is exhausted.")
+ break
+
+ logger.debug("Saving batch of %s items.", len(batch))
cursor.executemany(
- (f"INSERT INTO {_table_details['table']}"
- f"(Id, StrainId, {_table_details['valueCol']}) "
- "VALUES "
- f"(%(data_id)s, %(sample_id)s, %({_table_details['valueCol']})s) "
- "RETURNING *"),
+ (f"INSERT INTO {_table_details['table']}"
+ f"({_table_details['DataIdCol']}, StrainId, {_table_details['valueCol']}) "
+ "VALUES "
+ f"(%(data_id)s, %(sample_id)s, %(value)s) "),
tuple(batch))
- _data = data + tuple(cursor.fetchall())
+ debug_query(cursor, logger)
+ _count = _count + len(batch)
- return saved_data
+
+ logger.debug("Saved a total of %s data rows", _count)
+ return _count
+
+
+def quick_save_phenotypes_data(
+ conn: mdb.Connection,
+ table: str,
+ dataitems: Iterable[dict],
+ tmpdir: Path
+) -> int:
+ """Save data items to the database, but using """
+ _table_details = __PHENO_DATA_TABLES__[table]
+ with (tempfile.NamedTemporaryFile(
+ prefix=f"{table}_data", mode="wt", dir=tmpdir) as tmpfile,
+ conn.cursor(cursorclass=DictCursor) as cursor):
+ _count = 0
+ console.debug("Write data rows to text file.")
+ for row in dataitems:
+ tmpfile.write(
+ f'{row["data_id"]}\t{row["sample_id"]}\t{row["value"]}\n')
+ _count = _count + 1
+ tmpfile.flush()
+
+ console.debug("Load text file into database (table: %s)",
+ _table_details["table"])
+ cursor.execute(
+ f"LOAD DATA LOCAL INFILE '{tmpfile.name}' "
+ f"INTO TABLE {_table_details['table']} "
+ "("
+ f"{_table_details['DataIdCol']}, "
+ "StrainId, "
+ f"{_table_details['valueCol']}"
+ ")")
+ debug_query(cursor, logger)
+ return _count
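
Note: the new quick_save_phenotypes_data() path relies on "LOAD DATA LOCAL
INFILE", which only succeeds when the LOCAL capability is enabled on both the
client connection and the server. A minimal client-side sketch, assuming
mysqlclient (MySQLdb) and hypothetical connection details:

    # Sketch only: connection parameters below are hypothetical.
    import MySQLdb as mdb
    from MySQLdb.cursors import DictCursor

    conn = mdb.connect(
        host="localhost",
        user="uploader",
        passwd="secret",
        db="the_database",
        local_infile=1)  # enables CLIENT_LOCAL_FILES on the client side

    with conn.cursor(cursorclass=DictCursor) as cursor:
        # The server must also have local_infile=ON, e.g. set by an admin
        # with: SET GLOBAL local_infile = 1;
        cursor.execute("SHOW VARIABLES LIKE 'local_infile'")
        print(cursor.fetchone())
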
diff --git a/uploader/phenotypes/views.py b/uploader/phenotypes/views.py
index 4d733bc..49c12b5 100644
--- a/uploader/phenotypes/views.py
+++ b/uploader/phenotypes/views.py
@@ -4,14 +4,17 @@ import csv
import uuid
import json
import logging
-import datetime
import tempfile
from typing import Any
from pathlib import Path
from zipfile import ZipFile
+from urllib.parse import urljoin
from functools import wraps, reduce
from logging import INFO, ERROR, DEBUG, FATAL, CRITICAL, WARNING
+import datetime
+from datetime import timedelta
+
from redis import Redis
from pymonad.either import Left
from requests.models import Response
@@ -21,7 +24,9 @@ from werkzeug.utils import secure_filename
from gn_libs import sqlite3
from gn_libs import jobs as gnlibs_jobs
from gn_libs.mysqldb import database_connection
+from gn_libs import monadic_requests as mrequests
+from authlib.jose import jwt
from flask import (flash,
request,
url_for,
@@ -35,11 +40,14 @@ from flask import (flash,
from r_qtl import r_qtl2_qc as rqc
from r_qtl import exceptions as rqe
+
from uploader import jobs
+from uploader import session
from uploader.files import save_file#, fullpath
from uploader.ui import make_template_renderer
from uploader.oauth2.client import oauth2_post
from uploader.authorisation import require_login
+from uploader.oauth2 import jwks, client as oauth2client
from uploader.route_utils import generic_select_population
from uploader.datautils import safe_int, enumerate_sequence
from uploader.species.models import all_species, species_by_id
@@ -625,6 +633,7 @@ def load_data_to_database(
with (Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn,
sqlite3.connection(jobs_db) as conn):
qc_job = jobs.job(rconn, jobs.jobsnamespace(), request.form["data-qc-job-id"])
+ _meta = json.loads(qc_job["job-metadata"])
load_job_id = uuid.uuid4()
command = [
sys.executable,
@@ -639,20 +648,65 @@ def load_data_to_database(
app.logger.getEffectiveLevel()
).lower()
]
- load_job = gnlibs_jobs.launch_job(
- gnlibs_jobs.initialise_job(conn,
- load_job_id,
- command,
- "load-new-phenotypes-data",
- extra_meta={
- "species_id": species["SpeciesId"],
- "population_id": population["Id"],
- "dataset_id": dataset["Id"]
- }),
+
+ def __handle_error__(resp):
+ raise Exception(resp)
+
+ def __handle_success__(load_job):
+ app.logger.debug("The phenotypes loading job: %s", load_job)
+ return str(load_job)
+ issued = datetime.datetime.now()
+ jwtkey = jwks.newest_jwk_with_rotation(
+ jwks.jwks_directory(app, "UPLOADER_SECRETS"),
+ int(app.config["JWKS_ROTATION_AGE_DAYS"]))
+
+ return mrequests.post(
+ urljoin(oauth2client.authserver_uri(), "auth/token"),
+ json={
+ "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
+ "scope": oauth2client.SCOPE,
+ "assertion": jwt.encode(
+ header={
+ "alg": "RS256",
+ "typ": "JWT",
+ "kid": jwtkey.as_dict()["kid"]
+ },
+ payload={
+ "iss": str(oauth2client.oauth2_clientid()),
+ "sub": str(session.user_details()["user_id"]),
+ "aud": urljoin(oauth2client.authserver_uri(),
+ "auth/token"),
+ # TODO: Update expiry time once fix is implemented in
+ # auth server.
+ "exp": (issued + timedelta(minutes=5)).timestamp(),
+ "nbf": int(issued.timestamp()),
+ "iat": int(issued.timestamp()),
+ "jti": str(uuid.uuid4())
+ },
+ key=jwtkey).decode("utf8"),
+ "client_id": oauth2client.oauth2_clientid()
+ }
+ ).then(
+ lambda token: gnlibs_jobs.initialise_job(
+ conn,
+ load_job_id,
+ command,
+ "load-new-phenotypes-data",
+ extra_meta={
+ "species_id": species["SpeciesId"],
+ "population_id": population["Id"],
+ "dataset_id": dataset["Id"],
+ "bundle_file": _meta["bundle"],
+ "authserver": oauth2client.authserver_uri(),
+ "token": token["access_token"]
+ })
+ ).then(
+ lambda job: gnlibs_jobs.launch_job(
+ job,
jobs_db,
f"{app.config['UPLOAD_FOLDER']}/job_errors",
worker_manager="gn_libs.jobs.launcher")
- return str(load_job)
+ ).either(__handle_error__, __handle_success__)
def update_phenotype_metadata(conn, metadata: dict):
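
Note: the token acquisition above uses the OAuth2 JWT-bearer grant
(urn:ietf:params:oauth:grant-type:jwt-bearer, RFC 7523): the uploader signs an
assertion with its newest JWK and exchanges it for an access token the
background job can use. A stripped-down sketch of building the assertion,
assuming authlib's JsonWebKey alongside the jwt module imported above, with
hypothetical key, client, user and endpoint values:

    # Sketch only: the key file, client id, user id and token endpoint are
    # hypothetical stand-ins for the values wired up in the view above.
    import json
    import uuid
    import datetime
    from datetime import timedelta

    from authlib.jose import JsonWebKey, jwt

    key = JsonWebKey.import_key(json.load(open("jwk-private.json")))
    issued = datetime.datetime.now()
    assertion = jwt.encode(
        header={"alg": "RS256", "typ": "JWT", "kid": key.as_dict()["kid"]},
        payload={
            "iss": "the-client-id",  # the OAuth2 client issues the assertion
            "sub": "the-user-id",    # ... on behalf of this user
            "aud": "https://auth.example.org/auth/token",
            "exp": (issued + timedelta(minutes=5)).timestamp(),
            "nbf": int(issued.timestamp()),
            "iat": int(issued.timestamp()),
            "jti": str(uuid.uuid4())},  # unique id; guards against replay
        key=key).decode("utf8")

    # `assertion` is then POSTed to the token endpoint with
    # grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer.
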
diff --git a/uploader/samples/models.py b/uploader/samples/models.py
index d7d5384..b419d61 100644
--- a/uploader/samples/models.py
+++ b/uploader/samples/models.py
@@ -15,11 +15,11 @@ def samples_by_species_and_population(
"""Fetch the samples by their species and population."""
with conn.cursor(cursorclass=DictCursor) as cursor:
cursor.execute(
- "SELECT iset.InbredSetId, s.* FROM InbredSet AS iset "
- "INNER JOIN StrainXRef AS sxr ON iset.InbredSetId=sxr.InbredSetId "
- "INNER JOIN Strain AS s ON sxr.StrainId=s.Id "
- "WHERE s.SpeciesId=%(species_id)s "
- "AND iset.InbredSetId=%(population_id)s",
+ "SELECT InbredSet.InbredSetId, Strain.* FROM InbredSet "
+ "INNER JOIN StrainXRef ON InbredSet.InbredSetId=StrainXRef.InbredSetId "
+ "INNER JOIN Strain ON StrainXRef.StrainId=Strain.Id "
+ "WHERE Strain.SpeciesId=%(species_id)s "
+ "AND InbredSet.InbredSetId=%(population_id)s",
{"species_id": species_id, "population_id": population_id})
return tuple(cursor.fetchall())
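
Note: this query change is purely cosmetic; the table aliases are dropped in
favour of full table names with identical results. A quick usage sketch,
assuming gn_libs' database_connection context manager and hypothetical
identifiers:

    # Sketch only: the database URI and identifiers are hypothetical.
    from gn_libs.mysqldb import database_connection
    from uploader.samples.models import samples_by_species_and_population

    with database_connection("mysql://user:secret@localhost/the_database") as conn:
        samples = samples_by_species_and_population(
            conn, species_id=1, population_id=1)
        print(f"Fetched {len(samples)} samples.")
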
diff --git a/uploader/static/js/debug.js b/uploader/static/js/debug.js
new file mode 100644
index 0000000..eb01209
--- /dev/null
+++ b/uploader/static/js/debug.js
@@ -0,0 +1,40 @@
+/**
+ * The entire purpose of this function is for use to debug values inline
+ * without changing the flow of the code too much.
+ *
+ * This **MUST** be a non-arrow function to allow access to the `arguments`
+ * object.
+ *
+ * This function expects at least one argument.
+ *
+ * If more than one argument is provided, then:
+ * a) the last argument is considered the value, and will be returned
+ * b) all other arguments will be converted to string and output
+ *
+ * If only one argument is provided, it is considered the value, and will be
+ * returned.
+ *
+ * Zero arguments is an error condition.
+ **/
+function __pk__(val) {
+ /* Handle zero arguments */
+ if (arguments.length < 1) {
+ throw new Error("Invalid arguments: Expected at least one argument.");
+ }
+
+ let msg = "DEBUG";
+ if (arguments.length > 1) {
+ msg = Array.from(
+ arguments
+ ).slice(
+ 0,
+ arguments.length - 1
+ ).map((val) => {
+ return String(val);
+ }).join("; ")
+ }
+
+ const value = arguments[arguments.length - 1];
+ console.debug("/********** " + msg + " **********/", value);
+ return value;
+}
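
Note: __pk__ is meant to be dropped into an expression to log it without
restructuring the surrounding code, since the last argument is returned
unchanged. A usage sketch, with a made-up example value:

    // Usage sketch: `prices` is a hypothetical example value.
    const prices = [1.5, 2.25, 3.0];
    const total = prices.reduce(
        (acc, price) => __pk__("running total:", acc + price),
        0);
    console.log(total); // 6.75, with one debug line logged per step
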