diff options
Diffstat (limited to 'uploader/phenotypes')
| -rw-r--r-- | uploader/phenotypes/misc.py | 2 | ||||
| -rw-r--r-- | uploader/phenotypes/models.py | 439 | ||||
| -rw-r--r-- | uploader/phenotypes/views.py | 685 |
3 files changed, 827 insertions, 299 deletions
diff --git a/uploader/phenotypes/misc.py b/uploader/phenotypes/misc.py index cbe3b7f..1924c07 100644 --- a/uploader/phenotypes/misc.py +++ b/uploader/phenotypes/misc.py @@ -8,7 +8,7 @@ def phenotypes_data_differences( filedata: tuple[dict, ...], dbdata: tuple[dict, ...] ) -> tuple[dict, ...]: """Compute differences between file data and db data""" - diff = tuple() + diff: tuple[dict, ...] = tuple() for filerow, dbrow in zip( sorted(filedata, key=lambda item: (item["phenotype_id"], item["xref_id"])), sorted(dbdata, key=lambda item: (item["PhenotypeId"], item["xref_id"]))): diff --git a/uploader/phenotypes/models.py b/uploader/phenotypes/models.py index 20b8e77..3d656d2 100644 --- a/uploader/phenotypes/models.py +++ b/uploader/phenotypes/models.py @@ -1,17 +1,20 @@ """Database and utility functions for phenotypes.""" +import time +import random import logging import tempfile from pathlib import Path from functools import reduce from datetime import datetime -from typing import Optional, Iterable +from typing import Union, Optional, Iterable -import MySQLdb as mdb -from MySQLdb.cursors import Cursor, DictCursor +from MySQLdb.connections import Connection +from MySQLdb.cursors import Cursor, DictCursor, BaseCursor -from functional_tools import take from gn_libs.mysqldb import debug_query +from functional_tools import take + logger = logging.getLogger(__name__) @@ -26,7 +29,7 @@ __PHENO_DATA_TABLES__ = { def datasets_by_population( - conn: mdb.Connection, + conn: Connection, species_id: int, population_id: int ) -> tuple[dict, ...]: @@ -41,7 +44,7 @@ def datasets_by_population( return tuple(dict(row) for row in cursor.fetchall()) -def dataset_by_id(conn: mdb.Connection, +def dataset_by_id(conn: Connection, species_id: int, population_id: int, dataset_id: int) -> dict: @@ -56,7 +59,7 @@ def dataset_by_id(conn: mdb.Connection, return dict(cursor.fetchone()) -def phenotypes_count(conn: mdb.Connection, +def phenotypes_count(conn: Connection, population_id: int, dataset_id: int) -> int: """Count the number of phenotypes in the dataset.""" @@ -84,26 +87,46 @@ def phenotype_publication_data(conn, phenotype_id) -> Optional[dict]: return dict(res) -def dataset_phenotypes(conn: mdb.Connection, - population_id: int, - dataset_id: int, - offset: int = 0, - limit: Optional[int] = None) -> tuple[dict, ...]: +def dataset_phenotypes(# pylint: disable=[too-many-arguments, too-many-positional-arguments] + conn: Connection, + population_id: int, + dataset_id: int, + offset: int = 0, + limit: Optional[int] = None, + xref_ids: tuple[int, ...] = tuple() +) -> tuple[dict, ...]: """Fetch the actual phenotypes.""" - _query = ( - "SELECT pheno.*, pxr.Id AS xref_id, pxr.InbredSetId, ist.InbredSetCode FROM Phenotype AS pheno " + _narrow_by_ids = ( + f" AND pxr.Id IN ({', '.join(['%s'] * len(xref_ids))})" + if len(xref_ids) > 0 else "") + _narrow_by_limit = ( + f" LIMIT {limit} OFFSET {offset}" if bool(limit) else "") + _pub_query = ( + "SELECT pub.* " + "FROM PublishXRef AS pxr " + "INNER JOIN Publication AS pub ON pxr.PublicationId=pub.Id " + "WHERE pxr.InbredSetId=%s") + _narrow_by_ids + _pheno_query = (( + "SELECT pheno.*, pxr.Id AS xref_id, pxr.InbredSetId, pxr.PublicationId, " + "ist.InbredSetCode " + "FROM Phenotype AS pheno " "INNER JOIN PublishXRef AS pxr ON pheno.Id=pxr.PhenotypeId " "INNER JOIN PublishFreeze AS pf ON pxr.InbredSetId=pf.InbredSetId " "INNER JOIN InbredSet AS ist ON pf.InbredSetId=ist.Id " - "WHERE pxr.InbredSetId=%s AND pf.Id=%s") + ( - f" LIMIT {limit} OFFSET {offset}" if bool(limit) else "") + "WHERE pxr.InbredSetId=%s AND pf.Id=%s") + + _narrow_by_ids + + _narrow_by_limit) with conn.cursor(cursorclass=DictCursor) as cursor: - cursor.execute(_query, (population_id, dataset_id)) + cursor.execute(_pub_query, (population_id,) + xref_ids) debug_query(cursor, logger) - return tuple(dict(row) for row in cursor.fetchall()) + _pubs = {row["Id"]: dict(row) for row in cursor.fetchall()} + cursor.execute(_pheno_query, (population_id, dataset_id) + xref_ids) + debug_query(cursor, logger) + return tuple({**dict(row), "publication": _pubs[row["PublicationId"]]} + for row in cursor.fetchall()) -def __phenotype_se__(cursor: Cursor, xref_id, dataids_and_strainids): +def __phenotype_se__(cursor: BaseCursor, xref_id, dataids_and_strainids): """Fetch standard-error values (if they exist) for a phenotype.""" paramstr = ", ".join(["(%s, %s)"] * len(dataids_and_strainids)) flat = tuple(item for sublist in dataids_and_strainids for item in sublist) @@ -185,7 +208,7 @@ def __merge_pheno_data_and_se__(data, sedata) -> dict: def phenotype_by_id( - conn: mdb.Connection, + conn: Connection, species_id: int, population_id: int, dataset_id: int, @@ -217,13 +240,13 @@ def phenotype_by_id( ).values()) } if bool(_pheno) and len(_pheno.keys()) > 1: - raise Exception( + raise Exception(# pylint: disable=[broad-exception-raised] "We found more than one phenotype with the same identifier!") return None -def phenotypes_data(conn: mdb.Connection, +def phenotypes_data(conn: Connection, population_id: int, dataset_id: int, offset: int = 0, @@ -246,7 +269,60 @@ def phenotypes_data(conn: mdb.Connection, return tuple(dict(row) for row in cursor.fetchall()) -def save_new_dataset(cursor: Cursor, +def phenotypes_vector_data(# pylint: disable=[too-many-arguments, too-many-positional-arguments] + conn: Connection, + species_id: int, + population_id: int, + xref_ids: tuple[int, ...] = tuple(), + offset: int = 0, + limit: Optional[int] = None +) -> dict[tuple[int, int, int], dict[str, Union[int,float]]]: + """Retrieve the vector data values for traits in the database.""" + _params: tuple[int, ...] = (species_id, population_id) + _query = ("SELECT " + "Species.Id AS SpeciesId, iset.Id AS InbredSetId, " + "pxr.Id AS xref_id, pdata.*, Strain.Id AS StrainId, " + "Strain.Name AS StrainName " + "FROM " + "Species INNER JOIN InbredSet AS iset " + "ON Species.Id=iset.SpeciesId " + "INNER JOIN PublishXRef AS pxr " + "ON iset.Id=pxr.InbredSetId " + "INNER JOIN PublishData AS pdata " + "ON pxr.DataId=pdata.Id " + "INNER JOIN Strain " + "ON pdata.StrainId=Strain.Id " + "WHERE Species.Id=%s AND iset.Id=%s") + if len(xref_ids) > 0: + _paramstr = ", ".join(["%s"] * len(xref_ids)) + _query = _query + f" AND pxr.Id IN ({_paramstr})" + _params = _params + xref_ids + + def __organise__(acc, row): + _rowid = (species_id, population_id, row["xref_id"]) + _phenodata = { + **acc.get( + _rowid, { + "species_id": species_id, + "population_id": population_id, + "xref_id": row["xref_id"] + }), + row["StrainName"]: row["value"] + } + return { + **acc, + _rowid: _phenodata + } + + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute( + _query + (f" LIMIT {limit} OFFSET {offset}" if bool(limit) else ""), + _params) + debug_query(cursor, logger) + return reduce(__organise__, cursor.fetchall(), {}) + + +def save_new_dataset(cursor: BaseCursor, population_id: int, dataset_name: str, dataset_fullname: str, @@ -273,67 +349,153 @@ def save_new_dataset(cursor: Cursor, return {**params, "Id": cursor.lastrowid} -def phenotypes_data_by_ids( - conn: mdb.Connection, - inbred_pheno_xref: dict[str, int] -) -> tuple[dict, ...]: - """Fetch all phenotype data, filtered by the `inbred_pheno_xref` mapping.""" - _paramstr = ",".join(["(%s, %s, %s)"] * len(inbred_pheno_xref)) - _query = ("SELECT " - "pub.PubMed_ID, pheno.*, pxr.*, pd.*, str.*, iset.InbredSetCode " - "FROM Publication AS pub " - "RIGHT JOIN PublishXRef AS pxr0 ON pub.Id=pxr0.PublicationId " - "INNER JOIN Phenotype AS pheno ON pxr0.PhenotypeId=pheno.id " - "INNER JOIN PublishXRef AS pxr ON pheno.Id=pxr.PhenotypeId " - "INNER JOIN PublishData AS pd ON pxr.DataId=pd.Id " - "INNER JOIN Strain AS str ON pd.StrainId=str.Id " - "INNER JOIN StrainXRef AS sxr ON str.Id=sxr.StrainId " - "INNER JOIN PublishFreeze AS pf ON sxr.InbredSetId=pf.InbredSetId " - "INNER JOIN InbredSet AS iset ON pf.InbredSetId=iset.InbredSetId " - f"WHERE (pxr.InbredSetId, pheno.Id, pxr.Id) IN ({_paramstr}) " - "ORDER BY pheno.Id") - with conn.cursor(cursorclass=DictCursor) as cursor: - cursor.execute(_query, tuple(item for row in inbred_pheno_xref - for item in (row["population_id"], - row["phenoid"], - row["xref_id"]))) - debug_query(cursor, logger) - return tuple( - reduce(__organise_by_phenotype__, cursor.fetchall(), {}).values()) +def __pre_process_phenotype_data__(row): + _desc = row.get("description", "") + _pre_pub_desc = row.get("pre_publication_description", _desc) + _orig_desc = row.get("original_description", _desc) + _post_pub_desc = row.get("post_publication_description", _orig_desc) + _pre_pub_abbr = row.get("pre_publication_abbreviation", row["id"]) + _post_pub_abbr = row.get("post_publication_abbreviation", _pre_pub_abbr) + return { + "pre_publication_description": _pre_pub_desc, + "post_publication_description": _post_pub_desc, + "original_description": _orig_desc, + "units": row["units"], + "pre_publication_abbreviation": _pre_pub_abbr, + "post_publication_abbreviation": _post_pub_abbr + } -def create_new_phenotypes(conn: mdb.Connection, - phenotypes: Iterable[dict]) -> tuple[dict, ...]: - """Add entirely new phenotypes to the database.""" - _phenos = tuple() +def create_new_phenotypes(# pylint: disable=[too-many-locals] + conn: Connection, + population_id: int, + publication_id: int, + phenotypes: Iterable[dict] +) -> tuple[dict, ...]: + """Add entirely new phenotypes to the database. WARNING: Not thread-safe.""" + _phenos: tuple[dict, ...] = tuple() with conn.cursor(cursorclass=DictCursor) as cursor: + def make_next_id(idcol, table): + cursor.execute(f"SELECT MAX({idcol}) AS last_id FROM {table}") + _last_id = int(cursor.fetchone()["last_id"]) + def __next_id__(): + _next_id = _last_id + 1 + while True: + yield _next_id + _next_id = _next_id + 1 + + return __next_id__ + + ### Bottleneck: Everything below makes this function not ### + ### thread-safe because we have to retrieve the last IDs from ### + ### the database and increment those to compute the next IDs. ### + ### This is an unfortunate result from the current schema that ### + ### has a cross-reference table that requires that a phenotype ### + ### be linked to an existing publication, and have data IDs to ### + ### link to that phenotype's data. ### + ### The fact that the IDs are sequential also compounds the ### + ### bottleneck. ### + ### + ### For extra safety, ensure the following tables are locked ### + ### for `WRITE`: ### + ### - PublishXRef ### + ### - Phenotype ### + ### - PublishXRef ### + __next_xref_id = make_next_id("Id", "PublishXRef")() + __next_pheno_id__ = make_next_id("Id", "Phenotype")() + __next_data_id__ = make_next_id("DataId", "PublishXRef")() + + def __build_params_and_prepubabbrevs__(acc, row): + processed = __pre_process_phenotype_data__(row) + return ( + acc[0] + ({ + **processed, + "population_id": population_id, + "publication_id": publication_id, + "phenotype_id": next(__next_pheno_id__), + "xref_id": next(__next_xref_id), + "data_id": next(__next_data_id__) + },), + acc[1] + (processed["pre_publication_abbreviation"],)) while True: batch = take(phenotypes, 1000) if len(batch) == 0: break + params, abbrevs = reduce(#type: ignore[var-annotated] + __build_params_and_prepubabbrevs__, + batch, + (tuple(), tuple())) + # Check for uniqueness for all "Pre_publication_description" values + abbrevs_paramsstr = ", ".join(["%s"] * len(abbrevs)) + _query = ("SELECT PublishXRef.PhenotypeId, Phenotype.* " + "FROM PublishXRef " + "INNER JOIN Phenotype " + "ON PublishXRef.PhenotypeId=Phenotype.Id " + "WHERE PublishXRef.InbredSetId=%s " + "AND Phenotype.Pre_publication_abbreviation IN " + f"({abbrevs_paramsstr})") + cursor.execute(_query, + ((population_id,) + abbrevs)) + existing = tuple(row["Pre_publication_abbreviation"] + for row in cursor.fetchall()) + if len(existing) > 0: + # Narrow this exception, perhaps? + raise Exception(# pylint: disable=[broad-exception-raised] + "Found already existing phenotypes with the following " + "'Pre-publication abbreviations':\n\t" + "\n\t".join(f"* {item}" for item in existing)) + cursor.executemany( - ("INSERT INTO " - "Phenotype(Pre_publication_description, Original_description, Units, Authorized_Users) " - "VALUES (%(id)s, %(description)s, %(units)s, 'robwilliams')"), - tuple(batch)) - paramstr = ", ".join(["%s"] * len(batch)) - cursor.execute( - "SELECT * FROM Phenotype WHERE Pre_publication_description IN " - f"({paramstr})", - tuple(item["id"] for item in batch)) - _phenos = _phenos + tuple({ - "phenotype_id": row["Id"], - "id": row["Pre_publication_description"], - "description": row["Original_description"], - "units": row["Units"] - } for row in cursor.fetchall()) + ( + "INSERT INTO " + "Phenotype(" + "Id, " + "Pre_publication_description, " + "Post_publication_description, " + "Original_description, " + "Units, " + "Pre_publication_abbreviation, " + "Post_publication_abbreviation, " + "Authorized_Users" + ")" + "VALUES (" + "%(phenotype_id)s, " + "%(pre_publication_description)s, " + "%(post_publication_description)s, " + "%(original_description)s, " + "%(units)s, " + "%(pre_publication_abbreviation)s, " + "%(post_publication_abbreviation)s, " + "'robwilliams'" + ")"), + params) + _comments = f"Created at {datetime.now().isoformat()}" + cursor.executemany( + ("INSERT INTO PublishXRef(" + "Id, " + "InbredSetId, " + "PhenotypeId, " + "PublicationId, " + "DataId, " + "comments" + ")" + "VALUES(" + "%(xref_id)s, " + "%(population_id)s, " + "%(phenotype_id)s, " + "%(publication_id)s, " + "%(data_id)s, " + f"'{_comments}'" + ")"), + params) + _phenos = _phenos + params return _phenos def save_phenotypes_data( - conn: mdb.Connection, + conn: Connection, table: str, data: Iterable[dict] ) -> int: @@ -363,7 +525,7 @@ def save_phenotypes_data( def quick_save_phenotypes_data( - conn: mdb.Connection, + conn: Connection, table: str, dataitems: Iterable[dict], tmpdir: Path @@ -374,14 +536,14 @@ def quick_save_phenotypes_data( prefix=f"{table}_data", mode="wt", dir=tmpdir) as tmpfile, conn.cursor(cursorclass=DictCursor) as cursor): _count = 0 - console.debug("Write data rows to text file.") + logger.debug("Write data rows to text file.") for row in dataitems: tmpfile.write( f'{row["data_id"]}\t{row["sample_id"]}\t{row["value"]}\n') _count = _count + 1 tmpfile.flush() - console.debug("Load text file into database (table: %s)", + logger.debug("Load text file into database (table: %s)", _table_details["table"]) cursor.execute( f"LOAD DATA LOCAL INFILE '{tmpfile.name}' " @@ -393,3 +555,134 @@ def quick_save_phenotypes_data( ")") debug_query(cursor, logger) return _count + + +def __sleep_random__(): + """Sleep a random amount of time chosen from 0.05s to 1s in increments of 0.05""" + time.sleep(random.choice(tuple(i / 20.0 for i in range(1, 21)))) + + +def delete_phenotypes_data( + cursor: BaseCursor, + data_ids: tuple[int, ...] +) -> tuple[int, int, int]: + """Delete numeric data for phenotypes with the given data IDs.""" + if len(data_ids) == 0: + return (0, 0, 0) + + # Loop to handle big deletes i.e. ≥ 10000 rows + _dcount, _secount, _ncount = (0, 0, 0)# Count total rows deleted + while True: + _paramstr = ", ".join(["%s"] * len(data_ids)) + cursor.execute( + "DELETE FROM PublishData " + f"WHERE Id IN ({_paramstr}) " + "ORDER BY Id ASC, StrainId ASC "# Make deletions deterministic + "LIMIT 1000", + data_ids) + _dcount_curr = cursor.rowcount + _dcount += _dcount_curr + + cursor.execute( + "DELETE FROM PublishSE " + f"WHERE DataId IN ({_paramstr}) " + "ORDER BY DataId ASC, StrainId ASC "# Make deletions deterministic + "LIMIT 1000", + data_ids) + _secount_curr = cursor.rowcount + _secount += _secount_curr + + cursor.execute( + "DELETE FROM NStrain " + f"WHERE DataId IN ({_paramstr}) " + "ORDER BY DataId ASC, StrainId ASC "# Make deletions deterministic + "LIMIT 1000", + data_ids) + _ncount_curr = cursor.rowcount + _ncount += _ncount_curr + __sleep_random__() + + if all((_dcount_curr == 0, _secount_curr == 0, _ncount_curr == 0)): + # end loop if there are no more rows to delete. + break + + return (_dcount, _secount, _ncount) + + +def __linked_ids__( + cursor: BaseCursor, + population_id: int, + xref_ids: tuple[int, ...] +) -> tuple[tuple[int, int, int], ...]: + """Retrieve `DataId` values from `PublishXRef` table.""" + _paramstr = ", ".join(["%s"] * len(xref_ids)) + cursor.execute("SELECT PhenotypeId, PublicationId, DataId " + "FROM PublishXRef " + f"WHERE InbredSetId=%s AND Id IN ({_paramstr})", + (population_id,) + xref_ids) + return tuple( + (int(row["PhenotypeId"]), int(row["PublicationId"]), int(row["DataId"])) + for row in cursor.fetchall()) + + +def delete_phenotypes( + conn_or_cursor: Union[Connection, Cursor], + population_id: int, + xref_ids: tuple[int, ...] +) -> tuple[int, int, int, int]: + """Delete phenotypes and all their data.""" + def __delete_phenos__(cursor: BaseCursor, pheno_ids: tuple[int, ...]) -> int: + """Delete data from the `Phenotype` table.""" + _paramstr = ", ".join(["%s"] * len(pheno_ids)) + + _pcount = 0 + while True: + cursor.execute( + "DELETE FROM Phenotype " + f"WHERE Id IN ({_paramstr}) " + "ORDER BY Id " + "LIMIT 1000", + pheno_ids) + _pcount_curr = cursor.rowcount + _pcount += _pcount_curr + __sleep_random__() + if _pcount_curr == 0: + break + + return cursor.rowcount + + def __delete_xrefs__(cursor: BaseCursor) -> int: + _paramstr = ", ".join(["%s"] * len(xref_ids)) + + _xcount = 0 + while True: + cursor.execute( + "DELETE FROM PublishXRef " + f"WHERE InbredSetId=%s AND Id IN ({_paramstr}) " + "ORDER BY Id " + "LIMIT 10000", + (population_id,) + xref_ids) + _xcount_curr = cursor.rowcount + _xcount += _xcount_curr + __sleep_random__() + if _xcount_curr == 0: + break + + return _xcount + + def __with_cursor__(cursor): + _phenoids, _pubids, _dataids = reduce( + lambda acc, curr: (acc[0] + (curr[0],), + acc[1] + (curr[1],), + acc[2] + (curr[2],)), + __linked_ids__(cursor, population_id, xref_ids), + (tuple(), tuple(), tuple())) + __delete_phenos__(cursor, _phenoids) + return (__delete_xrefs__(cursor),) + delete_phenotypes_data( + cursor, _dataids) + + if isinstance(conn_or_cursor, BaseCursor): + return __with_cursor__(conn_or_cursor) + + with conn_or_cursor.cursor(cursorclass=DictCursor) as cursor: + return __with_cursor__(cursor) diff --git a/uploader/phenotypes/views.py b/uploader/phenotypes/views.py index 49c12b5..c03f3f5 100644 --- a/uploader/phenotypes/views.py +++ b/uploader/phenotypes/views.py @@ -1,60 +1,57 @@ -"""Views handling ('classical') phenotypes.""" -import sys +"""Views handling ('classical') phenotypes."""# pylint: disable=[too-many-lines] +import io import csv +import sys import uuid import json import logging -import tempfile from typing import Any from pathlib import Path from zipfile import ZipFile -from urllib.parse import urljoin from functools import wraps, reduce -from logging import INFO, ERROR, DEBUG, FATAL, CRITICAL, WARNING +from urllib.parse import urljoin, urlparse, ParseResult, urlunparse, urlencode import datetime -from datetime import timedelta from redis import Redis from pymonad.either import Left from requests.models import Response from MySQLdb.cursors import DictCursor -from werkzeug.utils import secure_filename from gn_libs import sqlite3 from gn_libs import jobs as gnlibs_jobs +from gn_libs.jobs.jobs import JobNotFound from gn_libs.mysqldb import database_connection -from gn_libs import monadic_requests as mrequests -from authlib.jose import jwt +from werkzeug.datastructures import Headers from flask import (flash, request, - url_for, jsonify, redirect, Blueprint, - send_file, - current_app as app) + current_app as app, + Response as FlaskResponse) -# from r_qtl import r_qtl2 as rqtl2 from r_qtl import r_qtl2_qc as rqc from r_qtl import exceptions as rqe - from uploader import jobs from uploader import session -from uploader.files import save_file#, fullpath +from uploader.files import save_file +from uploader.configutils import uploads_dir +from uploader.flask_extensions import url_for from uploader.ui import make_template_renderer from uploader.oauth2.client import oauth2_post +from uploader.oauth2.tokens import request_token from uploader.authorisation import require_login -from uploader.oauth2 import jwks, client as oauth2client +from uploader.oauth2 import client as oauth2client +from uploader.route_utils import build_next_argument from uploader.route_utils import generic_select_population from uploader.datautils import safe_int, enumerate_sequence from uploader.species.models import all_species, species_by_id from uploader.monadic_requests import make_either_error_handler from uploader.publications.models import fetch_publication_by_id from uploader.request_checks import with_species, with_population -from uploader.samples.models import samples_by_species_and_population from uploader.input_validation import (encode_errors, decode_errors, is_valid_representative_name) @@ -65,9 +62,9 @@ from .models import (dataset_by_id, save_new_dataset, dataset_phenotypes, datasets_by_population, - phenotypes_data_by_ids, phenotype_publication_data) +logger = logging.getLogger(__name__) phenotypesbp = Blueprint("phenotypes", __name__) render_template = make_template_renderer("phenotypes") @@ -241,11 +238,6 @@ def view_phenotype(# pylint: disable=[unused-argument] population["Id"], dataset["Id"], xref_id) - def __non_empty__(value) -> bool: - if isinstance(value, str): - return value.strip() != "" - return bool(value) - return render_template( "phenotypes/view-phenotype.html", species=species, @@ -254,19 +246,14 @@ def view_phenotype(# pylint: disable=[unused-argument] xref_id=xref_id, phenotype=phenotype, has_se=any(bool(item.get("error")) for item in phenotype["data"]), - publish_data={ - key.replace("_", " "): val - for key,val in - (phenotype_publication_data(conn, phenotype["Id"]) or {}).items() - if (key in ("PubMed_ID", "Authors", "Title", "Journal") - and __non_empty__(val)) - }, - privileges=(privileges - ### For demo! Do not commit this part - + ("group:resource:edit-resource", - "group:resource:delete-resource",) - ### END: For demo! Do not commit this part - ), + publication=(phenotype_publication_data(conn, phenotype["Id"]) or {}), + privileges=privileges, + next=build_next_argument( + uri="species.populations.phenotypes.view_phenotype", + species_id=species["SpeciesId"], + population_id=population["Id"], + dataset_id=dataset["Id"], + xref_id=xref_id), activelink="view-phenotype") def __fail__(error): @@ -330,6 +317,11 @@ def create_dataset(species: dict, population: dict, **kwargs):# pylint: disable= dataset_shortname = ( form["dataset-shortname"] or form["dataset-name"]).strip() _pheno_dataset = save_new_dataset( + # It's not necessary to update the authorisation server to register + # new phenotype resource here, since each phenotype trait can, in + # theory, have its own access control allowing/disallowing access to + # it. In practice, however, we tend to gather multiple traits into a + # single resource for access control. cursor, population["Id"], form["dataset-name"].strip(), @@ -347,7 +339,7 @@ def process_phenotypes_rqtl2_bundle(error_uri): try: ## Handle huge files here... phenobundle = save_file(request.files["phenotypes-bundle"], - Path(app.config["UPLOAD_FOLDER"])) + uploads_dir(app)) rqc.validate_bundle(phenobundle) return phenobundle except AssertionError as _aerr: @@ -370,22 +362,29 @@ def process_phenotypes_individual_files(error_uri): "comment.char": form["file-comment-character"], "na.strings": form["file-na"].split(" "), } - bundlepath = Path(app.config["UPLOAD_FOLDER"], + bundlepath = Path(uploads_dir(app), f"{str(uuid.uuid4()).replace('-', '')}.zip") with ZipFile(bundlepath,mode="w") as zfile: - for rqtlkey, formkey in (("phenocovar", "phenotype-descriptions"), - ("pheno", "phenotype-data"), - ("phenose", "phenotype-se"), - ("phenonum", "phenotype-n")): + for rqtlkey, formkey, _type in ( + ("phenocovar", "phenotype-descriptions", "mandatory"), + ("pheno", "phenotype-data", "mandatory"), + ("phenose", "phenotype-se", "optional"), + ("phenonum", "phenotype-n", "optional")): + if _type == "optional" and not bool(form.get(formkey)): + continue # skip if an optional key does not exist. + + cdata[f"{rqtlkey}_transposed"] = ( + (form.get(f"{formkey}-transposed") or "off") == "on") + if form.get("resumable-upload", False): # Chunked upload of large files was used filedata = json.loads(form[formkey]) zfile.write( - Path(app.config["UPLOAD_FOLDER"], filedata["uploaded-file"]), + Path(uploads_dir(app), filedata["uploaded-file"]), arcname=filedata["original-name"]) cdata[rqtlkey] = cdata.get(rqtlkey, []) + [filedata["original-name"]] else: - # TODO: Check this path: fix any bugs. + # T0DO: Check this path: fix any bugs. _sentfile = request.files[formkey] if not bool(_sentfile): flash(f"Expected file ('{formkey}') was not provided.", @@ -393,12 +392,13 @@ def process_phenotypes_individual_files(error_uri): return error_uri filepath = save_file( - _sentfile, Path(app.config["UPLOAD_FOLDER"]), hashed=False) + _sentfile, uploads_dir(app), hashed=False) zfile.write( - Path(app.config["UPLOAD_FOLDER"], filepath), + Path(uploads_dir(app), filepath), arcname=filepath.name) cdata[rqtlkey] = cdata.get(rqtlkey, []) + [filepath.name] + zfile.writestr("control_data.json", data=json.dumps(cdata, indent=2)) return bundlepath @@ -423,15 +423,13 @@ def add_phenotypes(species: dict, population: dict, dataset: dict, **kwargs):# p dataset_id=dataset["Id"])) _redisuri = app.config["REDIS_URL"] _sqluri = app.config["SQL_URI"] - with (Redis.from_url(_redisuri, decode_responses=True) as rconn, - # database_connection(_sqluri) as conn, - # conn.cursor(cursorclass=DictCursor) as cursor - ): + with Redis.from_url(_redisuri, decode_responses=True) as rconn: if request.method == "GET": today = datetime.date.today() return render_template( ("phenotypes/add-phenotypes-with-rqtl2-bundle.html" - if use_bundle else "phenotypes/add-phenotypes-raw-files.html"), + if use_bundle + else "phenotypes/add-phenotypes-raw-files.html"), species=species, population=population, dataset=dataset, @@ -461,7 +459,6 @@ def add_phenotypes(species: dict, population: dict, dataset: dict, **kwargs):# p [sys.executable, "-m", "scripts.rqtl2.phenotypes_qc", _sqluri, _redisuri, _namespace, str(_jobid), str(species["SpeciesId"]), str(population["Id"]), - # str(dataset["Id"]), str(phenobundle), "--loglevel", logging.getLevelName( @@ -477,7 +474,7 @@ def add_phenotypes(species: dict, population: dict, dataset: dict, **kwargs):# p **({"publicationid": request.form["publication-id"]} if request.form.get("publication-id") else {})})}), _redisuri, - f"{app.config['UPLOAD_FOLDER']}/job_errors") + f"{uploads_dir(app)}/job_errors") app.logger.debug("JOB DETAILS: %s", _job) jobstatusuri = url_for("species.populations.phenotypes.job_status", @@ -518,6 +515,7 @@ def job_status( job = jobs.job(rconn, jobs.jobsnamespace(), str(job_id)) except jobs.JobNotFound as _jnf: job = None + return render_template("phenotypes/job-status.html", species=species, population=population, @@ -533,6 +531,65 @@ def job_status( @phenotypesbp.route( "<int:species_id>/populations/<int:population_id>/phenotypes/datasets" + "/<int:dataset_id>/job/<uuid:job_id>/download-errors", + methods=["GET"]) +@require_login +@with_dataset( + species_redirect_uri="species.populations.phenotypes.index", + population_redirect_uri="species.populations.phenotypes.select_population", + redirect_uri="species.populations.phenotypes.list_datasets") +def download_errors( + species: dict, + population: dict, + dataset: dict, + job_id: uuid.UUID, + **kwargs):# pylint: disable=[unused-argument] + """Download the list of errors as a CSV file.""" + with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn: + try: + job = jobs.job(rconn, jobs.jobsnamespace(), str(job_id)) + _prefix_ = jobs.jobsnamespace() + _jobid_ = job['jobid'] + def __generate_chunks__(): + _errors_ = ( + json.loads(error) + for key in rconn.keys( + f"{_prefix_}:{str(_jobid_)}:*:errors:*") + for error in rconn.lrange(key, 0, -1)) + _chunk_no_ = 0 + _all_errors_printed_ = False + while not _all_errors_printed_: + _chunk_ = [] + try: + for _ in range(0, 1000): + _chunk_.append(next(_errors_)) + except StopIteration: + _all_errors_printed_ = True + if len(_chunk_) <= 0: + raise + + _out_ = io.StringIO() + _writer_ = csv.DictWriter(_out_, fieldnames=tuple(_chunk_[0].keys())) + if _chunk_no_ == 0: + _writer_.writeheader() + _writer_.writerows(_chunk_) + _chunk_no_ += 1 + yield _out_.getvalue() + if _all_errors_printed_: + return + + headers = Headers() + headers.set("Content-Disposition", + "attachment", + filename=f"{job['job-type']}_{job['jobid']}.csv") + return FlaskResponse( + __generate_chunks__(), mimetype="text/csv", headers=headers) + except jobs.JobNotFound as _jnf: + return render_template("jobs/job-not-found.html", job_id=job_id) + + +@phenotypesbp.route( + "<int:species_id>/populations/<int:population_id>/phenotypes/datasets" "/<int:dataset_id>/job/<uuid:job_id>/review", methods=["GET"]) @require_login @@ -610,9 +667,27 @@ def review_job_data( conn, int(_job_metadata["publicationid"])) if _job_metadata.get("publicationid") else None), + user=session.user_details(), + timestamp=datetime.datetime.now().isoformat(), activelink="add-phenotypes") +def load_phenotypes_success_handler(job): + """Handle loading new phenotypes into the database successfully.""" + return redirect(url_for( + "species.populations.phenotypes.load_data_success", + species_id=job["metadata"]["species_id"], + population_id=job["metadata"]["population_id"], + dataset_id=job["metadata"]["dataset_id"], + job_id=job["job_id"])) + + +def proceed_to_job_status(job): + """A generic 'job success' handler for asynchronous phenotype jobs.""" + app.logger.debug("The new job: %s", job) + return redirect(url_for("background-jobs.job_status", job_id=job["job_id"])) + + @phenotypesbp.route( "<int:species_id>/populations/<int:population_id>/phenotypes/datasets" "/<int:dataset_id>/load-data-to-database", @@ -629,67 +704,40 @@ def load_data_to_database( **kwargs ):# pylint: disable=[unused-argument] """Load the data from the given QC job into the database.""" - jobs_db = app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"] + _jobs_db = app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"] with (Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn, - sqlite3.connection(jobs_db) as conn): + sqlite3.connection(_jobs_db) as conn): + # T0DO: Maybe break the connection between the jobs here, pass: + # - the bundle name (rebuild the full path here.) + # - publication details, where separate + # - details about the files: e.g. total lines, etc qc_job = jobs.job(rconn, jobs.jobsnamespace(), request.form["data-qc-job-id"]) _meta = json.loads(qc_job["job-metadata"]) - load_job_id = uuid.uuid4() + _load_job_id = uuid.uuid4() + _loglevel = logging.getLevelName(app.logger.getEffectiveLevel()).lower() command = [ sys.executable, "-u", "-m", "scripts.load_phenotypes_to_db", app.config["SQL_URI"], - jobs_db, - str(load_job_id), + _jobs_db, + str(_load_job_id), "--log-level", - logging.getLevelName( - app.logger.getEffectiveLevel() - ).lower() + _loglevel ] def __handle_error__(resp): - raise Exception(resp) - - def __handle_success__(load_job): - app.logger.debug("The phenotypes loading job: %s", load_job) - return str(load_job) - issued = datetime.datetime.now() - jwtkey = jwks.newest_jwk_with_rotation( - jwks.jwks_directory(app, "UPLOADER_SECRETS"), - int(app.config["JWKS_ROTATION_AGE_DAYS"])) - - return mrequests.post( - urljoin(oauth2client.authserver_uri(), "auth/token"), - json={ - "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer", - "scope": oauth2client.SCOPE, - "assertion": jwt.encode( - header={ - "alg": "RS256", - "typ": "JWT", - "kid": jwtkey.as_dict()["kid"] - }, - payload={ - "iss": str(oauth2client.oauth2_clientid()), - "sub": str(session.user_details()["user_id"]), - "aud": urljoin(oauth2client.authserver_uri(), - "auth/token"), - # TODO: Update expiry time once fix is implemented in - # auth server. - "exp": (issued + timedelta(minutes=5)).timestamp(), - "nbf": int(issued.timestamp()), - "iat": int(issued.timestamp()), - "jti": str(uuid.uuid4()) - }, - key=jwtkey).decode("utf8"), - "client_id": oauth2client.oauth2_clientid() - } + return render_template("http-error.html", *resp.json()) + + + return request_token( + token_uri=urljoin(oauth2client.authserver_uri(), "auth/token"), + user_id=session.user_details()["user_id"] ).then( lambda token: gnlibs_jobs.initialise_job( conn, - load_job_id, + _load_job_id, command, "load-new-phenotypes-data", extra_meta={ @@ -697,16 +745,28 @@ def load_data_to_database( "population_id": population["Id"], "dataset_id": dataset["Id"], "bundle_file": _meta["bundle"], + "publication_id": _meta["publicationid"], "authserver": oauth2client.authserver_uri(), - "token": token["access_token"] - }) + "token": token["access_token"], + "dataname": request.form["data_name"].strip(), + "success_handler": ( + "uploader.phenotypes.views" + ".load_phenotypes_success_handler"), + **{ + key: request.form[key] + for key in ("data_description",) + if key in request.form.keys() + } + }, + external_id=session.logged_in_user_id()) ).then( lambda job: gnlibs_jobs.launch_job( - job, - jobs_db, - f"{app.config['UPLOAD_FOLDER']}/job_errors", - worker_manager="gn_libs.jobs.launcher") - ).either(__handle_error__, __handle_success__) + job, + _jobs_db, + Path(f"{uploads_dir(app)}/job_errors"), + worker_manager="gn_libs.jobs.launcher", + loglevel=_loglevel) + ).either(__handle_error__, proceed_to_job_status) def update_phenotype_metadata(conn, metadata: dict): @@ -821,7 +881,7 @@ def update_phenotype_data(conn, data: dict): } }) - values, serrs, counts = tuple( + values, serrs, counts = tuple(# type: ignore[var-annotated] tuple({ "data_id": row[0].split("::")[0], "strain_id": row[0].split("::")[1], @@ -861,12 +921,7 @@ def edit_phenotype_data(# pylint: disable=[unused-argument] def __render__(**kwargs): processed_kwargs = { **kwargs, - "privileges": (kwargs.get("privileges", tuple()) - ### For demo! Do not commit this part - + ("group:resource:edit-resource", - "group:resource:delete-resource",) - ### END: For demo! Do not commit this part - ) + "privileges": kwargs.get("privileges", tuple()) } return render_template( "phenotypes/edit-phenotype.html", @@ -966,151 +1021,331 @@ def edit_phenotype_data(# pylint: disable=[unused-argument] xref_id=xref_id)) -def process_phenotype_data_for_download(pheno: dict) -> dict: - """Sanitise data for download.""" - return { - "UniqueIdentifier": f"phId:{pheno['Id']}::xrId:{pheno['xref_id']}", - **{ - key: val for key, val in pheno.items() - if key not in ("Id", "xref_id", "data", "Units") - }, - **{ - data_item["StrainName"]: data_item["value"] - for data_item in pheno.get("data", {}).values() - } - } +@phenotypesbp.route( + "<int:species_id>/populations/<int:population_id>/phenotypes/datasets" + "/<int:dataset_id>/load-data-success/<uuid:job_id>", + methods=["GET"]) +@require_login +@with_dataset( + species_redirect_uri="species.populations.phenotypes.index", + population_redirect_uri="species.populations.phenotypes.select_population", + redirect_uri="species.populations.phenotypes.list_datasets") +def load_data_success( + species: dict, + population: dict, + dataset: dict, + job_id: uuid.UUID, + **kwargs +):# pylint: disable=[unused-argument] + """Display success page if loading data to database was successful.""" + with (database_connection(app.config["SQL_URI"]) as conn, + sqlite3.connection(app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"]) + as jobsconn): + try: + gn2_uri = urlparse(app.config["GN2_SERVER_URL"]) + job = gnlibs_jobs.job(jobsconn, job_id, fulldetails=True) + app.logger.debug("THE JOB: %s", job) + _xref_ids = tuple( + str(item) for item + in json.loads(job["metadata"].get("xref_ids", "[]"))) + _publication = fetch_publication_by_id( + conn, int(job["metadata"].get("publication_id", "0"))) + _search_terms = (item for item in + (str(_publication["PubMed_ID"] or ""), + _publication["Authors"], + (_publication["Title"] or "")) + if item != "") + return render_template( + "phenotypes/load-phenotypes-success.html", + species=species, + population=population, + dataset=dataset, + job=job, + search_page_uri=urlunparse(ParseResult( + scheme=gn2_uri.scheme, + netloc=gn2_uri.netloc, + path="/search", + params="", + query=urlencode({ + "species": species["Name"], + "group": population["Name"], + "type": "Phenotypes", + "dataset": dataset["Name"], + "search_terms_or": ( + # Very long URLs will cause + # errors. + " ".join(_xref_ids) + if len(_xref_ids) <= 100 + else ""), + "search_terms_and": " ".join( + _search_terms).strip(), + "accession_id": "None", + "FormID": "searchResult" + }), + fragment=""))) + except JobNotFound as _jnf: + return render_template("jobs/job-not-found.html", job_id=job_id) -BULK_EDIT_COMMON_FIELDNAMES = [ - "UniqueIdentifier", - "Post_publication_description", - "Pre_publication_abbreviation", - "Pre_publication_description", - "Original_description", - "Post_publication_abbreviation", - "PubMed_ID" -] +@phenotypesbp.route( + "<int:species_id>/populations/<int:population_id>/phenotypes/datasets" + "/<int:dataset_id>/recompute-means", + methods=["POST"]) +@require_login +@with_dataset( + species_redirect_uri="species.populations.phenotypes.index", + population_redirect_uri="species.populations.phenotypes.select_population", + redirect_uri="species.populations.phenotypes.list_datasets") +def recompute_means(# pylint: disable=[unused-argument] + species: dict, + population: dict, + dataset: dict, + **kwargs +): + """Compute/Recompute the means for phenotypes in a particular population.""" + _jobs_db = app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"] + _job_id = uuid.uuid4() + _xref_ids = tuple(int(item.split("_")[-1]) + for item in request.form.getlist("selected-phenotypes")) + + _loglevel = logging.getLevelName(app.logger.getEffectiveLevel()).lower() + command = [ + sys.executable, + "-u", + "-m", + "scripts.compute_phenotype_means", + app.config["SQL_URI"], + _jobs_db, + str(population["Id"]), + "--log-level", + _loglevel] + ( + ["--cross-ref-ids", ",".join(str(_id) for _id in _xref_ids)] + if len(_xref_ids) > 0 else + []) + logger.debug("%s.recompute_means: command (%s)", __name__, command) + + with sqlite3.connection(_jobs_db) as conn: + _job = gnlibs_jobs.launch_job( + gnlibs_jobs.initialise_job( + conn, + _job_id, + command, + "(re)compute-phenotype-means", + extra_meta={ + "species_id": species["SpeciesId"], + "population_id": population["Id"], + "dataset_id": dataset["Id"], + "success_handler": ( + "uploader.phenotypes.views." + "recompute_phenotype_means_success_handler") + }, + external_id=session.logged_in_user_id()), + _jobs_db, + Path(f"{uploads_dir(app)}/job_errors"), + worker_manager="gn_libs.jobs.launcher", + loglevel=_loglevel) + return redirect(url_for("background-jobs.job_status", + job_id=_job["job_id"])) + + +def return_to_dataset_view_handler(job, msg: str): + """Handler for background jobs: Returns to `View Dataset` page.""" + flash(msg, "alert alert-success") + return redirect(url_for( + "species.populations.phenotypes.view_dataset", + species_id=job["metadata"]["species_id"], + population_id=job["metadata"]["population_id"], + dataset_id=job["metadata"]["dataset_id"], + job_id=job["job_id"])) + +def recompute_phenotype_means_success_handler(job): + """Handle loading new phenotypes into the database successfully.""" + return return_to_dataset_view_handler(job, "Means computed successfully!") @phenotypesbp.route( "<int:species_id>/populations/<int:population_id>/phenotypes/datasets" - "/<int:dataset_id>/edit-download", + "/<int:dataset_id>/rerun-qtlreaper", methods=["POST"]) @require_login @with_dataset( species_redirect_uri="species.populations.phenotypes.index", population_redirect_uri="species.populations.phenotypes.select_population", redirect_uri="species.populations.phenotypes.list_datasets") -def edit_download_phenotype_data(# pylint: disable=[unused-argument] +def rerun_qtlreaper(# pylint: disable=[unused-argument] species: dict, population: dict, dataset: dict, **kwargs ): - formdata = request.json - with database_connection(app.config["SQL_URI"]) as conn: - samples_list = [ - sample["Name"] for sample in samples_by_species_and_population( - conn, species["SpeciesId"], population["Id"])] - data = ( - process_phenotype_data_for_download(pheno) - for pheno in phenotypes_data_by_ids(conn, tuple({ + """(Re)run QTLReaper for phenotypes in a particular population.""" + _jobs_db = app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"] + _job_id = uuid.uuid4() + _loglevel = logging.getLevelName(app.logger.getEffectiveLevel()).lower() + + _workingdir = Path(app.config["SCRATCH_DIRECTORY"]).joinpath("qtlreaper") + _workingdir.mkdir(exist_ok=True) + command = [ + sys.executable, + "-u", + "-m", + "scripts.run_qtlreaper", + "--log-level", _loglevel, + app.config["SQL_URI"], + str(species["SpeciesId"]), + str(population["Id"]), + str(Path(app.config["GENOTYPE_FILES_DIRECTORY"]).joinpath( + "genotype")), + str(_workingdir) + ] + [ + str(_xref_id) for _xref_id in ( + int(item.split("_")[-1]) + for item in request.form.getlist("selected-phenotypes")) + ] + logger.debug("(Re)run QTLReaper: %s", command) + with sqlite3.connection(_jobs_db) as conn: + _job_id = uuid.uuid4() + _job = gnlibs_jobs.launch_job( + gnlibs_jobs.initialise_job( + conn, + _job_id, + command, + "(re)run-qtlreaper", + extra_meta={ + "species_id": species["SpeciesId"], "population_id": population["Id"], - "phenoid": row["phenotype_id"], - "xref_id": row["xref_id"] - } for row in formdata))) - - with (tempfile.TemporaryDirectory( - prefix=app.config["TEMPORARY_DIRECTORY"]) as tmpdir): - filename = Path(tmpdir).joinpath("tempfile.tsv") - with open(filename, mode="w") as outfile: - outfile.write( - "# **DO NOT** delete the 'UniqueIdentifier' row. It is used " - "by the system to identify and edit the correct rows and " - "columns in the database.\n") - outfile.write( - "# The '…_description' fields are useful for you to figure out " - "what row you are working on. Changing any of this fields will " - "also update the database, so do be careful.\n") - outfile.write( - "# Leave a field empty to delete the value in the database.\n") - outfile.write( - "# Any line beginning with a '#' character is considered a " - "comment line. This line, and all the lines above it, are " - "all comment lines. Comment lines will be ignored.\n") - writer = csv.DictWriter(outfile, - fieldnames= ( - BULK_EDIT_COMMON_FIELDNAMES + - samples_list), - dialect="excel-tab") - writer.writeheader() - writer.writerows(data) - outfile.flush() - - return send_file( - filename, - mimetype="text/csv", - as_attachment=True, - download_name=secure_filename(f"{dataset['Name']}_data")) + "dataset_id": dataset["Id"], + "success_handler": ( + "uploader.phenotypes.views." + "rerun_qtlreaper_success_handler") + }, + external_id=session.logged_in_user_id()), + _jobs_db, + Path(f"{uploads_dir(app)}/job_errors"), + worker_manager="gn_libs.jobs.launcher", + loglevel=_loglevel) + return redirect(url_for("background-jobs.job_status", + job_id=_job["job_id"])) + return redirect(url_for( + "background-jobs.job_status", job_id=_job["job_id"])) + + +def rerun_qtlreaper_success_handler(job): + """Handle success (re)running QTLReaper script.""" + return return_to_dataset_view_handler(job, "QTLReaper ran successfully!") + + +def delete_phenotypes_success_handler(job): + """Handle success running the 'delete-phenotypes' script.""" + return return_to_dataset_view_handler( + job, "Phenotypes deleted successfully.") @phenotypesbp.route( "<int:species_id>/populations/<int:population_id>/phenotypes/datasets" - "/<int:dataset_id>/edit-upload", + "/<int:dataset_id>/delete", methods=["GET", "POST"]) @require_login @with_dataset( species_redirect_uri="species.populations.phenotypes.index", population_redirect_uri="species.populations.phenotypes.select_population", redirect_uri="species.populations.phenotypes.list_datasets") -def edit_upload_phenotype_data(# pylint: disable=[unused-argument] +def delete_phenotypes(# pylint: disable=[unused-argument, too-many-locals] species: dict, population: dict, dataset: dict, **kwargs ): - if request.method == "GET": - return render_template( - "phenotypes/bulk-edit-upload.html", - species=species, - population=population, - dataset=dataset, - activelink="edit-phenotype") + """Delete the specified phenotype data.""" + _dataset_page = redirect(url_for( + "species.populations.phenotypes.view_dataset", + species_id=species["SpeciesId"], + population_id=population["Id"], + dataset_id=dataset["Id"])) - edit_file = save_file(request.files["file-upload-bulk-edit-upload"], - Path(app.config["UPLOAD_FOLDER"])) + def __handle_error__(resp): + flash( + "Error retrieving authorisation token. Phenotype deletion " + "failed. Please try again later.", + "alert alert-danger") + return _dataset_page - jobs_db = app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"] - with sqlite3.connection(jobs_db) as conn: - job_id = uuid.uuid4() - job_cmd = [ - sys.executable, "-u", - "-m", "scripts.phenotypes_bulk_edit", - app.config["SQL_URI"], - jobs_db, - str(job_id), - "--log-level", - logging.getLevelName( - app.logger.getEffectiveLevel() - ).lower() - ] - app.logger.debug("Phenotype-edit, bulk-upload command: %s", job_cmd) - _job = gnlibs_jobs.launch_job( - gnlibs_jobs.initialise_job(conn, - job_id, - job_cmd, - "phenotype-bulk-edit", - extra_meta = { - "edit-file": str(edit_file), - "species-id": species["SpeciesId"], - "population-id": population["Id"], - "dataset-id": dataset["Id"] - }), - jobs_db, - f"{app.config['UPLOAD_FOLDER']}/job_errors", - worker_manager="gn_libs.jobs.launcher") - - - return redirect(url_for("background-jobs.job_status", - job_id=job_id, - job_type="phenotype-bulk-edit")) + _jobs_db = app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"] + with (database_connection(app.config["SQL_URI"]) as conn, + sqlite3.connection(_jobs_db) as jobsconn): + form = request.form + xref_ids = tuple(int(item) for item in set(form.getlist("xref_ids"))) + + match form.get("action"): + case "cancel": + return redirect(url_for( + "species.populations.phenotypes.view_dataset", + species_id=species["SpeciesId"], + population_id=population["Id"], + dataset_id=dataset["Id"])) + case "delete": + _loglevel = logging.getLevelName( + app.logger.getEffectiveLevel()).lower() + if form.get("confirm_delete_all_phenotypes", "") == "on": + _cmd = ["--delete-all"] + else: + # setup phenotypes xref_ids file + _xref_ids_file = Path( + app.config["SCRATCH_DIRECTORY"], + f"delete-phenotypes-{uuid.uuid4()}.txt") + with _xref_ids_file.open(mode="w", encoding="utf8") as ptr: + ptr.write("\n".join(str(_id) for _id in xref_ids)) + + _cmd = ["--xref_ids_file", str(_xref_ids_file)] + + _job_id = uuid.uuid4() + return request_token( + token_uri=urljoin( + oauth2client.authserver_uri(), "auth/token"), + user_id=session.user_details()["user_id"] + ).then( + lambda token: gnlibs_jobs.initialise_job( + jobsconn, + _job_id, + [ + sys.executable, + "-u", + "-m", + "scripts.phenotypes.delete_phenotypes", + "--log-level", _loglevel, + app.config["SQL_URI"], + str(species["SpeciesId"]), + str(population["Id"]), + str(dataset["Id"]), + app.config["AUTH_SERVER_URL"], + token["access_token"]] + _cmd, + "delete-phenotypes", + extra_meta={ + "species_id": species["SpeciesId"], + "population_id": population["Id"], + "dataset_id": dataset["Id"], + "success_handler": ( + "uploader.phenotypes.views." + "delete_phenotypes_success_handler") + }, + external_id=session.logged_in_user_id()) + ).then( + lambda _job: gnlibs_jobs.launch_job( + _job, + _jobs_db, + Path(f"{uploads_dir(app)}/job_errors"), + worker_manager="gn_libs.jobs.launcher", + loglevel=_loglevel) + ).either(__handle_error__, proceed_to_job_status) + case _: + _phenos: tuple[dict, ...] = tuple() + if len(xref_ids) > 0: + _phenos = dataset_phenotypes( + conn, population["Id"], dataset["Id"], xref_ids=xref_ids) + + return render_template( + "phenotypes/confirm-delete-phenotypes.html", + species=species, + population=population, + dataset=dataset, + phenotypes=_phenos) |
