about summary refs log tree commit diff
path: root/uploader/phenotypes
diff options
context:
space:
mode:
Diffstat (limited to 'uploader/phenotypes')
-rw-r--r--uploader/phenotypes/models.py133
-rw-r--r--uploader/phenotypes/views.py224
2 files changed, 298 insertions, 59 deletions
diff --git a/uploader/phenotypes/models.py b/uploader/phenotypes/models.py
index 04abcc9..3d656d2 100644
--- a/uploader/phenotypes/models.py
+++ b/uploader/phenotypes/models.py
@@ -1,4 +1,6 @@
 """Database and utility functions for phenotypes."""
+import time
+import random
 import logging
 import tempfile
 from pathlib import Path
@@ -85,24 +87,43 @@ def phenotype_publication_data(conn, phenotype_id) -> Optional[dict]:
         return dict(res)
 
 
-def dataset_phenotypes(conn: Connection,
-                       population_id: int,
-                       dataset_id: int,
-                       offset: int = 0,
-                       limit: Optional[int] = None) -> tuple[dict, ...]:
+def dataset_phenotypes(# pylint: disable=[too-many-arguments, too-many-positional-arguments]
+        conn: Connection,
+        population_id: int,
+        dataset_id: int,
+        offset: int = 0,
+        limit: Optional[int] = None,
+        xref_ids: tuple[int, ...] = tuple()
+) -> tuple[dict, ...]:
     """Fetch the actual phenotypes."""
-    _query = (
-        "SELECT pheno.*, pxr.Id AS xref_id, pxr.InbredSetId, ist.InbredSetCode "
+    _narrow_by_ids = (
+            f" AND pxr.Id IN ({', '.join(['%s'] * len(xref_ids))})"
+            if len(xref_ids) > 0 else "")
+    _narrow_by_limit = (
+        f" LIMIT {limit} OFFSET {offset}" if bool(limit) else "")
+    _pub_query = (
+        "SELECT pub.* "
+        "FROM PublishXRef AS pxr "
+        "INNER JOIN  Publication AS pub ON pxr.PublicationId=pub.Id "
+        "WHERE pxr.InbredSetId=%s") + _narrow_by_ids
+    _pheno_query = ((
+        "SELECT pheno.*, pxr.Id AS xref_id, pxr.InbredSetId, pxr.PublicationId, "
+        "ist.InbredSetCode "
         "FROM Phenotype AS pheno "
         "INNER JOIN PublishXRef AS pxr ON pheno.Id=pxr.PhenotypeId "
         "INNER JOIN PublishFreeze AS pf ON pxr.InbredSetId=pf.InbredSetId "
         "INNER JOIN InbredSet AS ist ON pf.InbredSetId=ist.Id "
-        "WHERE pxr.InbredSetId=%s AND pf.Id=%s") + (
-            f" LIMIT {limit} OFFSET {offset}" if bool(limit) else "")
+        "WHERE pxr.InbredSetId=%s AND pf.Id=%s") +
+                    _narrow_by_ids +
+                    _narrow_by_limit)
     with conn.cursor(cursorclass=DictCursor) as cursor:
-        cursor.execute(_query, (population_id, dataset_id))
+        cursor.execute(_pub_query, (population_id,) + xref_ids)
         debug_query(cursor, logger)
-        return tuple(dict(row) for row in cursor.fetchall())
+        _pubs = {row["Id"]: dict(row) for row in cursor.fetchall()}
+        cursor.execute(_pheno_query, (population_id, dataset_id) + xref_ids)
+        debug_query(cursor, logger)
+        return tuple({**dict(row), "publication": _pubs[row["PublicationId"]]}
+                     for row in cursor.fetchall())
 
 
 def __phenotype_se__(cursor: BaseCursor, xref_id, dataids_and_strainids):
