diff options
Diffstat (limited to 'uploader/phenotypes')
| -rw-r--r-- | uploader/phenotypes/models.py | 133 | ||||
| -rw-r--r-- | uploader/phenotypes/views.py | 224 |
2 files changed, 298 insertions, 59 deletions
diff --git a/uploader/phenotypes/models.py b/uploader/phenotypes/models.py index 04abcc9..3d656d2 100644 --- a/uploader/phenotypes/models.py +++ b/uploader/phenotypes/models.py @@ -1,4 +1,6 @@ """Database and utility functions for phenotypes.""" +import time +import random import logging import tempfile from pathlib import Path @@ -85,24 +87,43 @@ def phenotype_publication_data(conn, phenotype_id) -> Optional[dict]: return dict(res) -def dataset_phenotypes(conn: Connection, - population_id: int, - dataset_id: int, - offset: int = 0, - limit: Optional[int] = None) -> tuple[dict, ...]: +def dataset_phenotypes(# pylint: disable=[too-many-arguments, too-many-positional-arguments] + conn: Connection, + population_id: int, + dataset_id: int, + offset: int = 0, + limit: Optional[int] = None, + xref_ids: tuple[int, ...] = tuple() +) -> tuple[dict, ...]: """Fetch the actual phenotypes.""" - _query = ( - "SELECT pheno.*, pxr.Id AS xref_id, pxr.InbredSetId, ist.InbredSetCode " + _narrow_by_ids = ( + f" AND pxr.Id IN ({', '.join(['%s'] * len(xref_ids))})" + if len(xref_ids) > 0 else "") + _narrow_by_limit = ( + f" LIMIT {limit} OFFSET {offset}" if bool(limit) else "") + _pub_query = ( + "SELECT pub.* " + "FROM PublishXRef AS pxr " + "INNER JOIN Publication AS pub ON pxr.PublicationId=pub.Id " + "WHERE pxr.InbredSetId=%s") + _narrow_by_ids + _pheno_query = (( + "SELECT pheno.*, pxr.Id AS xref_id, pxr.InbredSetId, pxr.PublicationId, " + "ist.InbredSetCode " "FROM Phenotype AS pheno " "INNER JOIN PublishXRef AS pxr ON pheno.Id=pxr.PhenotypeId " "INNER JOIN PublishFreeze AS pf ON pxr.InbredSetId=pf.InbredSetId " "INNER JOIN InbredSet AS ist ON pf.InbredSetId=ist.Id " - "WHERE pxr.InbredSetId=%s AND pf.Id=%s") + ( - f" LIMIT {limit} OFFSET {offset}" if bool(limit) else "") + "WHERE pxr.InbredSetId=%s AND pf.Id=%s") + + _narrow_by_ids + + _narrow_by_limit) with conn.cursor(cursorclass=DictCursor) as cursor: - cursor.execute(_query, (population_id, dataset_id)) + cursor.execute(_pub_query, (population_id,) + xref_ids) debug_query(cursor, logger) - return tuple(dict(row) for row in cursor.fetchall()) + _pubs = {row["Id"]: dict(row) for row in cursor.fetchall()} + cursor.execute(_pheno_query, (population_id, dataset_id) + xref_ids) + debug_query(cursor, logger) + return tuple({**dict(row), "publication": _pubs[row["PublicationId"]]} + for row in cursor.fetchall()) def __phenotype_se__(cursor: BaseCursor, xref_id, dataids_and_strainids): @@ -536,6 +557,11 @@ def quick_save_phenotypes_data( return _count +def __sleep_random__(): + """Sleep a random amount of time chosen from 0.05s to 1s in increments of 0.05""" + time.sleep(random.choice(tuple(i / 20.0 for i in range(1, 21)))) + + def delete_phenotypes_data( cursor: BaseCursor, data_ids: tuple[int, ...] @@ -544,17 +570,42 @@ def delete_phenotypes_data( if len(data_ids) == 0: return (0, 0, 0) - _paramstr = ", ".join(["%s"] * len(data_ids)) - cursor.execute(f"DELETE FROM PublishData WHERE Id IN ({_paramstr})", - data_ids) - _dcount = cursor.rowcount - - cursor.execute(f"DELETE FROM PublishSE WHERE DataId IN ({_paramstr})", - data_ids) - _secount = cursor.rowcount - cursor.execute(f"DELETE FROM NStrain WHERE DataId IN ({_paramstr})", - data_ids) - _ncount = cursor.rowcount + # Loop to handle big deletes i.e. ≥ 10000 rows + _dcount, _secount, _ncount = (0, 0, 0)# Count total rows deleted + while True: + _paramstr = ", ".join(["%s"] * len(data_ids)) + cursor.execute( + "DELETE FROM PublishData " + f"WHERE Id IN ({_paramstr}) " + "ORDER BY Id ASC, StrainId ASC "# Make deletions deterministic + "LIMIT 1000", + data_ids) + _dcount_curr = cursor.rowcount + _dcount += _dcount_curr + + cursor.execute( + "DELETE FROM PublishSE " + f"WHERE DataId IN ({_paramstr}) " + "ORDER BY DataId ASC, StrainId ASC "# Make deletions deterministic + "LIMIT 1000", + data_ids) + _secount_curr = cursor.rowcount + _secount += _secount_curr + + cursor.execute( + "DELETE FROM NStrain " + f"WHERE DataId IN ({_paramstr}) " + "ORDER BY DataId ASC, StrainId ASC "# Make deletions deterministic + "LIMIT 1000", + data_ids) + _ncount_curr = cursor.rowcount + _ncount += _ncount_curr + __sleep_random__() + + if all((_dcount_curr == 0, _secount_curr == 0, _ncount_curr == 0)): + # end loop if there are no more rows to delete. + break + return (_dcount, _secount, _ncount) @@ -583,17 +634,41 @@ def delete_phenotypes( def __delete_phenos__(cursor: BaseCursor, pheno_ids: tuple[int, ...]) -> int: """Delete data from the `Phenotype` table.""" _paramstr = ", ".join(["%s"] * len(pheno_ids)) - cursor.execute("DELETE FROM Phenotype " - f"WHERE Id IN ({_paramstr})", - pheno_ids) + + _pcount = 0 + while True: + cursor.execute( + "DELETE FROM Phenotype " + f"WHERE Id IN ({_paramstr}) " + "ORDER BY Id " + "LIMIT 1000", + pheno_ids) + _pcount_curr = cursor.rowcount + _pcount += _pcount_curr + __sleep_random__() + if _pcount_curr == 0: + break + return cursor.rowcount def __delete_xrefs__(cursor: BaseCursor) -> int: _paramstr = ", ".join(["%s"] * len(xref_ids)) - cursor.execute("DELETE FROM PublishXRef " - f"WHERE InbredSetId=%s AND Id IN ({_paramstr})", - (population_id,) + xref_ids) - return cursor.rowcount + + _xcount = 0 + while True: + cursor.execute( + "DELETE FROM PublishXRef " + f"WHERE InbredSetId=%s AND Id IN ({_paramstr}) " + "ORDER BY Id " + "LIMIT 10000", + (population_id,) + xref_ids) + _xcount_curr = cursor.rowcount + _xcount += _xcount_curr + __sleep_random__() + if _xcount_curr == 0: + break + + return _xcount def __with_cursor__(cursor): _phenoids, _pubids, _dataids = reduce( diff --git a/uploader/phenotypes/views.py b/uploader/phenotypes/views.py index f7d8e55..c03f3f5 100644 --- a/uploader/phenotypes/views.py +++ b/uploader/phenotypes/views.py @@ -1,4 +1,6 @@ """Views handling ('classical') phenotypes."""# pylint: disable=[too-many-lines] +import io +import csv import sys import uuid import json @@ -21,12 +23,14 @@ from gn_libs import jobs as gnlibs_jobs from gn_libs.jobs.jobs import JobNotFound from gn_libs.mysqldb import database_connection +from werkzeug.datastructures import Headers from flask import (flash, request, jsonify, redirect, Blueprint, - current_app as app) + current_app as app, + Response as FlaskResponse) from r_qtl import r_qtl2_qc as rqc from r_qtl import exceptions as rqe @@ -34,6 +38,7 @@ from r_qtl import exceptions as rqe from uploader import jobs from uploader import session from uploader.files import save_file +from uploader.configutils import uploads_dir from uploader.flask_extensions import url_for from uploader.ui import make_template_renderer from uploader.oauth2.client import oauth2_post @@ -312,6 +317,11 @@ def create_dataset(species: dict, population: dict, **kwargs):# pylint: disable= dataset_shortname = ( form["dataset-shortname"] or form["dataset-name"]).strip() _pheno_dataset = save_new_dataset( + # It's not necessary to update the authorisation server to register + # new phenotype resource here, since each phenotype trait can, in + # theory, have its own access control allowing/disallowing access to + # it. In practice, however, we tend to gather multiple traits into a + # single resource for access control. cursor, population["Id"], form["dataset-name"].strip(), @@ -329,7 +339,7 @@ def process_phenotypes_rqtl2_bundle(error_uri): try: ## Handle huge files here... phenobundle = save_file(request.files["phenotypes-bundle"], - Path(app.config["UPLOAD_FOLDER"])) + uploads_dir(app)) rqc.validate_bundle(phenobundle) return phenobundle except AssertionError as _aerr: @@ -352,7 +362,7 @@ def process_phenotypes_individual_files(error_uri): "comment.char": form["file-comment-character"], "na.strings": form["file-na"].split(" "), } - bundlepath = Path(app.config["UPLOAD_FOLDER"], + bundlepath = Path(uploads_dir(app), f"{str(uuid.uuid4()).replace('-', '')}.zip") with ZipFile(bundlepath,mode="w") as zfile: for rqtlkey, formkey, _type in ( @@ -370,7 +380,7 @@ def process_phenotypes_individual_files(error_uri): # Chunked upload of large files was used filedata = json.loads(form[formkey]) zfile.write( - Path(app.config["UPLOAD_FOLDER"], filedata["uploaded-file"]), + Path(uploads_dir(app), filedata["uploaded-file"]), arcname=filedata["original-name"]) cdata[rqtlkey] = cdata.get(rqtlkey, []) + [filedata["original-name"]] else: @@ -382,9 +392,9 @@ def process_phenotypes_individual_files(error_uri): return error_uri filepath = save_file( - _sentfile, Path(app.config["UPLOAD_FOLDER"]), hashed=False) + _sentfile, uploads_dir(app), hashed=False) zfile.write( - Path(app.config["UPLOAD_FOLDER"], filepath), + Path(uploads_dir(app), filepath), arcname=filepath.name) cdata[rqtlkey] = cdata.get(rqtlkey, []) + [filepath.name] @@ -464,7 +474,7 @@ def add_phenotypes(species: dict, population: dict, dataset: dict, **kwargs):# p **({"publicationid": request.form["publication-id"]} if request.form.get("publication-id") else {})})}), _redisuri, - f"{app.config['UPLOAD_FOLDER']}/job_errors") + f"{uploads_dir(app)}/job_errors") app.logger.debug("JOB DETAILS: %s", _job) jobstatusuri = url_for("species.populations.phenotypes.job_status", @@ -521,6 +531,65 @@ def job_status( @phenotypesbp.route( "<int:species_id>/populations/<int:population_id>/phenotypes/datasets" + "/<int:dataset_id>/job/<uuid:job_id>/download-errors", + methods=["GET"]) +@require_login +@with_dataset( + species_redirect_uri="species.populations.phenotypes.index", + population_redirect_uri="species.populations.phenotypes.select_population", + redirect_uri="species.populations.phenotypes.list_datasets") +def download_errors( + species: dict, + population: dict, + dataset: dict, + job_id: uuid.UUID, + **kwargs):# pylint: disable=[unused-argument] + """Download the list of errors as a CSV file.""" + with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn: + try: + job = jobs.job(rconn, jobs.jobsnamespace(), str(job_id)) + _prefix_ = jobs.jobsnamespace() + _jobid_ = job['jobid'] + def __generate_chunks__(): + _errors_ = ( + json.loads(error) + for key in rconn.keys( + f"{_prefix_}:{str(_jobid_)}:*:errors:*") + for error in rconn.lrange(key, 0, -1)) + _chunk_no_ = 0 + _all_errors_printed_ = False + while not _all_errors_printed_: + _chunk_ = [] + try: + for _ in range(0, 1000): + _chunk_.append(next(_errors_)) + except StopIteration: + _all_errors_printed_ = True + if len(_chunk_) <= 0: + raise + + _out_ = io.StringIO() + _writer_ = csv.DictWriter(_out_, fieldnames=tuple(_chunk_[0].keys())) + if _chunk_no_ == 0: + _writer_.writeheader() + _writer_.writerows(_chunk_) + _chunk_no_ += 1 + yield _out_.getvalue() + if _all_errors_printed_: + return + + headers = Headers() + headers.set("Content-Disposition", + "attachment", + filename=f"{job['job-type']}_{job['jobid']}.csv") + return FlaskResponse( + __generate_chunks__(), mimetype="text/csv", headers=headers) + except jobs.JobNotFound as _jnf: + return render_template("jobs/job-not-found.html", job_id=job_id) + + +@phenotypesbp.route( + "<int:species_id>/populations/<int:population_id>/phenotypes/datasets" "/<int:dataset_id>/job/<uuid:job_id>/review", methods=["GET"]) @require_login @@ -598,6 +667,8 @@ def review_job_data( conn, int(_job_metadata["publicationid"])) if _job_metadata.get("publicationid") else None), + user=session.user_details(), + timestamp=datetime.datetime.now().isoformat(), activelink="add-phenotypes") @@ -611,6 +682,12 @@ def load_phenotypes_success_handler(job): job_id=job["job_id"])) +def proceed_to_job_status(job): + """A generic 'job success' handler for asynchronous phenotype jobs.""" + app.logger.debug("The new job: %s", job) + return redirect(url_for("background-jobs.job_status", job_id=job["job_id"])) + + @phenotypesbp.route( "<int:species_id>/populations/<int:population_id>/phenotypes/datasets" "/<int:dataset_id>/load-data-to-database", @@ -653,11 +730,6 @@ def load_data_to_database( def __handle_error__(resp): return render_template("http-error.html", *resp.json()) - def __handle_success__(load_job): - app.logger.debug("The phenotypes loading job: %s", load_job) - return redirect(url_for( - "background-jobs.job_status", job_id=load_job["job_id"])) - return request_token( token_uri=urljoin(oauth2client.authserver_uri(), "auth/token"), @@ -676,19 +748,25 @@ def load_data_to_database( "publication_id": _meta["publicationid"], "authserver": oauth2client.authserver_uri(), "token": token["access_token"], + "dataname": request.form["data_name"].strip(), "success_handler": ( "uploader.phenotypes.views" - ".load_phenotypes_success_handler") + ".load_phenotypes_success_handler"), + **{ + key: request.form[key] + for key in ("data_description",) + if key in request.form.keys() + } }, external_id=session.logged_in_user_id()) ).then( lambda job: gnlibs_jobs.launch_job( job, _jobs_db, - Path(f"{app.config['UPLOAD_FOLDER']}/job_errors"), + Path(f"{uploads_dir(app)}/job_errors"), worker_manager="gn_libs.jobs.launcher", loglevel=_loglevel) - ).either(__handle_error__, __handle_success__) + ).either(__handle_error__, proceed_to_job_status) def update_phenotype_metadata(conn, metadata: dict): @@ -1063,7 +1141,7 @@ def recompute_means(# pylint: disable=[unused-argument] }, external_id=session.logged_in_user_id()), _jobs_db, - Path(f"{app.config['UPLOAD_FOLDER']}/job_errors"), + Path(f"{uploads_dir(app)}/job_errors"), worker_manager="gn_libs.jobs.launcher", loglevel=_loglevel) return redirect(url_for("background-jobs.job_status", @@ -1105,7 +1183,7 @@ def rerun_qtlreaper(# pylint: disable=[unused-argument] _job_id = uuid.uuid4() _loglevel = logging.getLevelName(app.logger.getEffectiveLevel()).lower() - _workingdir = Path(app.config["TEMPORARY_DIRECTORY"]).joinpath("qtlreaper") + _workingdir = Path(app.config["SCRATCH_DIRECTORY"]).joinpath("qtlreaper") _workingdir.mkdir(exist_ok=True) command = [ sys.executable, @@ -1143,7 +1221,7 @@ def rerun_qtlreaper(# pylint: disable=[unused-argument] }, external_id=session.logged_in_user_id()), _jobs_db, - Path(f"{app.config['UPLOAD_FOLDER']}/job_errors"), + Path(f"{uploads_dir(app)}/job_errors"), worker_manager="gn_libs.jobs.launcher", loglevel=_loglevel) return redirect(url_for("background-jobs.job_status", @@ -1157,6 +1235,12 @@ def rerun_qtlreaper_success_handler(job): return return_to_dataset_view_handler(job, "QTLReaper ran successfully!") +def delete_phenotypes_success_handler(job): + """Handle success running the 'delete-phenotypes' script.""" + return return_to_dataset_view_handler( + job, "Phenotypes deleted successfully.") + + @phenotypesbp.route( "<int:species_id>/populations/<int:population_id>/phenotypes/datasets" "/<int:dataset_id>/delete", @@ -1166,22 +1250,102 @@ def rerun_qtlreaper_success_handler(job): species_redirect_uri="species.populations.phenotypes.index", population_redirect_uri="species.populations.phenotypes.select_population", redirect_uri="species.populations.phenotypes.list_datasets") -def delete_phenotypes(# pylint: disable=[unused-argument] +def delete_phenotypes(# pylint: disable=[unused-argument, too-many-locals] species: dict, population: dict, dataset: dict, **kwargs ): """Delete the specified phenotype data.""" - if request.form.get("confirm", "").lower() == "confirm": - return f"Would delete! {request.form}" + _dataset_page = redirect(url_for( + "species.populations.phenotypes.view_dataset", + species_id=species["SpeciesId"], + population_id=population["Id"], + dataset_id=dataset["Id"])) - with database_connection(app.config["SQL_URI"]) as conn: - xref_ids = tuple( - int(item) for item in set(request.form.getlist("xref_ids"))) - return render_template( - "phenotypes/confirm-delete-phenotypes.html", - species=species, - population=population, - dataset=dataset, - phenotypes=xref_ids) + def __handle_error__(resp): + flash( + "Error retrieving authorisation token. Phenotype deletion " + "failed. Please try again later.", + "alert alert-danger") + return _dataset_page + + _jobs_db = app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"] + with (database_connection(app.config["SQL_URI"]) as conn, + sqlite3.connection(_jobs_db) as jobsconn): + form = request.form + xref_ids = tuple(int(item) for item in set(form.getlist("xref_ids"))) + + match form.get("action"): + case "cancel": + return redirect(url_for( + "species.populations.phenotypes.view_dataset", + species_id=species["SpeciesId"], + population_id=population["Id"], + dataset_id=dataset["Id"])) + case "delete": + _loglevel = logging.getLevelName( + app.logger.getEffectiveLevel()).lower() + if form.get("confirm_delete_all_phenotypes", "") == "on": + _cmd = ["--delete-all"] + else: + # setup phenotypes xref_ids file + _xref_ids_file = Path( + app.config["SCRATCH_DIRECTORY"], + f"delete-phenotypes-{uuid.uuid4()}.txt") + with _xref_ids_file.open(mode="w", encoding="utf8") as ptr: + ptr.write("\n".join(str(_id) for _id in xref_ids)) + + _cmd = ["--xref_ids_file", str(_xref_ids_file)] + + _job_id = uuid.uuid4() + return request_token( + token_uri=urljoin( + oauth2client.authserver_uri(), "auth/token"), + user_id=session.user_details()["user_id"] + ).then( + lambda token: gnlibs_jobs.initialise_job( + jobsconn, + _job_id, + [ + sys.executable, + "-u", + "-m", + "scripts.phenotypes.delete_phenotypes", + "--log-level", _loglevel, + app.config["SQL_URI"], + str(species["SpeciesId"]), + str(population["Id"]), + str(dataset["Id"]), + app.config["AUTH_SERVER_URL"], + token["access_token"]] + _cmd, + "delete-phenotypes", + extra_meta={ + "species_id": species["SpeciesId"], + "population_id": population["Id"], + "dataset_id": dataset["Id"], + "success_handler": ( + "uploader.phenotypes.views." + "delete_phenotypes_success_handler") + }, + external_id=session.logged_in_user_id()) + ).then( + lambda _job: gnlibs_jobs.launch_job( + _job, + _jobs_db, + Path(f"{uploads_dir(app)}/job_errors"), + worker_manager="gn_libs.jobs.launcher", + loglevel=_loglevel) + ).either(__handle_error__, proceed_to_job_status) + case _: + _phenos: tuple[dict, ...] = tuple() + if len(xref_ids) > 0: + _phenos = dataset_phenotypes( + conn, population["Id"], dataset["Id"], xref_ids=xref_ids) + + return render_template( + "phenotypes/confirm-delete-phenotypes.html", + species=species, + population=population, + dataset=dataset, + phenotypes=_phenos) |
