Diffstat (limited to 'scripts')
-rw-r--r--  scripts/cli/__init__.py             |   3
-rw-r--r--  scripts/cli/logging.py              |  18
-rw-r--r--  scripts/cli/options.py              |  46
-rw-r--r--  scripts/cli/validators.py           |  10
-rw-r--r--  scripts/cli_parser.py               |  24
-rw-r--r--  scripts/compute_phenotype_means.py  | 101
-rw-r--r--  scripts/insert_data.py              |   6
-rw-r--r--  scripts/insert_samples.py           |   4
-rw-r--r--  scripts/load_phenotypes_to_db.py    | 228
-rw-r--r--  scripts/phenotypes_bulk_edit.py     | 266
-rw-r--r--  scripts/process_rqtl2_bundle.py     |   4
-rw-r--r--  scripts/qc_on_rqtl2_bundle.py       |   4
-rw-r--r--  scripts/redis_logger.py             |   2
-rw-r--r--  scripts/rqtl2/phenotypes_qc.py      |  33
-rw-r--r--  scripts/run_qtlreaper.py            | 211
-rw-r--r--  scripts/worker.py                   |   2
16 files changed, 557 insertions(+), 405 deletions(-)
diff --git a/scripts/cli/__init__.py b/scripts/cli/__init__.py new file mode 100644 index 0000000..45bbda9 --- /dev/null +++ b/scripts/cli/__init__.py @@ -0,0 +1,3 @@ +"""Package to hold CLI-specific utilities.""" + +from . import options diff --git a/scripts/cli/logging.py b/scripts/cli/logging.py new file mode 100644 index 0000000..30ecf17 --- /dev/null +++ b/scripts/cli/logging.py @@ -0,0 +1,18 @@ +"""Logging for scripts.""" +import logging + +def setup_logging( + script_logger: logging.Logger, + loglevel: str, + modules: tuple[str, ...] = tuple() +): + """Setup module-level loggers to the same log-level as the application.""" + logging.basicConfig( + encoding="utf-8", + format=("%(asctime)s — %(filename)s:%(lineno)s — %(levelname)s: " + "%(message)s"), + level=logging.INFO) + script_logger.setLevel(getattr(logging, loglevel.upper())) + effective_loglevel = logging.getLevelName(script_logger.getEffectiveLevel()) + for module in modules: + logging.getLogger(module).setLevel(effective_loglevel) diff --git a/scripts/cli/options.py b/scripts/cli/options.py new file mode 100644 index 0000000..67f35dc --- /dev/null +++ b/scripts/cli/options.py @@ -0,0 +1,46 @@ +"""General options to be added to CLI scripts.""" +from argparse import ArgumentParser + + +def add_logging(parser: ArgumentParser) -> ArgumentParser: + """Add optional log-level option""" + loglevels = ("debug", "info", "warning", "error", "critical") + parser.add_argument( + "--log_level", + "--log-level", + "--loglevel", + metavar="LOG-LEVEL", + type=str, + default="INFO", + choices=loglevels, + help=(f"Controls the severity of events to log. Valid values are: " + + ", ".join(f"'{level}'" for level in loglevels))) + return parser + + +def add_mariadb_uri(parser: ArgumentParser) -> ArgumentParser: + """Add the MySQL/MariaDB URI argument.""" + parser.add_argument("db_uri", + metavar="DB-URI", + type=str, + help="MariaDB/MySQL connection URL") + return parser + + +def add_species_id(parser: ArgumentParser) -> ArgumentParser: + """Add species-id as a mandatory argument.""" + parser.add_argument("species_id", + metavar="SPECIES-ID", + type=int, + help="The species to operate on.") + return parser + + +def add_population_id(parser: ArgumentParser) -> ArgumentParser: + """Add population-id as a mandatory argument.""" + parser = add_species_id(parser) + parser.add_argument("population_id", + metavar="POPULATION-ID", + type=int, + help="The ID for the population to operate on.") + return parser diff --git a/scripts/cli/validators.py b/scripts/cli/validators.py new file mode 100644 index 0000000..6d16e4c --- /dev/null +++ b/scripts/cli/validators.py @@ -0,0 +1,10 @@ +"""CLI options validators.""" +from pathlib import Path + + +def directory_exists(val: str) -> Path: + """Check that directory path specified actually exists.""" + _dir = Path(val).absolute() + if _dir.is_dir() and _dir.exists(): + return _dir + raise FileNotFoundError(f"The path '{_dir}' MUST exist and be a directory.") diff --git a/scripts/cli_parser.py b/scripts/cli_parser.py index 0c91c5e..bf39731 100644 --- a/scripts/cli_parser.py +++ b/scripts/cli_parser.py @@ -3,6 +3,20 @@ from uuid import UUID from typing import Optional from argparse import ArgumentParser + +def add_logging_option(parser: ArgumentParser) -> ArgumentParser: + """Add optional log-level option""" + parser.add_argument( + "--log-level", + "--loglevel", + type=str, + default="INFO", + choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", + "debug", "info", "warning", "error", "critical"], + help="The 
severity of events to track with the logger.") + return parser + + def init_cli_parser(program: str, description: Optional[str] = None) -> ArgumentParser: """Initialise the CLI arguments parser.""" parser = ArgumentParser(prog=program, description=description) @@ -19,14 +33,8 @@ def init_cli_parser(program: str, description: Optional[str] = None) -> Argument type=int, default=86400, help="How long to keep any redis keys around.") - parser.add_argument( - "--loglevel", - type=str, - default="INFO", - choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", - "debug", "info", "warning", "error", "critical"], - help="The severity of events to track with the logger.") - return parser + return add_logging_option(parser) + def add_global_data_arguments(parser: ArgumentParser) -> ArgumentParser: """Add the global (present in nearly ALL scripts) CLI arguments.""" diff --git a/scripts/compute_phenotype_means.py b/scripts/compute_phenotype_means.py new file mode 100644 index 0000000..ef2fabc --- /dev/null +++ b/scripts/compute_phenotype_means.py @@ -0,0 +1,101 @@ +"""Compute phenotype means.""" +import sys +import logging +from pathlib import Path +from typing import TypeVar +from argparse import Namespace, ArgumentParser + +import MySQLdb + +from gn_libs import mysqldb +from uploader import setup_modules_logging + +from .cli_parser import add_logging_option +from .load_phenotypes_to_db import update_means + +logger = logging.getLogger(__name__) +logging.basicConfig( + encoding="utf-8", + format="%(asctime)s - %(name)s - %(levelname)s — %(message)s", + level=logging.INFO) + + +def fetch_xref_id(conn: mysqldb.Connection, population_id: int) -> tuple[int, ...]: + """Fetch a population's cross-reference IDs.""" + logger.debug("Fetching the xref IDs.") + with conn.cursor(cursorclass=MySQLdb.cursors.DictCursor) as cursor: + query = "SELECT Id FROM PublishXRef WHERE InbredSetId=%(population_id)s" + cursor.execute(query, {"population_id": population_id}) + return tuple(int(row["Id"]) for row in cursor.fetchall()) + + +def run(args) -> int: + """Run the script.""" + logger.debug("Running the script!") + with mysqldb.database_connection(args.db_uri) as mariadb_conn: + xref_ids = args.cross_ref_ids or fetch_xref_id(mariadb_conn, args.population_id) + if len(xref_ids): + update_means(mariadb_conn, + args.population_id, + xref_ids) + logger.debug("Successfully computed means for %02d phenotypes.", + len(xref_ids)) + return 0 + _reasons = ( + f"no population exists with the ID {args.population_id}", + "the population exists but it has no phenotypes linked to it yet") + logger.error( + "No cross-reference IDs to run against. 
Likely causes are: %s", + " OR ".join(_reasons) + ".") + return 1 + + +T = TypeVar("T") +def comma_separated_list(val: str, itemstype: T = str) -> tuple[T, ...]: + """Convert val into a list of items of type 'itemstype'.""" + return tuple(itemstype(item.strip()) for item in val.split(",")) + + +def comma_separated_list_of_integers(val: str) -> tuple[int, ...]: + """Convert 'val' into list of items of type 'int'.""" + return comma_separated_list(val, int) + + +if __name__ == "__main__": + def parse_args() -> Namespace: + """Define and parse the CLI parsers accepted by this script.""" + parser = ArgumentParser( + "compute-phenotype-means", + description="Compute/Recompute the phenotype means.") + parser.add_argument("db_uri", + metavar="db-uri", + type=str, + help="MariaDB/MySQL connection URL") + parser.add_argument("jobs_db_path", + metavar="jobs-db-path", + type=Path, + help="Path to jobs' SQLite database.") + parser.add_argument("population_id", + metavar="population-id", + type=int, + help=("Identifier for the InbredSet group/" + "population to run means against.")) + ## Optional arguments + parser = add_logging_option(parser) + parser.add_argument( + "--cross-ref-ids", + type=comma_separated_list_of_integers, + help=("Provide cross-reference IDs to narrow the number of " + "phenotypes that the means are computed against."), + default=[]) + + return parser.parse_args() + + def main() -> int: + """compute-phenotype-means: Entry-point function.""" + args = parse_args() + logger.setLevel(getattr(logging, args.log_level.upper())) + setup_modules_logging(logger, ("scripts.load_phenotypes_to_db",)) + return run(args) + + sys.exit(main()) diff --git a/scripts/insert_data.py b/scripts/insert_data.py index 67038f8..aec0251 100644 --- a/scripts/insert_data.py +++ b/scripts/insert_data.py @@ -197,7 +197,7 @@ def probeset_ids(dbconn: mdb.Connection, break yield row -def insert_means(# pylint: disable=[too-many-locals, too-many-arguments] +def insert_means(# pylint: disable=[too-many-locals, too-many-arguments, too-many-positional-arguments] filepath: str, speciesid: int, platform_id: int, datasetid: int, dbconn: mdb.Connection, rconn: Redis) -> int: # pylint: disable=[unused-argument] "Insert the means/averages data into the database" @@ -232,7 +232,7 @@ def insert_means(# pylint: disable=[too-many-locals, too-many-arguments] item for sublist in read_datavalues(filepath, headings, strains).values() for item in sublist), - start=(last_data_id(dbconn)+1))) + start=last_data_id(dbconn)+1)) with dbconn.cursor(cursorclass=DictCursor) as cursor: while True: means = tuple(take(the_means, 10000)) @@ -245,7 +245,7 @@ def insert_means(# pylint: disable=[too-many-locals, too-many-arguments] cursor.executemany(xref_query, means) return 0 -def insert_se(# pylint: disable = [too-many-arguments,too-many-locals] +def insert_se(# pylint: disable = [too-many-arguments,too-many-locals, too-many-positional-arguments] filepath: str, speciesid: int, platformid: int, datasetid: int, dbconn: mdb.Connection, rconn: Redis) -> int: # pylint: disable=[unused-argument] "Insert the standard-error data into the database" diff --git a/scripts/insert_samples.py b/scripts/insert_samples.py index 742c4ae..fc029f9 100644 --- a/scripts/insert_samples.py +++ b/scripts/insert_samples.py @@ -34,7 +34,7 @@ class SeparatorAction(argparse.Action): """Process the value passed in.""" setattr(namespace, self.dest, (chr(9) if values == "\\t" else values)) -def insert_samples(conn: mdb.Connection,# pylint: disable=[too-many-arguments] +def 
insert_samples(conn: mdb.Connection,# pylint: disable=[too-many-arguments, too-many-positional-arguments] rconn: Redis,# pylint: disable=[unused-argument] speciesid: int, populationid: int, @@ -149,7 +149,7 @@ if __name__ == "__main__": args.separator, args.firstlineheading, args.quotechar) - except Exception as _exc: + except Exception as _exc:# pylint: disable=[broad-exception-caught] print(traceback.format_exc(), file=sys.stderr) return status_code diff --git a/scripts/load_phenotypes_to_db.py b/scripts/load_phenotypes_to_db.py index 8f49e10..e449b82 100644 --- a/scripts/load_phenotypes_to_db.py +++ b/scripts/load_phenotypes_to_db.py @@ -1,16 +1,18 @@ +"""Load phenotypes and their data provided in files into the database.""" import sys import uuid import json +import time import logging import argparse import datetime +from typing import Any from pathlib import Path from zipfile import ZipFile -from typing import Any, Union from urllib.parse import urljoin from functools import reduce, partial -from MySQLdb.cursors import Cursor, DictCursor +from MySQLdb.cursors import DictCursor from gn_libs import jobs, mysqldb, sqlite3, monadic_requests as mrequests @@ -23,12 +25,12 @@ from uploader.phenotypes.models import ( save_phenotypes_data, create_new_phenotypes, quick_save_phenotypes_data) -from uploader.publications.models import ( - create_new_publications, - fetch_publication_by_id) +from uploader.publications.models import fetch_publication_by_id from scripts.rqtl2.bundleutils import build_line_joiner, build_line_splitter +from functional_tools import take + logging.basicConfig( format="%(asctime)s — %(filename)s:%(lineno)s — %(levelname)s: %(message)s") logger = logging.getLogger(__name__) @@ -40,17 +42,13 @@ def __replace_na_strings__(line, na_strings): def save_phenotypes( - cursor: mysqldb.Connection, + conn: mysqldb.Connection, control_data: dict[str, Any], + population_id, + publication_id, filesdir: Path ) -> tuple[dict, ...]: """Read `phenofiles` and save the phenotypes therein.""" - ## TODO: Replace with something like this: ## - # phenofiles = control_data["phenocovar"] + control_data.get( - # "gn-metadata", {}).get("pheno", []) - # - # This is meant to load (and merge) data from the "phenocovar" and - # "gn-metadata -> pheno" files into a single collection of phenotypes. 
phenofiles = tuple(filesdir.joinpath(_file) for _file in control_data["phenocovar"]) if len(phenofiles) <= 0: return tuple() @@ -69,23 +67,20 @@ def save_phenotypes( control_data["sep"], control_data["comment.char"]) return create_new_phenotypes( - cursor, + conn, + population_id, + publication_id, (dict(zip(_headers, __replace_na_strings__(line, control_data["na.strings"]))) for filecontent - in (rqtl2.read_csv_file(path) for path in phenofiles) + in (rqtl2.read_csv_file(path, + separator=control_data["sep"], + comment_char=control_data["comment.char"]) + for path in phenofiles) for idx, line in enumerate(filecontent) if idx != 0)) -def __fetch_next_dataid__(conn: mysqldb.Connection) -> int: - """Fetch the next available DataId value from the database.""" - with conn.cursor(cursorclass=DictCursor) as cursor: - cursor.execute( - "SELECT MAX(DataId) AS CurrentMaxDataId FROM PublishXRef") - return int(cursor.fetchone()["CurrentMaxDataId"]) + 1 - - def __row_to_dataitems__( sample_row: dict, dataidmap: dict, @@ -104,7 +99,6 @@ def __row_to_dataitems__( def __build_dataitems__( - filetype, phenofiles, control_data, samples, @@ -113,7 +107,7 @@ def __build_dataitems__( ): _headers = rqtl2.read_csv_file_headers( phenofiles[0], - control_data[f"{filetype}_transposed"], + False, # Any transposed files have been un-transposed by this point control_data["sep"], control_data["comment.char"]) _filescontents = ( @@ -123,7 +117,7 @@ def __build_dataitems__( for path in phenofiles) _linescontents = ( __row_to_dataitems__( - dict(zip(_headers, + dict(zip(("id",) + _headers[1:], __replace_na_strings__(line, control_data["na.strings"]))), dataidmap, pheno_name2id, @@ -136,7 +130,7 @@ def __build_dataitems__( if item["value"] is not None) -def save_numeric_data( +def save_numeric_data(# pylint: disable=[too-many-positional-arguments,too-many-arguments] conn: mysqldb.Connection, dataidmap: dict, pheno_name2id: dict[str, int], @@ -154,7 +148,7 @@ def save_numeric_data( if control_data[f"{filetype}_transposed"]: logger.info("Undoing transposition of the files rows and columns.") - phenofiles = ( + phenofiles = tuple( rqtl2.transpose_csv_with_rename( _file, build_line_splitter(control_data), @@ -167,22 +161,20 @@ def save_numeric_data( conn, table, __build_dataitems__( - filetype, phenofiles, control_data, samples, dataidmap, pheno_name2id), filesdir) - except Exception as _exc: + except Exception as _exc:# pylint: disable=[broad-exception-caught] logger.debug("Could not use `LOAD … INFILE`, using raw query", exc_info=True) - import time;time.sleep(60) + time.sleep(60) return save_phenotypes_data( conn, table, __build_dataitems__( - filetype, phenofiles, control_data, samples, @@ -205,44 +197,15 @@ save_phenotypes_n = partial(save_numeric_data, table="NStrain") -def cross_reference_phenotypes_publications_and_data( - conn: mysqldb.Connection, xref_data: tuple[dict, ...] 
-): - """Crossreference the phenotypes, publication and data.""" - with conn.cursor(cursorclass=DictCursor) as cursor: - cursor.execute("SELECT MAX(Id) CurrentMaxId FROM PublishXRef") - _nextid = int(cursor.fetchone()["CurrentMaxId"]) + 1 - _params = tuple({**row, "xref_id": _id} - for _id, row in enumerate(xref_data, start=_nextid)) - cursor.executemany( - ("INSERT INTO PublishXRef(" - "Id, InbredSetId, PhenotypeId, PublicationId, DataId, comments" - ") " - "VALUES (" - "%(xref_id)s, %(population_id)s, %(phenotype_id)s, " - "%(publication_id)s, %(data_id)s, 'Upload of new data.'" - ")"), - _params) - return _params - return tuple() - - -def update_auth(authserver, token, species, population, dataset, xrefdata): +def update_auth(# pylint: disable=[too-many-locals,too-many-positional-arguments,too-many-arguments] + authserver, + token, + species, + population, + dataset, + xrefdata): """Grant the user access to their data.""" - # TODO Call into the auth server to: - # 1. Link the phenotypes with a user group - # - fetch group: http://localhost:8081/auth/user/group - # - link data to group: http://localhost:8081/auth/data/link/phenotype - # - *might need code update in gn-auth: remove restriction, perhaps* - # 2. Create resource (perhaps?) - # - Get resource categories: http://localhost:8081/auth/resource/categories - # - Create a new resource: http://localhost:80host:8081/auth/resource/create - # - single resource for all phenotypes - # - resource name from user, species, population, dataset, datetime? - # - User will have "ownership" of resource by default - # 3. Link data to the resource: http://localhost:8081/auth/resource/data/link - # - Update code to allow linking multiple items in a single request - _tries = 0 # TODO use this to limit how many tries before quiting and bailing + _tries = 0 _delay = 1 headers = { "Authorization": f"Bearer {token}", @@ -320,13 +283,21 @@ def update_auth(authserver, token, species, population, dataset, xrefdata): }).then(lambda attc: (user, linkeddata, resource, attc)) def __handle_error__(resp): + error = resp.json() + if error.get("error") == "IntegrityError": + # This is hacky. If the auth already exists, something went wrong + # somewhere. + # This needs investigation to recover correctly. 
+ logger.info( + "The authorisation for the data was already set up.") + return 0 logger.error("ERROR: Updating the authorisation for the data failed.") logger.debug( "ERROR: The response from the authorisation server was:\n\t%s", - resp.json()) + error) return 1 - def __handle_success__(val): + def __handle_success__(_val): logger.info( "The authorisation for the data has been updated successfully.") return 0 @@ -340,7 +311,7 @@ def update_auth(authserver, token, species, population, dataset, xrefdata): ).either(__handle_error__, __handle_success__) -def load_data(conn: mysqldb.Connection, job: dict) -> int: +def load_data(conn: mysqldb.Connection, job: dict) -> int:#pylint: disable=[too-many-locals] """Load the data attached in the given job.""" _job_metadata = job["metadata"] # Steps @@ -368,25 +339,35 @@ def load_data(conn: mysqldb.Connection, job: dict) -> int: with ZipFile(str(bundle), "r") as zfile: _files = rqtl2.extract(zfile, _outdir) logger.info("Saving new phenotypes.") - _phenos = save_phenotypes(conn, _control_data, _outdir) - def __build_phenos_maps__(accumulator, current): - dataid, row = current + _phenos = save_phenotypes(conn, + _control_data, + _population["Id"], + _publication["Id"], + _outdir) + + def __build_phenos_maps__(accumulator, row): return ({ **accumulator[0], row["phenotype_id"]: { "population_id": _population["Id"], "phenotype_id": row["phenotype_id"], - "data_id": dataid, - "publication_id": _publication["Id"], + "data_id": row["data_id"], + "publication_id": row["publication_id"], } }, { **accumulator[1], - row["id"]: row["phenotype_id"] - }) - dataidmap, pheno_name2id = reduce( - __build_phenos_maps__, - enumerate(_phenos, start=__fetch_next_dataid__(conn)), - ({},{})) + row["pre_publication_abbreviation"]: row["phenotype_id"] + }, ( + accumulator[2] + ({ + "xref_id": row["xref_id"], + "population_id": row["population_id"], + "phenotype_id": row["phenotype_id"], + "publication_id": row["publication_id"], + "data_id": row["data_id"] + },))) + dataidmap, pheno_name2id, _xrefs = reduce(__build_phenos_maps__, + _phenos, + ({},{}, tuple())) # 3. a. Fetch the strain names and IDS: create name->ID map samples = { row["Name"]: row @@ -401,32 +382,60 @@ def load_data(conn: mysqldb.Connection, job: dict) -> int: control_data=_control_data, filesdir=_outdir) logger.info("Saved %s new phenotype data rows.", _num_data_rows) - # 4. Cross-reference Phenotype, Publication, and PublishData in PublishXRef - logger.info("Cross-referencing new phenotypes to their data and publications.") - _xrefs = cross_reference_phenotypes_publications_and_data( - conn, tuple(dataidmap.values())) - # 5. If standard errors and N exist, save them too + + # 4. If standard errors and N exist, save them too # (use IDs returned in `3. b.` above). 
- logger.info("Saving new phenotypes standard errors.") - _num_se_rows = save_phenotypes_se(conn=conn, - dataidmap=dataidmap, - pheno_name2id=pheno_name2id, - samples=samples, - control_data=_control_data, - filesdir=_outdir) - logger.info("Saved %s new phenotype standard error rows.", _num_se_rows) - - logger.info("Saving new phenotypes sample counts.") - _num_n_rows = save_phenotypes_n(conn=conn, - dataidmap=dataidmap, - pheno_name2id=pheno_name2id, - samples=samples, - control_data=_control_data, - filesdir=_outdir) - logger.info("Saved %s new phenotype sample counts rows.", _num_n_rows) + if _control_data.get("phenose"): + logger.info("Saving new phenotypes standard errors.") + _num_se_rows = save_phenotypes_se(conn=conn, + dataidmap=dataidmap, + pheno_name2id=pheno_name2id, + samples=samples, + control_data=_control_data, + filesdir=_outdir) + logger.info("Saved %s new phenotype standard error rows.", _num_se_rows) + + if _control_data.get("phenonum"): + logger.info("Saving new phenotypes sample counts.") + _num_n_rows = save_phenotypes_n(conn=conn, + dataidmap=dataidmap, + pheno_name2id=pheno_name2id, + samples=samples, + control_data=_control_data, + filesdir=_outdir) + logger.info("Saved %s new phenotype sample counts rows.", _num_n_rows) + return (_species, _population, _dataset, _xrefs) +def update_means( + conn: mysqldb.Connection, + population_id: int, + xref_ids: tuple[int, ...] +): + """Compute the means from the data and update them in the database.""" + logger.info("Computing means for %02d phenotypes.", len(xref_ids)) + query = ( + "UPDATE PublishXRef SET mean = " + "(SELECT AVG(value) FROM PublishData" + " WHERE PublishData.Id=PublishXRef.DataId) " + "WHERE PublishXRef.Id=%(xref_id)s " + "AND PublishXRef.InbredSetId=%(population_id)s") + _xref_iterator = (_xref_id for _xref_id in xref_ids) + with conn.cursor(cursorclass=DictCursor) as cursor: + while True: + batch = take(_xref_iterator, 10000) + if len(batch) == 0: + break + logger.info("\tComputing means for batch of %02d phenotypes.", len(batch)) + cursor.executemany( + query, + tuple({ + "population_id": population_id, + "xref_id": _xref_id + } for _xref_id in batch)) + + if __name__ == "__main__": def parse_args(): """Setup command-line arguments.""" @@ -490,10 +499,19 @@ if __name__ == "__main__": f"{_table} WRITE" for _table in _db_tables_)) db_results = load_data(conn, job) + _xref_ids = tuple(xref["xref_id"] for xref in db_results[3]) + jobs.update_metadata( + jobs_conn, + args.job_id, + "xref_ids", + json.dumps(_xref_ids)) logger.info("Unlocking all database tables.") cursor.execute("UNLOCK TABLES") + logger.info("Updating means.") + update_means(conn, db_results[1]["Id"], _xref_ids) + # Update authorisations (break this down) — maybe loop until it works? 
logger.info("Updating authorisation.") _job_metadata = job["metadata"] @@ -504,7 +522,7 @@ if __name__ == "__main__": try: sys.exit(main()) - except Exception as _exc: + except Exception as _exc:# pylint: disable=[broad-exception-caught] logger.debug("Data loading failed… Halting!", exc_info=True) sys.exit(1) diff --git a/scripts/phenotypes_bulk_edit.py b/scripts/phenotypes_bulk_edit.py deleted file mode 100644 index cee5f4e..0000000 --- a/scripts/phenotypes_bulk_edit.py +++ /dev/null @@ -1,266 +0,0 @@ -import sys -import uuid -import logging -import argparse -from pathlib import Path -from typing import Iterator -from functools import reduce - -from MySQLdb.cursors import DictCursor - -from gn_libs import jobs, mysqldb, sqlite3 - -from uploader.phenotypes.models import phenotypes_data_by_ids -from uploader.phenotypes.misc import phenotypes_data_differences -from uploader.phenotypes.views import BULK_EDIT_COMMON_FIELDNAMES - -import uploader.publications.pubmed as pmed -from uploader.publications.misc import publications_differences -from uploader.publications.models import ( - update_publications, fetch_phenotype_publications) - -logging.basicConfig( - format="%(asctime)s — %(filename)s:%(lineno)s — %(levelname)s: %(message)s") -logger = logging.getLogger(__name__) - - -def check_ids(conn, ids: tuple[tuple[int, int], ...]) -> bool: - """Verify that all the `UniqueIdentifier` values are valid.""" - logger.info("Checking the 'UniqueIdentifier' values.") - with conn.cursor(cursorclass=DictCursor) as cursor: - paramstr = ",".join(["(%s, %s)"] * len(ids)) - cursor.execute( - "SELECT PhenotypeId AS phenotype_id, Id AS xref_id " - "FROM PublishXRef " - f"WHERE (PhenotypeId, Id) IN ({paramstr})", - tuple(item for row in ids for item in row)) - mysqldb.debug_query(cursor, logger) - found = tuple((row["phenotype_id"], row["xref_id"]) - for row in cursor.fetchall()) - - not_found = tuple(item for item in ids if item not in found) - if len(not_found) == 0: - logger.info("All 'UniqueIdentifier' are valid.") - return True - - for item in not_found: - logger.error(f"Invalid 'UniqueIdentifier' value: phId:%s::xrId:%s", item[0], item[1]) - - return False - - -def check_for_mandatory_fields(): - """Verify that mandatory fields have values.""" - pass - - -def __fetch_phenotypes__(conn, ids: tuple[int, ...]) -> tuple[dict, ...]: - """Fetch basic (non-numeric) phenotypes data from the database.""" - with conn.cursor(cursorclass=DictCursor) as cursor: - paramstr = ",".join(["%s"] * len(ids)) - cursor.execute(f"SELECT * FROM Phenotype WHERE Id IN ({paramstr}) " - "ORDER BY Id ASC", - ids) - return tuple(dict(row) for row in cursor.fetchall()) - - -def descriptions_differences(file_data, db_data) -> dict[str, str]: - """Compute differences in the descriptions.""" - logger.info("Computing differences in phenotype descriptions.") - assert len(file_data) == len(db_data), "The counts of phenotypes differ!" 
- description_columns = ("Pre_publication_description", - "Post_publication_description", - "Original_description", - "Pre_publication_abbreviation", - "Post_publication_abbreviation") - diff = tuple() - for file_row, db_row in zip(file_data, db_data): - assert file_row["phenotype_id"] == db_row["Id"] - inner_diff = { - key: file_row[key] - for key in description_columns - if not file_row[key] == db_row[key] - } - if bool(inner_diff): - diff = diff + ({ - "phenotype_id": file_row["phenotype_id"], - **inner_diff - },) - - return diff - - -def update_descriptions(): - """Update descriptions in the database""" - logger.info("Updating descriptions") - # Compute differences between db data and uploaded file - # Only run query for changed descriptions - pass - - -def link_publications(): - """Link phenotypes to relevant publications.""" - logger.info("Linking phenotypes to publications.") - # Create publication if PubMed_ID doesn't exist in db - pass - - -def update_values(): - """Update the phenotype values.""" - logger.info("Updating phenotypes values.") - # Compute differences between db data and uploaded file - # Only run query for changed data - pass - - -def parse_args(): - parser = argparse.ArgumentParser( - prog="Phenotypes Bulk-Edit Processor", - description="Process the bulk-edits to phenotype data and descriptions.") - parser.add_argument("db_uri", type=str, help="MariaDB/MySQL connection URL") - parser.add_argument( - "jobs_db_path", type=Path, help="Path to jobs' SQLite database.") - parser.add_argument("job_id", type=uuid.UUID, help="ID of the running job") - parser.add_argument( - "--log-level", - type=str, - help="Determines what is logged out.", - choices=("debug", "info", "warning", "error", "critical"), - default="info") - return parser.parse_args() - - -def read_file(filepath: Path) -> Iterator[str]: - """Read the file, one line at a time.""" - with filepath.open(mode="r", encoding="utf-8") as infile: - count = 0 - headers = None - for line in infile: - if line.startswith("#"): # ignore comments - continue; - - fields = line.strip().split("\t") - if count == 0: - headers = fields - count = count + 1 - continue - - _dict = dict(zip( - headers, - ((None if item.strip() == "" else item.strip()) - for item in fields))) - _pheno, _xref = _dict.pop("UniqueIdentifier").split("::") - _dict = { - key: ((float(val) if bool(val) else val) - if key not in BULK_EDIT_COMMON_FIELDNAMES - else val) - for key, val in _dict.items() - } - _dict["phenotype_id"] = int(_pheno.split(":")[1]) - _dict["xref_id"] = int(_xref.split(":")[1]) - if _dict["PubMed_ID"] is not None: - _dict["PubMed_ID"] = int(_dict["PubMed_ID"]) - - yield _dict - count = count + 1 - - -def run(conn, job): - """Process the data and update it.""" - file_contents = tuple(sorted(read_file(Path(job["metadata"]["edit-file"])), - key=lambda item: item["phenotype_id"])) - pheno_ids, pheno_xref_ids, pubmed_ids = reduce( - lambda coll, curr: ( - coll[0] + (curr["phenotype_id"],), - coll[1] + ((curr["phenotype_id"], curr["xref_id"]),), - coll[2].union(set([curr["PubMed_ID"]]))), - file_contents, - (tuple(), tuple(), set([None]))) - check_ids(conn, pheno_xref_ids) - check_for_mandatory_fields() - # stop running here if any errors are found. - - ### Compute differences - logger.info("Computing differences.") - # 1. Basic Phenotype data differences - # a. 
Descriptions differences - _desc_diff = descriptions_differences( - file_contents, __fetch_phenotypes__(conn, pheno_ids)) - logger.debug("DESCRIPTIONS DIFFERENCES: %s", _desc_diff) - - # b. Publications differences - _db_publications = fetch_phenotype_publications(conn, pheno_xref_ids) - logger.debug("DB PUBLICATIONS: %s", _db_publications) - - _pubmed_map = { - (int(row["PubMed_ID"]) if bool(row["PubMed_ID"]) else None): f"{row['phenotype_id']}::{row['xref_id']}" - for row in file_contents - } - _pub_id_map = { - f"{pub['PhenotypeId']}::{pub['xref_id']}": pub["PublicationId"] - for pub in _db_publications - } - - _new_publications = update_publications( - conn, tuple({ - **pub, "publication_id": _pub_id_map[_pubmed_map[pub["pubmed_id"]]] - } for pub in pmed.fetch_publications(tuple( - pubmed_id for pubmed_id in pubmed_ids - if pubmed_id not in - tuple(row["PubMed_ID"] for row in _db_publications))))) - _pub_diff = publications_differences( - file_contents, _db_publications, { - row["PubMed_ID" if "PubMed_ID" in row else "pubmed_id"]: row[ - "PublicationId" if "PublicationId" in row else "publication_id"] - for row in _db_publications + _new_publications}) - logger.debug("Publications diff: %s", _pub_diff) - # 2. Data differences - _db_pheno_data = phenotypes_data_by_ids(conn, tuple({ - "population_id": job["metadata"]["population-id"], - "phenoid": row[0], - "xref_id": row[1] - } for row in pheno_xref_ids)) - - data_diff = phenotypes_data_differences( - ({ - "phenotype_id": row["phenotype_id"], - "xref_id": row["xref_id"], - "data": { - key:val for key,val in row.items() - if key not in BULK_EDIT_COMMON_FIELDNAMES + [ - "phenotype_id", "xref_id"] - } - } for row in file_contents), - ({ - **row, - "PhenotypeId": row["Id"], - "data": { - dataitem["StrainName"]: dataitem - for dataitem in row["data"].values() - } - } for row in _db_pheno_data)) - logger.debug("Data differences: %s", data_diff) - ### END: Compute differences - update_descriptions() - link_publications() - update_values() - return 0 - - -def main(): - """Entry-point for this script.""" - args = parse_args() - logger.setLevel(args.log_level.upper()) - logger.debug("Arguments: %s", args) - - logging.getLogger("uploader.phenotypes.misc").setLevel(args.log_level.upper()) - logging.getLogger("uploader.phenotypes.models").setLevel(args.log_level.upper()) - logging.getLogger("uploader.publications.models").setLevel(args.log_level.upper()) - - with (mysqldb.database_connection(args.db_uri) as conn, - sqlite3.connection(args.jobs_db_path) as jobs_conn): - return run(conn, jobs.job(jobs_conn, args.job_id)) - - -if __name__ == "__main__": - sys.exit(main()) diff --git a/scripts/process_rqtl2_bundle.py b/scripts/process_rqtl2_bundle.py index 8b7a0fb..e2ce420 100644 --- a/scripts/process_rqtl2_bundle.py +++ b/scripts/process_rqtl2_bundle.py @@ -104,7 +104,7 @@ def process_bundle(dbconn: mdb.Connection, rqtl2bundle=Path(meta["rqtl2-bundle-file"])), logger) if genoexit != 0: - raise Exception("Processing 'geno' file failed.") + raise Exception("Processing 'geno' file failed.")# pylint: disable=[broad-exception-raised] logger.debug( "geno file processing completed successfully. (ExitCode: %s)", genoexit) @@ -122,7 +122,7 @@ def process_bundle(dbconn: mdb.Connection, rqtl2bundle=Path(meta["rqtl2-bundle-file"])), logger) if phenoexit != 0: - raise Exception("Processing 'pheno' file failed.") + raise Exception("Processing 'pheno' file failed.")# pylint: disable=[broad-exception-raised] logger.debug( "pheno file processing completed successfully. 
(ExitCode: %s)", phenoexit) diff --git a/scripts/qc_on_rqtl2_bundle.py b/scripts/qc_on_rqtl2_bundle.py index 9f9248c..0207938 100644 --- a/scripts/qc_on_rqtl2_bundle.py +++ b/scripts/qc_on_rqtl2_bundle.py @@ -191,7 +191,7 @@ def check_pheno_samples( return allerrors -def qc_pheno_errors(# pylint: disable=[too-many-arguments] +def qc_pheno_errors(# pylint: disable=[too-many-arguments, too-many-positional-arguments] rconn, fqjobid, dburi, speciesid, zfile, logger) -> bool: """Check for errors in `pheno` file(s).""" cdata = rqtl2.control_data(zfile) @@ -260,7 +260,7 @@ def run_qc(rconn: Redis, if qc_missing_files(rconn, fqjobid, zfile, logger): return 1 - def with_zipfile(# pylint: disable=[too-many-arguments] + def with_zipfile(# pylint: disable=[too-many-arguments,too-many-positional-arguments] rconn, fqjobid, dbconn, speciesid, filename, logger, func ): with ZipFile(filename, "r") as zfile: diff --git a/scripts/redis_logger.py b/scripts/redis_logger.py index d3fde5f..a74e5e4 100644 --- a/scripts/redis_logger.py +++ b/scripts/redis_logger.py @@ -6,7 +6,7 @@ from redis import Redis class RedisLogger(logging.Handler): """Log out to redis for our worker scripts""" - def __init__(self,#pylint: disable=[too-many-arguments] + def __init__(self,#pylint: disable=[too-many-arguments, too-many-positional-arguments] rconn: Redis, fullyqualifiedjobid: str, messageslistname: str, diff --git a/scripts/rqtl2/phenotypes_qc.py b/scripts/rqtl2/phenotypes_qc.py index 699c291..9f11f57 100644 --- a/scripts/rqtl2/phenotypes_qc.py +++ b/scripts/rqtl2/phenotypes_qc.py @@ -41,7 +41,10 @@ logging.basicConfig( "(%(pathname)s: %(lineno)d) %(message)s")) logger = logging.getLogger(__MODULE__) -def validate(phenobundle: Path, logger: Logger) -> dict: +def validate( + phenobundle: Path, + logger: Logger# pylint: disable=[redefined-outer-name] +) -> dict: """Check that the bundle is generally valid""" try: rqc.validate_bundle(phenobundle) @@ -63,7 +66,7 @@ def validate(phenobundle: Path, logger: Logger) -> dict: def check_for_mandatory_pheno_keys( phenobundle: Path, - logger: Logger, + logger: Logger,# pylint: disable=[redefined-outer-name] **kwargs ) -> dict: """Check that the mandatory keys exist for phenotypes.""" @@ -90,7 +93,7 @@ def check_for_mandatory_pheno_keys( def check_for_averages_files( phenobundle: Path, - logger: Logger, + logger: Logger,# pylint: disable=[redefined-outer-name] **kwargs ) -> dict: """Check that averages files appear together""" @@ -144,15 +147,15 @@ def redis_logger( ) -> Iterator[logging.Logger]: """Build a Redis message-list logger.""" rconn = Redis.from_url(redisuri, decode_responses=True) - logger = logging.getLogger(loggername) - logger.propagate = False + _logger = logging.getLogger(loggername) + _logger.propagate = False handler = RedisMessageListHandler( rconn, fullyqualifiedkey(fqkey, filename))#type: ignore[arg-type] handler.setFormatter(logging.getLogger().handlers[0].formatter) - logger.addHandler(handler) + _logger.addHandler(handler) try: - yield logger + yield _logger finally: rconn.close() @@ -179,7 +182,7 @@ def qc_phenocovar_file( redisuri, f"{__MODULE__}.qc_phenocovar_file", filepath.name, - f"{fqkey}:logs") as logger, + f"{fqkey}:logs") as _logger, Redis.from_url(redisuri, decode_responses=True) as rconn): print("Running QC on file: ", filepath.name) _csvfile = rqtl2.read_csv_file(filepath, separator, comment_char) @@ -199,7 +202,7 @@ def qc_phenocovar_file( def collect_errors(errors_and_linecount, line): _errs, _lc = errors_and_linecount - logger.info("Testing record 
'%s'", line[0]) + _logger.info("Testing record '%s'", line[0]) if len(line) != len(_headings): _errs = _errs + (save_error(InvalidValue( filepath.name, @@ -209,12 +212,12 @@ def qc_phenocovar_file( (f"Record {_lc} in file {filepath.name} has a different " "number of columns than the number of headings"))),) _line = dict(zip(_headings, line)) - if not bool(_line["description"]): + if not bool(_line.get("description")): _errs = _errs + ( save_error(InvalidValue(filepath.name, _line[_headings[0]], "description", - _line["description"], + _line.get("description"), "The description is not provided!")),) rconn.hset(file_fqkey(fqkey, "metadata", filepath), @@ -240,7 +243,7 @@ def merge_dicts(*dicts): return reduce(lambda merged, dct: {**merged, **dct}, dicts, {}) -def decimal_points_error(# pylint: disable=[too-many-arguments] +def decimal_points_error(# pylint: disable=[too-many-arguments,too-many-positional-arguments] filename: str, rowtitle: str, coltitle: str, @@ -271,7 +274,7 @@ def integer_error( return InvalidValue(filename, rowtitle, coltitle, cellvalue, message) -def qc_pheno_file(# pylint: disable=[too-many-locals, too-many-arguments] +def qc_pheno_file(# pylint: disable=[too-many-locals, too-many-arguments, too-many-positional-arguments] filepath: Path, redisuri: str, fqkey: str, @@ -287,7 +290,7 @@ def qc_pheno_file(# pylint: disable=[too-many-locals, too-many-arguments] redisuri, f"{__MODULE__}.qc_pheno_file", filepath.name, - f"{fqkey}:logs") as logger, + f"{fqkey}:logs") as _logger, Redis.from_url(redisuri, decode_responses=True) as rconn): print("Running QC on file: ", filepath.name) save_error = partial( @@ -314,7 +317,7 @@ def qc_pheno_file(# pylint: disable=[too-many-locals, too-many-arguments] def collect_errors(errors_and_linecount, line): _errs, _lc = errors_and_linecount - logger.debug("Checking row %s", line[0]) + _logger.debug("Checking row %s", line[0]) if line[0] not in samples: _errs = _errs + (save_error(InvalidValue( filepath.name, diff --git a/scripts/run_qtlreaper.py b/scripts/run_qtlreaper.py new file mode 100644 index 0000000..ab58203 --- /dev/null +++ b/scripts/run_qtlreaper.py @@ -0,0 +1,211 @@ +"""Script to run rust-qtlreaper and update database with results.""" +import sys +import csv +import time +import secrets +import logging +import traceback +import subprocess +from pathlib import Path +from typing import Union +from functools import reduce +from argparse import Namespace, ArgumentParser + +from gn_libs import mysqldb + +from uploader.phenotypes.models import phenotypes_vector_data +from uploader.population.models import population_by_species_and_id +from uploader.samples.models import samples_by_species_and_population + +from scripts.cli.logging import setup_logging +from scripts.cli.validators import directory_exists +from scripts.cli.options import add_logging, add_mariadb_uri, add_population_id + +logger = logging.getLogger(__name__) + + +def retrieve_genotype_file(genotypes_dir: Path, population_code: str) -> Path: + """Retrieves the genotype file""" + _genofile = genotypes_dir.joinpath(f"{population_code}.geno") + if _genofile.exists(): + return _genofile + raise FileNotFoundError(f"Could not find the genotype file '{population_code}.geno'") + + +def samples_from_genofile(genofile: Path) -> tuple[str, ...]: + """Read samples from the genotype file.""" + with genofile.open(mode="r", encoding="utf-8") as inptr: + while True: + line = inptr.readline() + if (line.startswith("#") # comment line + or line.startswith("@") # allele? heterozygosity? 
+ or line.strip() == "" # empty line + ): + continue + return tuple(line.strip().split("\t")[4:]) + + +def reconcile_samples( + genosamples: tuple[str, ...], + dbsamples: tuple[str, ...] +) -> tuple[tuple[str, ...], tuple[str, ...]]: + """merge samples in genosamples and dbsamples and retain order in genosamples.""" + in_db_not_geno = set(dbsamples).difference(genosamples) + return genosamples, tuple(in_db_not_geno) + + +def generate_qtlreaper_traits_file( + outdir: Path, + samples: tuple[str, ...], + traits_data: dict[str, Union[int, float]], + filename_prefix: str = "" +) -> Path: + """Generate a file for use with qtlreaper that contains the traits' data.""" + _dialect = csv.unix_dialect() + _dialect.delimiter="\t" + _dialect.quoting=0 + + _traitsfile = outdir.joinpath( + f"{filename_prefix}_{secrets.token_urlsafe(15)}.tsv") + with _traitsfile.open(mode="w", encoding="utf-8") as outptr: + writer = csv.DictWriter( + outptr, fieldnames=("Trait",) + samples, dialect=_dialect) + writer.writeheader() + for row in traits_data: + writer.writerow({ + "Trait": row["xref_id"], + **{sample: row.get(sample, "") for sample in samples} + }) + + return _traitsfile + + +def parse_tsv_file(results_file: Path) -> list[dict]: + """Parse the rust-qtlreaper output into usable python objects.""" + with results_file.open("r", encoding="utf-8") as readptr: + _dialect = csv.unix_dialect() + _dialect.delimiter = "\t" + reader = csv.DictReader(readptr, dialect=_dialect) + for row in reader: + yield row + + +def __qtls_by_trait__(qtls, current): + """Organise QTL results by trait""" + return { + **qtls, + current["ID"]: qtls.get(current["ID"], tuple()) + (current,) + } + + +def save_qtl_values_to_db(conn, qtls: dict): + with conn.cursor() as cursor: + cursor.executemany( + "UPDATE PublishXRef SET " + "Locus=%(Locus)s, LRS=%(LRS)s, additive=%(Additive)s " + "WHERE Id=%(ID)s", + qtls) + + +def dispatch(args: Namespace) -> int: + """Dispatch the actual logic.""" + exitcode = 1 + with mysqldb.database_connection(args.db_uri) as conn: + try: + population = population_by_species_and_id(conn, args.species_id, args.population_id) + assert population, (f"No population with ID '{args.population_id} for " + f"species with ID '{args.species_id}'.") + _genofile = retrieve_genotype_file(args.genotypes_dir, population["Name"]) + logger.debug("Genotype file: %s", _genofile) + samples, _samples_not_in_genofile = reconcile_samples( + samples_from_genofile(_genofile), + tuple( + sample["Name"] for sample in + samples_by_species_and_population( + conn, args.species_id, args.population_id))) + if len(_samples_not_in_genofile) > 0: + logger.warning( + "Ignoring %d samples that are in the database but not in " + "the provided genotype file.", + len(_samples_not_in_genofile)) + logger.debug("Ignored the following samples: %s", + ", ".join(_samples_not_in_genofile)) + + # Fetch traits data: provided list, or all traits in db + _traitsdata = phenotypes_vector_data( + conn, + args.species_id, + args.population_id, + xref_ids=tuple(args.xref_ids)).values() + logger.debug("Successfully got traits data. 
Generating the QTLReaper's traits file…") + _traitsfile = generate_qtlreaper_traits_file( + args.working_dir, + samples, + _traitsdata, + filename_prefix="qtlreaper_input_traits_file") + logger.debug("QTLReaper's Traits file: %s", _traitsfile) + + _qtlreaper_main_output = args.working_dir.joinpath( + f"main-output-{secrets.token_urlsafe(15)}.tsv") + logger.debug("Main output filename: %s", _qtlreaper_main_output) + with subprocess.Popen( + ("qtlreaper", + "--n_permutations", "1000", + "--geno", _genofile, + "--traits", _traitsfile, + "--main_output", _qtlreaper_main_output)) as _qtlreaper: + while _qtlreaper.poll() is None: + logger.debug("QTLReaper process running…") + time.sleep(1) + results = tuple(max(qtls, key=lambda qtl: qtl["LRS"]) + for qtls in + reduce(__qtls_by_trait__, + parse_tsv_file(_qtlreaper_main_output), + {}).values()) + save_qtl_values_to_db(conn, results) + logger.debug("Cleaning up temporary files.") + _traitsfile.unlink() + _qtlreaper_main_output.unlink() + logger.info("Successfully computed p values for %s traits.", len(_traitsdata)) + exitcode = 0 + except FileNotFoundError as fnf: + logger.error(", ".join(fnf.args), exc_info=False) + except AssertionError as aserr: + logger.error(", ".join(aserr.args), exc_info=False) + except Exception as _exc: + logger.debug("Type of exception: %s", type(_exc)) + logger.error("General exception!", exc_info=True) + finally: + return exitcode + + +if __name__ == "__main__": + def main(): + """run_qtlreaper.py: entry point.""" + parser = add_logging(add_population_id(add_mariadb_uri( + ArgumentParser("run_qtlreaper")))) + parser.add_argument( + "genotypes_dir", + metavar="GENOTYPES-DIRECTORY", + type=directory_exists, + help="Path to directory with the genotypes.") + parser.add_argument( + "working_dir", + metavar="WORKING-DIRECTORY", + type=directory_exists, + help="Directory where the script will write temporary files.") + parser.add_argument( + "xref_ids", + metavar="CROSS-REFERENCE-IDS", + type=int, + nargs="*", + help=("Optional list of specific cross-reference IDs to narrow down" + " to. If provided, QTLReaper will only run against them. " + "If NOT provided, QTLReaper will run against all the traits " + "in the population.")) + args = parser.parse_args() + setup_logging(logger, args.log_level) + + return dispatch(args) + + sys.exit(main()) diff --git a/scripts/worker.py b/scripts/worker.py index 91b0332..3165fe7 100644 --- a/scripts/worker.py +++ b/scripts/worker.py @@ -79,7 +79,7 @@ def main(): fqjobid = jobs.job_key(args.redisprefix, args.jobid) rconn.hset(fqjobid, "stderr", f"No such job. '{args.job_id}'.") rconn.expire(name=jobs.job_key(args.redisprefix, args.job_id), - time=timedelta(seconds=(2 * 60 * 60))) + time=timedelta(seconds=2 * 60 * 60)) print(f"No such job. '{args.job_id}'.", file=sys.stderr) return 2 return 3 |
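The new scripts/cli package factors the shared CLI plumbing (add_logging, add_mariadb_uri, add_species_id/add_population_id, directory_exists, setup_logging) out of the individual scripts; run_qtlreaper.py above is its first consumer. Below is a minimal sketch of how another script might wire these helpers together. The script body and do_work are hypothetical; only the helper names and signatures come from this change.

"""Hypothetical consumer of the new scripts/cli helpers."""
import sys
import logging
from argparse import ArgumentParser

from gn_libs import mysqldb

from scripts.cli.logging import setup_logging
from scripts.cli.validators import directory_exists
from scripts.cli.options import add_logging, add_mariadb_uri, add_population_id

logger = logging.getLogger(__name__)


def do_work(args) -> int:
    """Placeholder for the script's actual logic."""
    with mysqldb.database_connection(args.db_uri) as conn:
        logger.info("Connected (%s); operating on population %s.",
                    type(conn).__name__, args.population_id)
    return 0


if __name__ == "__main__":
    def main() -> int:
        """Parse the arguments, set up logging and dispatch."""
        # add_mariadb_uri and add_population_id (which pulls in add_species_id)
        # add positional arguments; add_logging adds the optional --log-level flag.
        parser = add_logging(add_population_id(add_mariadb_uri(
            ArgumentParser("example-script"))))
        parser.add_argument("working_dir",
                            metavar="WORKING-DIRECTORY",
                            type=directory_exists,
                            help="Directory for temporary files.")
        args = parser.parse_args()
        # Propagate the requested level to this logger and any module loggers of interest.
        setup_logging(logger, args.log_level, ("uploader.phenotypes.models",))
        return do_work(args)

    sys.exit(main())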
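update_means in load_phenotypes_to_db.py (also reachable standalone through compute_phenotype_means.py) pushes the AVG(value) subquery UPDATE through the database in batches using functional_tools.take. A rough sketch of that batching pattern follows, assuming take(iterator, n) returns up to n items and an empty tuple once the iterator is exhausted; that behaviour is approximated here with itertools.islice.

from itertools import islice
from typing import Iterable, Iterator


def take(iterator: Iterator, count: int) -> tuple:
    """Assumed behaviour of functional_tools.take: up to `count` items, empty when exhausted."""
    return tuple(islice(iterator, count))


def update_means_in_batches(cursor, population_id: int, xref_ids: Iterable[int],
                            batch_size: int = 10000):
    """Recompute PublishXRef.mean for the given cross-reference IDs, batch by batch."""
    query = (
        "UPDATE PublishXRef SET mean = "
        "(SELECT AVG(value) FROM PublishData"
        " WHERE PublishData.Id=PublishXRef.DataId) "
        "WHERE PublishXRef.Id=%(xref_id)s "
        "AND PublishXRef.InbredSetId=%(population_id)s")
    _iterator = iter(xref_ids)
    while True:
        batch = take(_iterator, batch_size)
        if len(batch) == 0:
            break
        cursor.executemany(query, tuple(
            {"population_id": population_id, "xref_id": _id} for _id in batch))

With MySQLdb the parameterised UPDATE is executed once per parameter dictionary, so the batching mainly bounds the size of the in-memory parameter list rather than the number of round trips.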