@@ -536,6 +557,11 @@ def quick_save_phenotypes_data(
         return _count
 
 
+def __sleep_random__():
+    """Sleep a random amount of time chosen from 0.05s to 1s in increments of 0.05"""
+    time.sleep(random.choice(tuple(i / 20.0 for i in range(1, 21))))
+
+
 def delete_phenotypes_data(
         cursor: BaseCursor,
         data_ids: tuple[int, ...]
@@ -544,17 +570,42 @@ def delete_phenotypes_data(
     if len(data_ids) == 0:
         return (0, 0, 0)
 
-    _paramstr = ", ".join(["%s"] * len(data_ids))
-    cursor.execute(f"DELETE FROM PublishData WHERE Id IN ({_paramstr})",
-                   data_ids)
-    _dcount = cursor.rowcount
-
-    cursor.execute(f"DELETE FROM PublishSE WHERE DataId IN ({_paramstr})",
-                   data_ids)
-    _secount = cursor.rowcount
-    cursor.execute(f"DELETE FROM NStrain WHERE DataId IN ({_paramstr})",
-                   data_ids)
-    _ncount = cursor.rowcount
+    # Loop to handle big deletes i.e. ≥ 10000 rows
+    _dcount, _secount, _ncount = (0, 0, 0)# Count total rows deleted
+    while True:
+        _paramstr = ", ".join(["%s"] * len(data_ids))
+        cursor.execute(
+            "DELETE FROM PublishData "
+            f"WHERE Id IN ({_paramstr}) "
+            "ORDER BY Id ASC, StrainId ASC "# Make deletions deterministic
+            "LIMIT 1000",
+            data_ids)
+        _dcount_curr = cursor.rowcount
+        _dcount += _dcount_curr
+
+        cursor.execute(
+            "DELETE FROM PublishSE "
+            f"WHERE DataId IN ({_paramstr}) "
+            "ORDER BY DataId ASC, StrainId ASC "# Make deletions deterministic
+            "LIMIT 1000",
+            data_ids)
+        _secount_curr = cursor.rowcount
+        _secount += _secount_curr
+
+        cursor.execute(
+            "DELETE FROM NStrain "
+            f"WHERE DataId IN ({_paramstr}) "
+            "ORDER BY DataId ASC, StrainId ASC "# Make deletions deterministic
+            "LIMIT 1000",
+            data_ids)
+        _ncount_curr = cursor.rowcount
+        _ncount += _ncount_curr
+        __sleep_random__()
+
+        if all((_dcount_curr == 0, _secount_curr == 0, _ncount_curr == 0)):
+            # end loop if there are no more rows to delete.
+            break
+
     return (_dcount, _secount, _ncount)
 
 
@@ -583,17 +634,41 @@ def delete_phenotypes(
     def __delete_phenos__(cursor: BaseCursor, pheno_ids: tuple[int, ...]) -> int:
         """Delete data from the `Phenotype` table."""
         _paramstr = ", ".join(["%s"] * len(pheno_ids))
-        cursor.execute("DELETE FROM Phenotype "
-                       f"WHERE Id IN ({_paramstr})",
-                       pheno_ids)
+
+        _pcount = 0
+        while True:
+            cursor.execute(
+                "DELETE FROM Phenotype "
+                f"WHERE Id IN ({_paramstr}) "
+                "ORDER BY Id "
+                "LIMIT 1000",
+                pheno_ids)
+            _pcount_curr = cursor.rowcount
+            _pcount += _pcount_curr
+            __sleep_random__()
+            if _pcount_curr == 0:
+                break
+
         return cursor.rowcount
 
     def __delete_xrefs__(cursor: BaseCursor) -> int:
         _paramstr = ", ".join(["%s"] * len(xref_ids))
-        cursor.execute("DELETE FROM PublishXRef "
-                       f"WHERE InbredSetId=%s AND Id IN ({_paramstr})",
-                       (population_id,) + xref_ids)
-        return cursor.rowcount
+
+        _xcount = 0
+        while True:
+            cursor.execute(
+                "DELETE FROM PublishXRef "
+                f"WHERE InbredSetId=%s AND Id IN ({_paramstr}) "
+                "ORDER BY Id "
+                "LIMIT 10000",
+                (population_id,) + xref_ids)
+            _xcount_curr = cursor.rowcount
+            _xcount += _xcount_curr
+            __sleep_random__()
+            if _xcount_curr == 0:
+                break
+
+        return _xcount
 
     def __with_cursor__(cursor):
         _phenoids, _pubids, _dataids = reduce(
diff --git a/uploader/phenotypes/views.py b/uploader/phenotypes/views.py
index f7d8e55..c03f3f5 100644
--- a/uploader/phenotypes/views.py
+++ b/uploader/phenotypes/views.py
@@ -1,4 +1,6 @@
 """Views handling ('classical') phenotypes."""# pylint: disable=[too-many-lines]
+import io
+import csv
 import sys
 import uuid
 import json
@@ -21,12 +23,14 @@ from gn_libs import jobs as gnlibs_jobs
 from gn_libs.jobs.jobs import JobNotFound
 from gn_libs.mysqldb import database_connection
 
+from werkzeug.datastructures import Headers
 from flask import (flash,
                    request,
                    jsonify,
                    redirect,
                    Blueprint,
-                   current_app as app)
+                   current_app as app,
+                   Response as FlaskResponse)
 
 from r_qtl import r_qtl2_qc as rqc
 from r_qtl import exceptions as rqe
@@ -34,6 +38,7 @@ from r_qtl import exceptions as rqe
 from uploader import jobs
 from uploader import session
 from uploader.files import save_file
+from uploader.configutils import uploads_dir
 from uploader.flask_extensions import url_for
 from uploader.ui import make_template_renderer
 from uploader.oauth2.client import oauth2_post
@@ -312,6 +317,11 @@ def create_dataset(species: dict, population: dict, **kwargs):# pylint: disable=
         dataset_shortname = (
             form["dataset-shortname"] or form["dataset-name"]).strip()
         _pheno_dataset = save_new_dataset(
+            # It's not necessary to update the authorisation server to register
+            # new phenotype resource here, since each phenotype trait can, in
+            # theory, have its own access control allowing/disallowing access to
+            # it. In practice, however, we tend to gather multiple traits into a
+            # single resource for access control.
             cursor,
             population["Id"],
             form["dataset-name"].strip(),
@@ -329,7 +339,7 @@ def process_phenotypes_rqtl2_bundle(error_uri):
     try:
         ## Handle huge files here...
         phenobundle = save_file(request.files["phenotypes-bundle"],
-                                Path(app.config["UPLOAD_FOLDER"]))
+                                uploads_dir(app))
         rqc.validate_bundle(phenobundle)
         return phenobundle
     except AssertionError as _aerr:
@@ -352,7 +362,7 @@ def process_phenotypes_individual_files(error_uri):
         "comment.char": form["file-comment-character"],
         "na.strings": form["file-na"].split(" "),
     }
-    bundlepath = Path(app.config["UPLOAD_FOLDER"],
+    bundlepath = Path(uploads_dir(app),
                       f"{str(uuid.uuid4()).replace('-', '')}.zip")
     with ZipFile(bundlepath,mode="w") as zfile:
         for rqtlkey, formkey, _type in (
@@ -370,7 +380,7 @@ def process_phenotypes_individual_files(error_uri):
                 # Chunked upload of large files was used
                 filedata = json.loads(form[formkey])
                 zfile.write(
-                    Path(app.config["UPLOAD_FOLDER"], filedata["uploaded-file"]),
+                    Path(uploads_dir(app), filedata["uploaded-file"]),
                     arcname=filedata["original-name"])
                 cdata[rqtlkey] = cdata.get(rqtlkey, []) + [filedata["original-name"]]
             else:
@@ -382,9 +392,9 @@ def process_phenotypes_individual_files(error_uri):
                     return error_uri
 
                 filepath = save_file(
-                    _sentfile, Path(app.config["UPLOAD_FOLDER"]), hashed=False)
+                    _sentfile, uploads_dir(app), hashed=False)
                 zfile.write(
-                    Path(app.config["UPLOAD_FOLDER"], filepath),
+                    Path(uploads_dir(app), filepath),
                     arcname=filepath.name)
                 cdata[rqtlkey] = cdata.get(rqtlkey, []) + [filepath.name]
 
@@ -464,7 +474,7 @@ def add_phenotypes(species: dict, population: dict, dataset: dict, **kwargs):# p
                     **({"publicationid": request.form["publication-id"]}
                        if request.form.get("publication-id") else {})})}),
             _redisuri,
-            f"{app.config['UPLOAD_FOLDER']}/job_errors")
+            f"{uploads_dir(app)}/job_errors")
 
         app.logger.debug("JOB DETAILS: %s", _job)
         jobstatusuri = url_for("species.populations.phenotypes.job_status",
@@ -521,6 +531,65 @@ def job_status(
 
 @phenotypesbp.route(
     "<int:species_id>/populations/<int:population_id>/phenotypes/datasets"
+    "/<int:dataset_id>/job/<uuid:job_id>/download-errors",
+    methods=["GET"])
+@require_login
+@with_dataset(
+    species_redirect_uri="species.populations.phenotypes.index",
+    population_redirect_uri="species.populations.phenotypes.select_population",
+    redirect_uri="species.populations.phenotypes.list_datasets")
+def download_errors(
+        species: dict,
+        population: dict,
+        dataset: dict,
+        job_id: uuid.UUID,
+        **kwargs):# pylint: disable=[unused-argument]
+    """Download the list of errors as a CSV file."""
+    with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn:
+        try:
+            job = jobs.job(rconn, jobs.jobsnamespace(), str(job_id))
+            _prefix_ = jobs.jobsnamespace()
+            _jobid_ = job['jobid']
+            def __generate_chunks__():
+                _errors_ = (
+                    json.loads(error)
+                    for key in rconn.keys(
+                            f"{_prefix_}:{str(_jobid_)}:*:errors:*")
+                    for error in rconn.lrange(key, 0, -1))
+                _chunk_no_ = 0
+                _all_errors_printed_ = False
+                while not _all_errors_printed_:
+                    _chunk_ = []
+                    try:
+                        for _ in range(0, 1000):
+                            _chunk_.append(next(_errors_))
+                    except StopIteration:
+                        _all_errors_printed_ = True
+                        if len(_chunk_) <= 0:
+                            raise
+
+                    _out_ = io.StringIO()
+                    _writer_ = csv.DictWriter(_out_, fieldnames=tuple(_chunk_[0].keys()))
+                    if _chunk_no_ == 0:
+                        _writer_.writeheader()
+                    _writer_.writerows(_chunk_)
+                    _chunk_no_ += 1
+                    yield _out_.getvalue()
+                    if _all_errors_printed_:
+                        return
+
+            headers = Headers()
+            headers.set("Content-Disposition",
+                        "attachment",
+                        filename=f"{job['job-type']}_{job['jobid']}.csv")
+            return FlaskResponse(
+                __generate_chunks__(), mimetype="text/csv", headers=headers)
+        except jobs.JobNotFound as _jnf:
+            return render_template("jobs/job-not-found.html", job_id=job_id)
+
+
+@phenotypesbp.route(
+    "<int:species_id>/populations/<int:population_id>/phenotypes/datasets"
     "/<int:dataset_id>/job/<uuid:job_id>/review",
     methods=["GET"])
 @require_login
@@ -598,6 +667,8 @@ def review_job_data(
                                        conn, int(_job_metadata["publicationid"]))
                                    if _job_metadata.get("publicationid")
                                    else None),
+                               user=session.user_details(),
+                               timestamp=datetime.datetime.now().isoformat(),
                                activelink="add-phenotypes")
 
 
@@ -611,6 +682,12 @@ def load_phenotypes_success_handler(job):
         job_id=job["job_id"]))
 
 
+def proceed_to_job_status(job):
+    """A generic 'job success' handler for asynchronous phenotype jobs."""
+    app.logger.debug("The new job: %s", job)
+    return redirect(url_for("background-jobs.job_status", job_id=job["job_id"]))
+
+
 @phenotypesbp.route(
     "<int:species_id>/populations/<int:population_id>/phenotypes/datasets"
     "/<int:dataset_id>/load-data-to-database",
@@ -653,11 +730,6 @@ def load_data_to_database(
         def __handle_error__(resp):
             return render_template("http-error.html", *resp.json())
 
-        def __handle_success__(load_job):
-            app.logger.debug("The phenotypes loading job: %s", load_job)
-            return redirect(url_for(
-                "background-jobs.job_status", job_id=load_job["job_id"]))
-
 
         return request_token(
             token_uri=urljoin(oauth2client.authserver_uri(), "auth/token"),
@@ -676,19 +748,25 @@ def load_data_to_database(
                     "publication_id": _meta["publicationid"],
                     "authserver": oauth2client.authserver_uri(),
                     "token": token["access_token"],
+                    "dataname": request.form["data_name"].strip(),
                     "success_handler": (
                         "uploader.phenotypes.views"
-                        ".load_phenotypes_success_handler")
+                        ".load_phenotypes_success_handler"),
+                    **{
+                        key: request.form[key]
+                        for key in ("data_description",)
+                        if key in request.form.keys()
+                    }
                 },
                 external_id=session.logged_in_user_id())
         ).then(
             lambda job: gnlibs_jobs.launch_job(
                 job,
                 _jobs_db,
-                Path(f"{app.config['UPLOAD_FOLDER']}/job_errors"),
+                Path(f"{uploads_dir(app)}/job_errors"),
                 worker_manager="gn_libs.jobs.launcher",
                 loglevel=_loglevel)
-        ).either(__handle_error__, __handle_success__)
+        ).either(__handle_error__, proceed_to_job_status)
 
 
 def update_phenotype_metadata(conn, metadata: dict):
@@ -1063,7 +1141,7 @@ def recompute_means(# pylint: disable=[unused-argument]
                 },
                 external_id=session.logged_in_user_id()),
             _jobs_db,
-            Path(f"{app.config['UPLOAD_FOLDER']}/job_errors"),
+            Path(f"{uploads_dir(app)}/job_errors"),
             worker_manager="gn_libs.jobs.launcher",
             loglevel=_loglevel)
         return redirect(url_for("background-jobs.job_status",
@@ -1105,7 +1183,7 @@ def rerun_qtlreaper(# pylint: disable=[unused-argument]
     _job_id = uuid.uuid4()
     _loglevel = logging.getLevelName(app.logger.getEffectiveLevel()).lower()
 
-    _workingdir = Path(app.config["TEMPORARY_DIRECTORY"]).joinpath("qtlreaper")
+    _workingdir = Path(app.config["SCRATCH_DIRECTORY"]).joinpath("qtlreaper")
     _workingdir.mkdir(exist_ok=True)
     command = [
         sys.executable,
@@ -1143,7 +1221,7 @@ def rerun_qtlreaper(# pylint: disable=[unused-argument]
             },
             external_id=session.logged_in_user_id()),
             _jobs_db,
-            Path(f"{app.config['UPLOAD_FOLDER']}/job_errors"),
+            Path(f"{uploads_dir(app)}/job_errors"),
             worker_manager="gn_libs.jobs.launcher",
             loglevel=_loglevel)
         return redirect(url_for("background-jobs.job_status",
@@ -1157,6 +1235,12 @@ def rerun_qtlreaper_success_handler(job):
     return return_to_dataset_view_handler(job, "QTLReaper ran successfully!")
 
 
+def delete_phenotypes_success_handler(job):
+    """Handle success running the 'delete-phenotypes' script."""
+    return return_to_dataset_view_handler(
+        job, "Phenotypes deleted successfully.")
+
+
 @phenotypesbp.route(
     "<int:species_id>/populations/<int:population_id>/phenotypes/datasets"
     "/<int:dataset_id>/delete",
@@ -1166,22 +1250,102 @@ def rerun_qtlreaper_success_handler(job):
     species_redirect_uri="species.populations.phenotypes.index",
     population_redirect_uri="species.populations.phenotypes.select_population",
     redirect_uri="species.populations.phenotypes.list_datasets")
-def delete_phenotypes(# pylint: disable=[unused-argument]
+def delete_phenotypes(# pylint: disable=[unused-argument, too-many-locals]
         species: dict,
         population: dict,
         dataset: dict,
         **kwargs
 ):
     """Delete the specified phenotype data."""
-    if request.form.get("confirm", "").lower() == "confirm":
-        return f"Would delete! {request.form}"
+    _dataset_page = redirect(url_for(
+        "species.populations.phenotypes.view_dataset",
+        species_id=species["SpeciesId"],
+        population_id=population["Id"],
+        dataset_id=dataset["Id"]))
 
-    with database_connection(app.config["SQL_URI"]) as conn:
-        xref_ids = tuple(
-            int(item) for item in set(request.form.getlist("xref_ids")))
-        return render_template(
-            "phenotypes/confirm-delete-phenotypes.html",
-            species=species,
-            population=population,
-            dataset=dataset,
-            phenotypes=xref_ids)
+    def __handle_error__(resp):
+        flash(
+            "Error retrieving authorisation token. Phenotype deletion "
+            "failed. Please try again later.",
+            "alert alert-danger")
+        return _dataset_page
+
+    _jobs_db = app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"]
+    with (database_connection(app.config["SQL_URI"]) as conn,
+          sqlite3.connection(_jobs_db) as jobsconn):
+        form = request.form
+        xref_ids = tuple(int(item) for item in set(form.getlist("xref_ids")))
+
+        match form.get("action"):
+            case "cancel":
+                return redirect(url_for(
+                    "species.populations.phenotypes.view_dataset",
+                    species_id=species["SpeciesId"],
+                    population_id=population["Id"],
+                    dataset_id=dataset["Id"]))
+            case "delete":
+                _loglevel = logging.getLevelName(
+                    app.logger.getEffectiveLevel()).lower()
+                if form.get("confirm_delete_all_phenotypes", "") == "on":
+                    _cmd = ["--delete-all"]
+                else:
+                    # setup phenotypes xref_ids file
+                    _xref_ids_file = Path(
+                        app.config["SCRATCH_DIRECTORY"],
+                        f"delete-phenotypes-{uuid.uuid4()}.txt")
+                    with _xref_ids_file.open(mode="w", encoding="utf8") as ptr:
+                        ptr.write("\n".join(str(_id) for _id in xref_ids))
+
+                    _cmd = ["--xref_ids_file", str(_xref_ids_file)]
+
+                _job_id = uuid.uuid4()
+                return request_token(
+                    token_uri=urljoin(
+                        oauth2client.authserver_uri(), "auth/token"),
+                    user_id=session.user_details()["user_id"]
+                ).then(
+                    lambda token: gnlibs_jobs.initialise_job(
+                        jobsconn,
+                        _job_id,
+                        [
+                            sys.executable,
+                            "-u",
+                            "-m",
+                            "scripts.phenotypes.delete_phenotypes",
+                            "--log-level", _loglevel,
+                            app.config["SQL_URI"],
+                            str(species["SpeciesId"]),
+                            str(population["Id"]),
+                            str(dataset["Id"]),
+                            app.config["AUTH_SERVER_URL"],
+                            token["access_token"]] + _cmd,
+                        "delete-phenotypes",
+                        extra_meta={
+                            "species_id": species["SpeciesId"],
+                            "population_id": population["Id"],
+                            "dataset_id": dataset["Id"],
+                            "success_handler": (
+                                "uploader.phenotypes.views."
+                                "delete_phenotypes_success_handler")
+                        },
+                        external_id=session.logged_in_user_id())
+                ).then(
+                    lambda _job: gnlibs_jobs.launch_job(
+                        _job,
+                        _jobs_db,
+                        Path(f"{uploads_dir(app)}/job_errors"),
+                        worker_manager="gn_libs.jobs.launcher",
+                        loglevel=_loglevel)
+                ).either(__handle_error__, proceed_to_job_status)
+            case _:
+                _phenos: tuple[dict, ...] = tuple()
+                if len(xref_ids) > 0:
+                    _phenos = dataset_phenotypes(
+                        conn, population["Id"], dataset["Id"], xref_ids=xref_ids)
+
+                return render_template(
+                    "phenotypes/confirm-delete-phenotypes.html",
+                    species=species,
+                    population=population,
+                    dataset=dataset,
+                    phenotypes=_phenos)