Diffstat (limited to 'scripts')
-rw-r--r--   scripts/cli_parser.py             |   3
-rw-r--r--   scripts/insert_data.py            |   6
-rw-r--r--   scripts/insert_samples.py         |  28
-rw-r--r--   scripts/load_phenotypes_to_db.py  | 519
-rw-r--r--   scripts/process_rqtl2_bundle.py   |   4
-rw-r--r--   scripts/qc_on_rqtl2_bundle.py     |   4
-rw-r--r--   scripts/qc_on_rqtl2_bundle2.py    | 346
-rw-r--r--   scripts/redis_logger.py           |   2
-rw-r--r--   scripts/rqtl2/entry.py            |  30
-rw-r--r--   scripts/rqtl2/phenotypes_qc.py    |  65
-rw-r--r--   scripts/worker.py                 |   2
11 files changed, 599 insertions, 410 deletions
diff --git a/scripts/cli_parser.py b/scripts/cli_parser.py index d42ae66..0c91c5e 100644 --- a/scripts/cli_parser.py +++ b/scripts/cli_parser.py @@ -23,7 +23,8 @@ def init_cli_parser(program: str, description: Optional[str] = None) -> Argument "--loglevel", type=str, default="INFO", - choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], + choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", + "debug", "info", "warning", "error", "critical"], help="The severity of events to track with the logger.") return parser diff --git a/scripts/insert_data.py b/scripts/insert_data.py index 67038f8..aec0251 100644 --- a/scripts/insert_data.py +++ b/scripts/insert_data.py @@ -197,7 +197,7 @@ def probeset_ids(dbconn: mdb.Connection, break yield row -def insert_means(# pylint: disable=[too-many-locals, too-many-arguments] +def insert_means(# pylint: disable=[too-many-locals, too-many-arguments, too-many-positional-arguments] filepath: str, speciesid: int, platform_id: int, datasetid: int, dbconn: mdb.Connection, rconn: Redis) -> int: # pylint: disable=[unused-argument] "Insert the means/averages data into the database" @@ -232,7 +232,7 @@ def insert_means(# pylint: disable=[too-many-locals, too-many-arguments] item for sublist in read_datavalues(filepath, headings, strains).values() for item in sublist), - start=(last_data_id(dbconn)+1))) + start=last_data_id(dbconn)+1)) with dbconn.cursor(cursorclass=DictCursor) as cursor: while True: means = tuple(take(the_means, 10000)) @@ -245,7 +245,7 @@ def insert_means(# pylint: disable=[too-many-locals, too-many-arguments] cursor.executemany(xref_query, means) return 0 -def insert_se(# pylint: disable = [too-many-arguments,too-many-locals] +def insert_se(# pylint: disable = [too-many-arguments,too-many-locals, too-many-positional-arguments] filepath: str, speciesid: int, platformid: int, datasetid: int, dbconn: mdb.Connection, rconn: Redis) -> int: # pylint: disable=[unused-argument] "Insert the standard-error data into the database" diff --git a/scripts/insert_samples.py b/scripts/insert_samples.py index 1b0a052..fc029f9 100644 --- a/scripts/insert_samples.py +++ b/scripts/insert_samples.py @@ -3,6 +3,7 @@ import sys import logging import pathlib import argparse +import traceback import MySQLdb as mdb from redis import Redis @@ -33,7 +34,7 @@ class SeparatorAction(argparse.Action): """Process the value passed in.""" setattr(namespace, self.dest, (chr(9) if values == "\\t" else values)) -def insert_samples(conn: mdb.Connection,# pylint: disable=[too-many-arguments] +def insert_samples(conn: mdb.Connection,# pylint: disable=[too-many-arguments, too-many-positional-arguments] rconn: Redis,# pylint: disable=[unused-argument] speciesid: int, populationid: int, @@ -73,6 +74,7 @@ def insert_samples(conn: mdb.Connection,# pylint: disable=[too-many-arguments] print("Samples upload successfully completed.") return 0 + if __name__ == "__main__": def cli_args(): @@ -127,7 +129,7 @@ if __name__ == "__main__": def main(): """Run script to insert samples into the database.""" - + status_code = 1 # Exit with an Exception args = cli_args() check_db(args.databaseuri) check_redis(args.redisuri) @@ -137,13 +139,19 @@ if __name__ == "__main__": with (Redis.from_url(args.redisuri, decode_responses=True) as rconn, database_connection(args.databaseuri) as dbconn): - return insert_samples(dbconn, - rconn, - args.speciesid, - args.populationid, - args.samplesfile, - args.separator, - args.firstlineheading, - args.quotechar) + + try: + status_code = insert_samples(dbconn, + 
rconn, + args.speciesid, + args.populationid, + args.samplesfile, + args.separator, + args.firstlineheading, + args.quotechar) + except Exception as _exc:# pylint: disable=[broad-exception-caught] + print(traceback.format_exc(), file=sys.stderr) + + return status_code sys.exit(main()) diff --git a/scripts/load_phenotypes_to_db.py b/scripts/load_phenotypes_to_db.py new file mode 100644 index 0000000..c1a7687 --- /dev/null +++ b/scripts/load_phenotypes_to_db.py @@ -0,0 +1,519 @@ +"""Load phenotypes and their data provided in files into the database.""" +import sys +import uuid +import json +import time +import logging +import argparse +import datetime +from typing import Any +from pathlib import Path +from zipfile import ZipFile +from urllib.parse import urljoin +from functools import reduce, partial + +from MySQLdb.cursors import DictCursor + +from gn_libs import jobs, mysqldb, sqlite3, monadic_requests as mrequests + +from r_qtl import r_qtl2 as rqtl2 +from uploader.species.models import species_by_id +from uploader.population.models import population_by_species_and_id +from uploader.samples.models import samples_by_species_and_population +from uploader.phenotypes.models import ( + dataset_by_id, + save_phenotypes_data, + create_new_phenotypes, + quick_save_phenotypes_data) +from uploader.publications.models import fetch_publication_by_id + +from scripts.rqtl2.bundleutils import build_line_joiner, build_line_splitter + +logging.basicConfig( + format="%(asctime)s — %(filename)s:%(lineno)s — %(levelname)s: %(message)s") +logger = logging.getLogger(__name__) + + + +def __replace_na_strings__(line, na_strings): + return ((None if value in na_strings else value) for value in line) + + +def save_phenotypes( + cursor: mysqldb.Connection, + control_data: dict[str, Any], + filesdir: Path +) -> tuple[dict, ...]: + """Read `phenofiles` and save the phenotypes therein.""" + phenofiles = tuple(filesdir.joinpath(_file) for _file in control_data["phenocovar"]) + if len(phenofiles) <= 0: + return tuple() + + if control_data["phenocovar_transposed"]: + logger.info("Undoing transposition of the files rows and columns.") + phenofiles = ( + rqtl2.transpose_csv_with_rename( + _file, + build_line_splitter(control_data), + build_line_joiner(control_data)) + for _file in phenofiles) + + _headers = rqtl2.read_csv_file_headers(phenofiles[0], + control_data["phenocovar_transposed"], + control_data["sep"], + control_data["comment.char"]) + return create_new_phenotypes( + cursor, + (dict(zip(_headers, + __replace_na_strings__(line, control_data["na.strings"]))) + for filecontent + in (rqtl2.read_csv_file(path, + separator=control_data["sep"], + comment_char=control_data["comment.char"]) + for path in phenofiles) + for idx, line in enumerate(filecontent) + if idx != 0)) + + +def __fetch_next_dataid__(conn: mysqldb.Connection) -> int: + """Fetch the next available DataId value from the database.""" + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute( + "SELECT MAX(DataId) AS CurrentMaxDataId FROM PublishXRef") + return int(cursor.fetchone()["CurrentMaxDataId"]) + 1 + + +def __row_to_dataitems__( + sample_row: dict, + dataidmap: dict, + pheno_name2id: dict[str, int], + samples: dict +) -> tuple[dict, ...]: + samplename = sample_row["id"] + + return ({ + "phenotype_id": dataidmap[pheno_name2id[phenoname]]["phenotype_id"], + "data_id": dataidmap[pheno_name2id[phenoname]]["data_id"], + "sample_name": samplename, + "sample_id": samples[samplename]["Id"], + "value": phenovalue + } for phenoname, phenovalue 
in sample_row.items() if phenoname != "id") + + +def __build_dataitems__( + phenofiles, + control_data, + samples, + dataidmap, + pheno_name2id +): + _headers = rqtl2.read_csv_file_headers( + phenofiles[0], + False, # Any transposed files have been un-transposed by this point + control_data["sep"], + control_data["comment.char"]) + _filescontents = ( + rqtl2.read_csv_file(path, + separator=control_data["sep"], + comment_char=control_data["comment.char"]) + for path in phenofiles) + _linescontents = ( + __row_to_dataitems__( + dict(zip(("id",) + _headers[1:], + __replace_na_strings__(line, control_data["na.strings"]))), + dataidmap, + pheno_name2id, + samples) + for linenum, line in (enumline for filecontent in _filescontents + for enumline in enumerate(filecontent)) + if linenum > 0) + return (item for items in _linescontents + for item in items + if item["value"] is not None) + + +def save_numeric_data(# pylint: disable=[too-many-positional-arguments,too-many-arguments] + conn: mysqldb.Connection, + dataidmap: dict, + pheno_name2id: dict[str, int], + samples: tuple[dict, ...], + control_data: dict, + filesdir: Path, + filetype: str, + table: str +): + """Read data from files and save to the database.""" + phenofiles = tuple( + filesdir.joinpath(_file) for _file in control_data[filetype]) + if len(phenofiles) <= 0: + return tuple() + + if control_data[f"{filetype}_transposed"]: + logger.info("Undoing transposition of the files rows and columns.") + phenofiles = tuple( + rqtl2.transpose_csv_with_rename( + _file, + build_line_splitter(control_data), + build_line_joiner(control_data)) + for _file in phenofiles) + + try: + logger.debug("Attempt quick save with `LOAD … INFILE`.") + return quick_save_phenotypes_data( + conn, + table, + __build_dataitems__( + phenofiles, + control_data, + samples, + dataidmap, + pheno_name2id), + filesdir) + except Exception as _exc:# pylint: disable=[broad-exception-caught] + logger.debug("Could not use `LOAD … INFILE`, using raw query", + exc_info=True) + time.sleep(60) + return save_phenotypes_data( + conn, + table, + __build_dataitems__( + phenofiles, + control_data, + samples, + dataidmap, + pheno_name2id)) + + +save_pheno_data = partial(save_numeric_data, + filetype="pheno", + table="PublishData") + + +save_phenotypes_se = partial(save_numeric_data, + filetype="phenose", + table="PublishSE") + + +save_phenotypes_n = partial(save_numeric_data, + filetype="phenonum", + table="NStrain") + + +def cross_reference_phenotypes_publications_and_data( + conn: mysqldb.Connection, xref_data: tuple[dict, ...] 
+): + """Crossreference the phenotypes, publication and data.""" + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute("SELECT MAX(Id) CurrentMaxId FROM PublishXRef") + _nextid = int(cursor.fetchone()["CurrentMaxId"]) + 1 + _params = tuple({**row, "xref_id": _id} + for _id, row in enumerate(xref_data, start=_nextid)) + cursor.executemany( + ("INSERT INTO PublishXRef(" + "Id, InbredSetId, PhenotypeId, PublicationId, DataId, comments" + ") " + "VALUES (" + "%(xref_id)s, %(population_id)s, %(phenotype_id)s, " + "%(publication_id)s, %(data_id)s, 'Upload of new data.'" + ")"), + _params) + cursor.executemany( + "UPDATE PublishXRef SET mean=" + "(SELECT AVG(value) FROM PublishData WHERE PublishData.Id=PublishXRef.DataId) " + "WHERE PublishXRef.Id=%(xref_id)s AND " + "InbredSetId=%(population_id)s", + _params) + return _params + return tuple() + + +def update_auth(# pylint: disable=[too-many-locals,too-many-positional-arguments,too-many-arguments] + authserver, + token, + species, + population, + dataset, + xrefdata): + """Grant the user access to their data.""" + _tries = 0 + _delay = 1 + headers = { + "Authorization": f"Bearer {token}", + "Content-Type": "application/json" + } + def authserveruri(endpoint): + return urljoin(authserver, endpoint) + + def __fetch_user_details__(): + logger.debug("… Fetching user details") + return mrequests.get( + authserveruri("/auth/user/"), + headers=headers + ) + + def __link_data__(user): + logger.debug("… linking uploaded data to user's group") + return mrequests.post( + authserveruri("/auth/data/link/phenotype"), + headers=headers, + json={ + "species_name": species["Name"], + "group_id": user["group"]["group_id"], + "selected": [ + { + "SpeciesId": species["SpeciesId"], + "InbredSetId": population["Id"], + "PublishFreezeId": dataset["Id"], + "dataset_name": dataset["Name"], + "dataset_fullname": dataset["FullName"], + "dataset_shortname": dataset["ShortName"], + "PublishXRefId": item["xref_id"] + } + for item in xrefdata + ], + "using-raw-ids": "on" + }).then(lambda ld_results: (user, ld_results)) + + def __fetch_phenotype_category_details__(user, linkeddata): + logger.debug("… fetching phenotype category details") + return mrequests.get( + authserveruri("/auth/resource/categories"), + headers=headers + ).then( + lambda categories: ( + user, + linkeddata, + next(category for category in categories + if category["resource_category_key"] == "phenotype")) + ) + + def __create_resource__(user, linkeddata, category): + logger.debug("… creating authorisation resource object") + now = datetime.datetime.now().isoformat() + return mrequests.post( + authserveruri("/auth/resource/create"), + headers=headers, + json={ + "resource_category": category["resource_category_id"], + "resource_name": (f"{user['email']}—{dataset['Name']}—{now}—" + f"{len(xrefdata)} phenotypes"), + "public": "off" + }).then(lambda cr_results: (user, linkeddata, cr_results)) + + def __attach_data_to_resource__(user, linkeddata, resource): + logger.debug("… attaching data to authorisation resource object") + return mrequests.post( + authserveruri("/auth/resource/data/link"), + headers=headers, + json={ + "dataset_type": "phenotype", + "resource_id": resource["resource_id"], + "data_link_ids": [ + item["data_link_id"] for item in linkeddata["traits"]] + }).then(lambda attc: (user, linkeddata, resource, attc)) + + def __handle_error__(resp): + error = resp.json() + if error.get("error") == "IntegrityError": + # This is hacky. 
If the auth already exists, something went wrong + # somewhere. + # This needs investigation to recover correctly. + logger.info( + "The authorisation for the data was already set up.") + return 0 + logger.error("ERROR: Updating the authorisation for the data failed.") + logger.debug( + "ERROR: The response from the authorisation server was:\n\t%s", + error) + return 1 + + def __handle_success__(_val): + logger.info( + "The authorisation for the data has been updated successfully.") + return 0 + + return __fetch_user_details__().then(__link_data__).then( + lambda result: __fetch_phenotype_category_details__(*result) + ).then( + lambda result: __create_resource__(*result) + ).then( + lambda result: __attach_data_to_resource__(*result) + ).either(__handle_error__, __handle_success__) + + +def load_data(conn: mysqldb.Connection, job: dict) -> int:#pylint: disable=[too-many-locals] + """Load the data attached in the given job.""" + _job_metadata = job["metadata"] + # Steps + # 0. Read data from the files: can be multiple files per type + # + _species = species_by_id(conn, int(_job_metadata["species_id"])) + _population = population_by_species_and_id( + conn, + _species["SpeciesId"], + int(_job_metadata["population_id"])) + _dataset = dataset_by_id( + conn, + _species["SpeciesId"], + _population["Id"], + int(_job_metadata["dataset_id"])) + # 1. Just retrive the publication: Don't create publications for now. + _publication = fetch_publication_by_id( + conn, int(_job_metadata.get("publication_id", "0"))) or {"Id": 0} + # 2. Save all new phenotypes: + # -> return phenotype IDs + bundle = Path(_job_metadata["bundle_file"]) + _control_data = rqtl2.control_data(bundle) + logger.info("Extracting the zipped bundle of files.") + _outdir = Path(bundle.parent, f"bundle_{bundle.stem}") + with ZipFile(str(bundle), "r") as zfile: + _files = rqtl2.extract(zfile, _outdir) + logger.info("Saving new phenotypes.") + _phenos = save_phenotypes(conn, _control_data, _outdir) + def __build_phenos_maps__(accumulator, current): + dataid, row = current + return ({ + **accumulator[0], + row["phenotype_id"]: { + "population_id": _population["Id"], + "phenotype_id": row["phenotype_id"], + "data_id": dataid, + "publication_id": _publication["Id"], + } + }, { + **accumulator[1], + row["id"]: row["phenotype_id"] + }) + dataidmap, pheno_name2id = reduce( + __build_phenos_maps__, + enumerate(_phenos, start=__fetch_next_dataid__(conn)), + ({},{})) + # 3. a. Fetch the strain names and IDS: create name->ID map + samples = { + row["Name"]: row + for row in samples_by_species_and_population( + conn, _species["SpeciesId"], _population["Id"])} + # b. Save all the data items (DataIds are vibes), return new IDs + logger.info("Saving new phenotypes data.") + _num_data_rows = save_pheno_data(conn=conn, + dataidmap=dataidmap, + pheno_name2id=pheno_name2id, + samples=samples, + control_data=_control_data, + filesdir=_outdir) + logger.info("Saved %s new phenotype data rows.", _num_data_rows) + # 4. Cross-reference Phenotype, Publication, and PublishData in PublishXRef + logger.info("Cross-referencing new phenotypes to their data and publications.") + _xrefs = cross_reference_phenotypes_publications_and_data( + conn, tuple(dataidmap.values())) + # 5. If standard errors and N exist, save them too + # (use IDs returned in `3. b.` above). 
+ if _control_data.get("phenose"): + logger.info("Saving new phenotypes standard errors.") + _num_se_rows = save_phenotypes_se(conn=conn, + dataidmap=dataidmap, + pheno_name2id=pheno_name2id, + samples=samples, + control_data=_control_data, + filesdir=_outdir) + logger.info("Saved %s new phenotype standard error rows.", _num_se_rows) + + if _control_data.get("phenonum"): + logger.info("Saving new phenotypes sample counts.") + _num_n_rows = save_phenotypes_n(conn=conn, + dataidmap=dataidmap, + pheno_name2id=pheno_name2id, + samples=samples, + control_data=_control_data, + filesdir=_outdir) + logger.info("Saved %s new phenotype sample counts rows.", _num_n_rows) + + return (_species, _population, _dataset, _xrefs) + + +if __name__ == "__main__": + def parse_args(): + """Setup command-line arguments.""" + parser = argparse.ArgumentParser( + prog="load_phenotypes_to_db", + description="Process the phenotypes' data and load it into the database.") + parser.add_argument("db_uri", type=str, help="MariaDB/MySQL connection URL") + parser.add_argument( + "jobs_db_path", type=Path, help="Path to jobs' SQLite database.") + parser.add_argument("job_id", type=uuid.UUID, help="ID of the running job") + parser.add_argument( + "--log-level", + type=str, + help="Determines what is logged out.", + choices=("debug", "info", "warning", "error", "critical"), + default="info") + return parser.parse_args() + + def setup_logging(log_level: str): + """Setup logging for the script.""" + logger.setLevel(log_level) + logging.getLogger("uploader.phenotypes.models").setLevel(log_level) + + + def main(): + """Entry-point for this script.""" + args = parse_args() + setup_logging(args.log_level.upper()) + + with (mysqldb.database_connection(args.db_uri) as conn, + conn.cursor(cursorclass=DictCursor) as cursor, + sqlite3.connection(args.jobs_db_path) as jobs_conn): + job = jobs.job(jobs_conn, args.job_id) + + # Lock the PublishXRef/PublishData/PublishSE/NStrain here: Why? + # The `DataId` values are sequential, but not auto-increment + # Can't convert `PublishXRef`.`DataId` to AUTO_INCREMENT. + # `SELECT MAX(DataId) FROM PublishXRef;` + # How do you check for a table lock? + # https://oracle-base.com/articles/mysql/mysql-identify-locked-tables + # `SHOW OPEN TABLES LIKE 'Publish%';` + _db_tables_ = ( + "Species", + "InbredSet", + "Strain", + "StrainXRef", + "Publication", + "Phenotype", + "PublishXRef", + "PublishFreeze", + "PublishData", + "PublishSE", + "NStrain") + + logger.debug( + ("Locking database tables for the connection:" + + "".join("\n\t- %s" for _ in _db_tables_) + "\n"), + *_db_tables_) + cursor.execute(# Lock the tables to avoid race conditions + "LOCK TABLES " + ", ".join( + f"{_table} WRITE" for _table in _db_tables_)) + + db_results = load_data(conn, job) + jobs.update_metadata( + jobs_conn, + args.job_id, + "xref_ids", + json.dumps([xref["xref_id"] for xref in db_results[3]])) + + logger.info("Unlocking all database tables.") + cursor.execute("UNLOCK TABLES") + + # Update authorisations (break this down) — maybe loop until it works? 
+ logger.info("Updating authorisation.") + _job_metadata = job["metadata"] + return update_auth(_job_metadata["authserver"], + _job_metadata["token"], + *db_results) + + + try: + sys.exit(main()) + except Exception as _exc:# pylint: disable=[broad-exception-caught] + logger.debug("Data loading failed… Halting!", + exc_info=True) + sys.exit(1) diff --git a/scripts/process_rqtl2_bundle.py b/scripts/process_rqtl2_bundle.py index 8b7a0fb..e2ce420 100644 --- a/scripts/process_rqtl2_bundle.py +++ b/scripts/process_rqtl2_bundle.py @@ -104,7 +104,7 @@ def process_bundle(dbconn: mdb.Connection, rqtl2bundle=Path(meta["rqtl2-bundle-file"])), logger) if genoexit != 0: - raise Exception("Processing 'geno' file failed.") + raise Exception("Processing 'geno' file failed.")# pylint: disable=[broad-exception-raised] logger.debug( "geno file processing completed successfully. (ExitCode: %s)", genoexit) @@ -122,7 +122,7 @@ def process_bundle(dbconn: mdb.Connection, rqtl2bundle=Path(meta["rqtl2-bundle-file"])), logger) if phenoexit != 0: - raise Exception("Processing 'pheno' file failed.") + raise Exception("Processing 'pheno' file failed.")# pylint: disable=[broad-exception-raised] logger.debug( "pheno file processing completed successfully. (ExitCode: %s)", phenoexit) diff --git a/scripts/qc_on_rqtl2_bundle.py b/scripts/qc_on_rqtl2_bundle.py index 9f9248c..0207938 100644 --- a/scripts/qc_on_rqtl2_bundle.py +++ b/scripts/qc_on_rqtl2_bundle.py @@ -191,7 +191,7 @@ def check_pheno_samples( return allerrors -def qc_pheno_errors(# pylint: disable=[too-many-arguments] +def qc_pheno_errors(# pylint: disable=[too-many-arguments, too-many-positional-arguments] rconn, fqjobid, dburi, speciesid, zfile, logger) -> bool: """Check for errors in `pheno` file(s).""" cdata = rqtl2.control_data(zfile) @@ -260,7 +260,7 @@ def run_qc(rconn: Redis, if qc_missing_files(rconn, fqjobid, zfile, logger): return 1 - def with_zipfile(# pylint: disable=[too-many-arguments] + def with_zipfile(# pylint: disable=[too-many-arguments,too-many-positional-arguments] rconn, fqjobid, dbconn, speciesid, filename, logger, func ): with ZipFile(filename, "r") as zfile: diff --git a/scripts/qc_on_rqtl2_bundle2.py b/scripts/qc_on_rqtl2_bundle2.py deleted file mode 100644 index 7e5d253..0000000 --- a/scripts/qc_on_rqtl2_bundle2.py +++ /dev/null @@ -1,346 +0,0 @@ -"""Run Quality Control checks on R/qtl2 bundle.""" -import os -import sys -import json -from time import sleep -from pathlib import Path -from zipfile import ZipFile -from argparse import Namespace -from datetime import timedelta -import multiprocessing as mproc -from functools import reduce, partial -from logging import Logger, getLogger, StreamHandler -from typing import Union, Sequence, Callable, Iterator - -import MySQLdb as mdb -from redis import Redis - -from quality_control.errors import InvalidValue -from quality_control.checks import decimal_points_error - -from uploader import jobs -from uploader.db_utils import database_connection -from uploader.check_connections import check_db, check_redis - -from r_qtl import r_qtl2 as rqtl2 -from r_qtl import r_qtl2_qc as rqc -from r_qtl import exceptions as rqe -from r_qtl import fileerrors as rqfe - -from scripts.process_rqtl2_bundle import parse_job -from scripts.redis_logger import setup_redis_logger -from scripts.cli_parser import init_cli_parser, add_global_data_arguments -from scripts.rqtl2.bundleutils import build_line_joiner, build_line_splitter - - -def check_for_missing_files( - rconn: Redis, fqjobid: str, extractpath: Path, logger: 
Logger) -> bool: - """Check that all files listed in the control file do actually exist.""" - logger.info("Checking for missing files.") - missing = rqc.missing_files(extractpath) - # add_to_errors(rconn, fqjobid, "errors-generic", tuple( - # rqfe.MissingFile( - # mfile[0], mfile[1], ( - # f"File '{mfile[1]}' is listed in the control file under " - # f"the '{mfile[0]}' key, but it does not actually exist in " - # "the bundle.")) - # for mfile in missing)) - if len(missing) > 0: - logger.error(f"Missing files in the bundle!") - return True - return False - - -def open_file(file_: Path) -> Iterator: - """Open file and return one line at a time.""" - with open(file_, "r", encoding="utf8") as infile: - for line in infile: - yield line - - -def check_markers( - filename: str, - row: tuple[str, ...], - save_error: lambda val: val -) -> tuple[rqfe.InvalidValue]: - """Check that the markers are okay""" - errors = tuple() - counts = {} - for marker in row: - counts = {**counts, marker: counts.get(marker, 0) + 1} - if marker is None or marker == "": - errors = errors + (save_error(rqfe.InvalidValue( - filename, - "markers" - "-", - marker, - "A marker MUST be a valid value.")),) - - return errors + tuple( - save_error(rqfe.InvalidValue( - filename, - "markers", - key, - f"Marker '{key}' was repeated {value} times")) - for key,value in counts.items() if value > 1) - - -def check_geno_line( - filename: str, - headers: tuple[str, ...], - row: tuple[Union[str, None]], - cdata: dict, - save_error: lambda val: val -) -> tuple[rqfe.InvalidValue]: - """Check that the geno line is correct.""" - errors = tuple() - # Verify that line has same number of columns as headers - if len(headers) != len(row): - errors = errors + (save_error(rqfe.InvalidValue( - filename, - headers[0], - row[0], - row[0], - "Every line MUST have the same number of columns.")),) - - # First column is the individuals/cases/samples - if not bool(row[0]): - errors = errors + (save_error(rqfe.InvalidValue( - filename, - headers[0], - row[0], - row[0], - "The sample/case MUST be a valid value.")),) - - def __process_value__(val): - if val in cdata["na.strings"]: - return None - if val in cdata["alleles"]: - return cdata["genotypes"][val] - - genocode = cdata.get("genotypes", {}) - for coltitle, cellvalue in zip(headers[1:],row[1:]): - if ( - bool(genocode) and - cellvalue is not None and - cellvalue not in genocode.keys() - ): - errors = errors + (save_error(rqfe.InvalidValue( - filename, row[0], coltitle, cellvalue, - f"Value '{cellvalue}' is invalid. Expected one of " - f"'{', '.join(genocode.keys())}'.")),) - - return errors - - -def push_file_error_to_redis(rconn: Redis, key: str, error: InvalidValue) -> InvalidValue: - """Push the file error to redis a json string - - Parameters - ---------- - rconn: Connection to redis - key: The name of the list where we push the errors - error: The file error to save - - Returns - ------- - Returns the file error it saved - """ - if bool(error): - rconn.rpush(key, json.dumps(error._asdict())) - return error - - -def file_errors_and_details( - redisargs: dict[str, str], - file_: Path, - filetype: str, - cdata: dict, - linesplitterfn: Callable, - linejoinerfn: Callable, - headercheckers: tuple[Callable, ...], - bodycheckers: tuple[Callable, ...] 
-) -> dict: - """Compute errors, and other file metadata.""" - errors = tuple() - if cdata[f"{filetype}_transposed"]: - rqtl2.transpose_csv_with_rename(file_, linesplitterfn, linejoinerfn) - - with Redis.from_url(redisargs["redisuri"], decode_responses=True) as rconn: - save_error_fn = partial(push_file_error_to_redis, - rconn, - error_list_name(filetype, file_.name)) - for lineno, line in enumerate(open_file(file_), start=1): - row = linesplitterfn(line) - if lineno == 1: - headers = tuple(row) - errors = errors + reduce( - lambda errs, fnct: errs + fnct( - file_.name, row[1:], save_error_fn), - headercheckers, - tuple()) - continue - - errors = errors + reduce( - lambda errs, fnct: errs + fnct( - file_.name, headers, row, cdata, save_error_fn), - bodycheckers, - tuple()) - - filedetails = { - "filename": file_.name, - "filesize": os.stat(file_).st_size, - "linecount": lineno - } - rconn.hset(redisargs["fqjobid"], - f"file-details:{filetype}:{file_.name}", - json.dumps(filedetails)) - return {**filedetails, "errors": errors} - - -def error_list_name(filetype: str, filename: str): - """Compute the name of the list where the errors will be pushed. - - Parameters - ---------- - filetype: The type of file. One of `r_qtl.r_qtl2.FILE_TYPES` - filename: The name of the file. - """ - return f"errors:{filetype}:{filename}" - - -def check_for_geno_errors( - redisargs: dict[str, str], - extractdir: Path, - cdata: dict, - linesplitterfn: Callable[[str], tuple[Union[str, None]]], - linejoinerfn: Callable[[tuple[Union[str, None], ...]], str], - logger: Logger -) -> bool: - """Check for errors in genotype files.""" - if "geno" in cdata or "founder_geno" in cdata: - genofiles = tuple( - extractdir.joinpath(fname) for fname in cdata.get("geno", [])) - fgenofiles = tuple( - extractdir.joinpath(fname) for fname in cdata.get("founder_geno", [])) - allgenofiles = genofiles + fgenofiles - with Redis.from_url(redisargs["redisuri"], decode_responses=True) as rconn: - error_list_names = [ - error_list_name("geno", file_.name) for file_ in allgenofiles] - for list_name in error_list_names: - rconn.delete(list_name) - rconn.hset( - redisargs["fqjobid"], - "geno-errors-lists", - json.dumps(error_list_names)) - processes = [ - mproc.Process(target=file_errors_and_details, - args=( - redisargs, - file_, - ftype, - cdata, - linesplitterfn, - linejoinerfn, - (check_markers,), - (check_geno_line,)) - ) - for ftype, file_ in ( - tuple(("geno", file_) for file_ in genofiles) + - tuple(("founder_geno", file_) for file_ in fgenofiles)) - ] - for process in processes: - process.start() - # Set expiry for any created error lists - for key in error_list_names: - rconn.expire(name=key, - time=timedelta(seconds=redisargs["redisexpiry"])) - - # TOD0: Add the errors to redis - if any(rconn.llen(errlst) > 0 for errlst in error_list_names): - logger.error("At least one of the 'geno' files has (an) error(s).") - return True - logger.info("No error(s) found in any of the 'geno' files.") - - else: - logger.info("No 'geno' files to check.") - - return False - - -# def check_for_pheno_errors(...): -# """Check for errors in phenotype files.""" -# pass - - -# def check_for_phenose_errors(...): -# """Check for errors in phenotype, standard-error files.""" -# pass - - -# def check_for_phenocovar_errors(...): -# """Check for errors in phenotype-covariates files.""" -# pass - - -def run_qc(rconn: Redis, args: Namespace, fqjobid: str, logger: Logger) -> int: - """Run quality control checks on R/qtl2 bundles.""" - thejob = parse_job(rconn, 
args.redisprefix, args.jobid) - print(f"THE JOB =================> {thejob}") - jobmeta = thejob["job-metadata"] - inpath = Path(jobmeta["rqtl2-bundle-file"]) - extractdir = inpath.parent.joinpath(f"{inpath.name}__extraction_dir") - with ZipFile(inpath, "r") as zfile: - rqtl2.extract(zfile, extractdir) - - ### BEGIN: The quality control checks ### - cdata = rqtl2.control_data(extractdir) - splitter = build_line_splitter(cdata) - joiner = build_line_joiner(cdata) - - redisargs = { - "fqjobid": fqjobid, - "redisuri": args.redisuri, - "redisexpiry": args.redisexpiry - } - check_for_missing_files(rconn, fqjobid, extractdir, logger) - # check_for_pheno_errors(...) - check_for_geno_errors(redisargs, extractdir, cdata, splitter, joiner, logger) - # check_for_phenose_errors(...) - # check_for_phenocovar_errors(...) - ### END: The quality control checks ### - - def __fetch_errors__(rkey: str) -> tuple: - return tuple(json.loads(rconn.hget(fqjobid, rkey) or "[]")) - - return (1 if any(( - bool(__fetch_errors__(key)) - for key in - ("errors-geno", "errors-pheno", "errors-phenos", "errors-phenocovar"))) - else 0) - - -if __name__ == "__main__": - def main(): - """Enter R/qtl2 bundle QC runner.""" - args = add_global_data_arguments(init_cli_parser( - "qc-on-rqtl2-bundle", "Run QC on R/qtl2 bundle.")).parse_args() - check_redis(args.redisuri) - check_db(args.databaseuri) - - logger = getLogger("qc-on-rqtl2-bundle") - logger.addHandler(StreamHandler(stream=sys.stderr)) - logger.setLevel("DEBUG") - - fqjobid = jobs.job_key(args.redisprefix, args.jobid) - with Redis.from_url(args.redisuri, decode_responses=True) as rconn: - logger.addHandler(setup_redis_logger( - rconn, fqjobid, f"{fqjobid}:log-messages", - args.redisexpiry)) - - exitcode = run_qc(rconn, args, fqjobid, logger) - rconn.hset( - jobs.job_key(args.redisprefix, args.jobid), "exitcode", exitcode) - return exitcode - - sys.exit(main()) diff --git a/scripts/redis_logger.py b/scripts/redis_logger.py index d3fde5f..a74e5e4 100644 --- a/scripts/redis_logger.py +++ b/scripts/redis_logger.py @@ -6,7 +6,7 @@ from redis import Redis class RedisLogger(logging.Handler): """Log out to redis for our worker scripts""" - def __init__(self,#pylint: disable=[too-many-arguments] + def __init__(self,#pylint: disable=[too-many-arguments, too-many-positional-arguments] rconn: Redis, fullyqualifiedjobid: str, messageslistname: str, diff --git a/scripts/rqtl2/entry.py b/scripts/rqtl2/entry.py index 327ed2c..e0e00e7 100644 --- a/scripts/rqtl2/entry.py +++ b/scripts/rqtl2/entry.py @@ -20,27 +20,23 @@ def build_main( [Redis, Connection, str, Namespace, logging.Logger], int ], - loggername: str + logger: logging.Logger ) -> Callable[[],int]: """Build a function to be used as an entry-point for scripts.""" def main(): - try: - logging.basicConfig( - format=( - "%(asctime)s - %(levelname)s %(name)s: " - "(%(pathname)s: %(lineno)d) %(message)s"), - level=args.loglevel) - logger = logging.getLogger(loggername) - with (Redis.from_url(args.redisuri, decode_responses=True) as rconn, - database_connection(args.databaseuri) as dbconn): - fqjobid = jobs.job_key(args.redisprefix, args.jobid) + with (Redis.from_url(args.redisuri, decode_responses=True) as rconn, + database_connection(args.databaseuri) as dbconn): + logger.setLevel(args.loglevel.upper()) + fqjobid = jobs.job_key(args.redisprefix, args.jobid) + + try: rconn.hset(fqjobid, "status", "started") logger.addHandler(setup_redis_logger( rconn, fqjobid, f"{fqjobid}:log-messages", args.redisexpiry)) - 
logger.addHandler(StreamHandler(stream=sys.stdout)) + logger.addHandler(StreamHandler(stream=sys.stderr)) check_db(args.databaseuri) check_redis(args.redisuri) @@ -48,15 +44,15 @@ def build_main( logger.error("File not found: '%s'.", args.rqtl2bundle) return 2 - returncode = run_fn(rconn, dbconn, fqjobid, args, logger) + returncode = run_fn(rconn, dbconn, fqjobid, args) if returncode == 0: rconn.hset(fqjobid, "status", "completed:success") return returncode rconn.hset(fqjobid, "status", "completed:error") return returncode - except Exception as _exc:# pylint: disable=[broad-except] - logger.error("The process failed!", exc_info=True) - rconn.hset(fqjobid, "status", "completed:error") - return 4 + except Exception as _exc:# pylint: disable=[broad-except] + logger.error("The process failed!", exc_info=True) + rconn.hset(fqjobid, "status", "completed:error") + return 4 return main diff --git a/scripts/rqtl2/phenotypes_qc.py b/scripts/rqtl2/phenotypes_qc.py index ba28ed0..9f11f57 100644 --- a/scripts/rqtl2/phenotypes_qc.py +++ b/scripts/rqtl2/phenotypes_qc.py @@ -36,8 +36,15 @@ from scripts.cli_parser import init_cli_parser, add_global_data_arguments from scripts.rqtl2.bundleutils import build_line_joiner, build_line_splitter __MODULE__ = "scripts.rqtl2.phenotypes_qc" +logging.basicConfig( + format=("%(asctime)s - %(levelname)s %(name)s: " + "(%(pathname)s: %(lineno)d) %(message)s")) +logger = logging.getLogger(__MODULE__) -def validate(phenobundle: Path, logger: Logger) -> dict: +def validate( + phenobundle: Path, + logger: Logger# pylint: disable=[redefined-outer-name] +) -> dict: """Check that the bundle is generally valid""" try: rqc.validate_bundle(phenobundle) @@ -59,7 +66,7 @@ def validate(phenobundle: Path, logger: Logger) -> dict: def check_for_mandatory_pheno_keys( phenobundle: Path, - logger: Logger, + logger: Logger,# pylint: disable=[redefined-outer-name] **kwargs ) -> dict: """Check that the mandatory keys exist for phenotypes.""" @@ -86,7 +93,7 @@ def check_for_mandatory_pheno_keys( def check_for_averages_files( phenobundle: Path, - logger: Logger, + logger: Logger,# pylint: disable=[redefined-outer-name] **kwargs ) -> dict: """Check that averages files appear together""" @@ -140,15 +147,15 @@ def redis_logger( ) -> Iterator[logging.Logger]: """Build a Redis message-list logger.""" rconn = Redis.from_url(redisuri, decode_responses=True) - logger = logging.getLogger(loggername) - logger.propagate = False + _logger = logging.getLogger(loggername) + _logger.propagate = False handler = RedisMessageListHandler( rconn, fullyqualifiedkey(fqkey, filename))#type: ignore[arg-type] handler.setFormatter(logging.getLogger().handlers[0].formatter) - logger.addHandler(handler) + _logger.addHandler(handler) try: - yield logger + yield _logger finally: rconn.close() @@ -175,9 +182,9 @@ def qc_phenocovar_file( redisuri, f"{__MODULE__}.qc_phenocovar_file", filepath.name, - f"{fqkey}:logs") as logger, + f"{fqkey}:logs") as _logger, Redis.from_url(redisuri, decode_responses=True) as rconn): - logger.info("Running QC on file: %s", filepath.name) + print("Running QC on file: ", filepath.name) _csvfile = rqtl2.read_csv_file(filepath, separator, comment_char) _headings = tuple(heading.lower() for heading in next(_csvfile)) _errors: tuple[InvalidValue, ...] 
= tuple() @@ -195,7 +202,7 @@ def qc_phenocovar_file( def collect_errors(errors_and_linecount, line): _errs, _lc = errors_and_linecount - logger.info("Testing record '%s'", line[0]) + _logger.info("Testing record '%s'", line[0]) if len(line) != len(_headings): _errs = _errs + (save_error(InvalidValue( filepath.name, @@ -205,12 +212,12 @@ def qc_phenocovar_file( (f"Record {_lc} in file {filepath.name} has a different " "number of columns than the number of headings"))),) _line = dict(zip(_headings, line)) - if not bool(_line["description"]): + if not bool(_line.get("description")): _errs = _errs + ( save_error(InvalidValue(filepath.name, _line[_headings[0]], "description", - _line["description"], + _line.get("description"), "The description is not provided!")),) rconn.hset(file_fqkey(fqkey, "metadata", filepath), @@ -236,7 +243,7 @@ def merge_dicts(*dicts): return reduce(lambda merged, dct: {**merged, **dct}, dicts, {}) -def decimal_points_error(# pylint: disable=[too-many-arguments] +def decimal_points_error(# pylint: disable=[too-many-arguments,too-many-positional-arguments] filename: str, rowtitle: str, coltitle: str, @@ -267,7 +274,7 @@ def integer_error( return InvalidValue(filename, rowtitle, coltitle, cellvalue, message) -def qc_pheno_file(# pylint: disable=[too-many-locals, too-many-arguments] +def qc_pheno_file(# pylint: disable=[too-many-locals, too-many-arguments, too-many-positional-arguments] filepath: Path, redisuri: str, fqkey: str, @@ -283,17 +290,22 @@ def qc_pheno_file(# pylint: disable=[too-many-locals, too-many-arguments] redisuri, f"{__MODULE__}.qc_pheno_file", filepath.name, - f"{fqkey}:logs") as logger, + f"{fqkey}:logs") as _logger, Redis.from_url(redisuri, decode_responses=True) as rconn): - logger.info("Running QC on file: %s", filepath.name) + print("Running QC on file: ", filepath.name) save_error = partial( push_error, rconn, file_fqkey(fqkey, "errors", filepath)) _csvfile = rqtl2.read_csv_file(filepath, separator, comment_char) _headings: tuple[str, ...] = tuple( + # select lowercase for comparison purposes heading.lower() for heading in next(_csvfile)) _errors: tuple[InvalidValue, ...] = tuple() - _absent = tuple(pheno for pheno in _headings[1:] if pheno not in phenonames) + _absent = tuple(pheno for pheno in _headings[1:] if pheno + not in tuple( + # lower to have consistent case with headings for + # comparison + phe.lower() for phe in phenonames)) if len(_absent) > 0: _errors = _errors + (save_error(InvalidValue( filepath.name, @@ -305,7 +317,7 @@ def qc_pheno_file(# pylint: disable=[too-many-locals, too-many-arguments] def collect_errors(errors_and_linecount, line): _errs, _lc = errors_and_linecount - logger.debug("Checking row %s", line[0]) + _logger.debug("Checking row %s", line[0]) if line[0] not in samples: _errs = _errs + (save_error(InvalidValue( filepath.name, @@ -364,11 +376,10 @@ def run_qc(# pylint: disable=[too-many-locals] rconn: Redis, dbconn: mdb.Connection, fullyqualifiedjobid: str, - args: Namespace, - logger: Logger + args: Namespace ) -> int: """Run quality control checks on the bundle.""" - logger.debug("Beginning the quality assurance checks.") + print("Beginning the quality assurance checks.") results = check_for_averages_files( **check_for_mandatory_pheno_keys( **validate(args.rqtl2bundle, logger))) @@ -393,7 +404,7 @@ def run_qc(# pylint: disable=[too-many-locals] for ftype in ("pheno", "phenocovar", "phenose", "phenonum"))) # - Fetch samples/individuals from database. 
- logger.debug("Fetching samples/individuals from the database.") + print("Fetching samples/individuals from the database.") samples = tuple(#type: ignore[var-annotated] item for item in set(reduce( lambda acc, item: acc + ( @@ -410,7 +421,7 @@ def run_qc(# pylint: disable=[too-many-locals] json.dumps(tuple(f"{fullyqualifiedjobid}:phenocovar:{_file}" for _file in cdata.get("phenocovar", [])))) with mproc.Pool(mproc.cpu_count() - 1) as pool: - logger.debug("Check for errors in 'phenocovar' file(s).") + print("Check for errors in 'phenocovar' file(s).") _phenocovar_qc_res = merge_dicts(*pool.starmap(qc_phenocovar_file, tuple( (extractiondir.joinpath(_file), args.redisuri, @@ -432,7 +443,7 @@ def run_qc(# pylint: disable=[too-many-locals] "Expected a non-negative number with at least one decimal " "place.")) - logger.debug("Check for errors in 'pheno' file(s).") + print("Check for errors in 'pheno' file(s).") _pheno_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple(( extractiondir.joinpath(_file), args.redisuri, @@ -451,7 +462,7 @@ def run_qc(# pylint: disable=[too-many-locals] # - Check the 3 checks above for phenose and phenonum values too # qc_phenose_files(…) # qc_phenonum_files(…) - logger.debug("Check for errors in 'phenose' file(s).") + print("Check for errors in 'phenose' file(s).") _phenose_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple(( extractiondir.joinpath(_file), args.redisuri, @@ -467,7 +478,7 @@ def run_qc(# pylint: disable=[too-many-locals] dec_err_fn ) for _file in cdata.get("phenose", [])))) - logger.debug("Check for errors in 'phenonum' file(s).") + print("Check for errors in 'phenonum' file(s).") _phenonum_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple(( extractiondir.joinpath(_file), args.redisuri, @@ -504,5 +515,5 @@ if __name__ == "__main__": type=Path) return parser.parse_args() - main = build_main(cli_args(), run_qc, __MODULE__) + main = build_main(cli_args(), run_qc, logger) sys.exit(main()) diff --git a/scripts/worker.py b/scripts/worker.py index 91b0332..3165fe7 100644 --- a/scripts/worker.py +++ b/scripts/worker.py @@ -79,7 +79,7 @@ def main(): fqjobid = jobs.job_key(args.redisprefix, args.jobid) rconn.hset(fqjobid, "stderr", f"No such job. '{args.job_id}'.") rconn.expire(name=jobs.job_key(args.redisprefix, args.job_id), - time=timedelta(seconds=(2 * 60 * 60))) + time=timedelta(seconds=2 * 60 * 60)) print(f"No such job. '{args.job_id}'.", file=sys.stderr) return 2 return 3 |
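Note: the new scripts/load_phenotypes_to_db.py allocates DataId values by reading MAX(DataId) from PublishXRef while the affected tables are held under LOCK TABLES … WRITE, because DataId is sequential but not AUTO_INCREMENT. The following is a minimal, self-contained sketch of that locking/allocation pattern only; demo_insert_rows and the exact PublishData column list are illustrative assumptions, not helpers taken from the script.

"""Sketch: allocate sequential DataId values under a table lock."""
import MySQLdb as mdb
from MySQLdb.cursors import DictCursor


def next_data_id(cursor) -> int:
    # `PublishXRef`.`DataId` is sequential but not AUTO_INCREMENT, so the
    # next value must be computed while the table is locked.
    cursor.execute("SELECT MAX(DataId) AS CurrentMaxDataId FROM PublishXRef")
    return int(cursor.fetchone()["CurrentMaxDataId"]) + 1


def demo_insert_rows(conn: mdb.Connection, rows: tuple[dict, ...]) -> int:
    """Hypothetical lock -> compute IDs -> write -> unlock flow."""
    with conn.cursor(cursorclass=DictCursor) as cursor:
        # Lock every table that will be read or written, so two concurrent
        # loads cannot compute the same MAX(DataId).
        cursor.execute("LOCK TABLES PublishXRef WRITE, PublishData WRITE")
        try:
            start_id = next_data_id(cursor)
            cursor.executemany(
                # Column names here are assumptions made for the sketch.
                "INSERT INTO PublishData(Id, StrainId, value) "
                "VALUES (%(data_id)s, %(strain_id)s, %(value)s)",
                tuple({**row, "data_id": start_id + idx}
                      for idx, row in enumerate(rows)))
            return cursor.rowcount
        finally:
            cursor.execute("UNLOCK TABLES")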
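Note: save_numeric_data in the same script first attempts the `LOAD … INFILE`-based quick_save_phenotypes_data and, if that raises, waits and falls back to the slower save_phenotypes_data path. A condensed sketch of that fallback shape is below, assuming hypothetical fast_bulk_save/slow_row_save stand-ins for the gn_libs helpers.

"""Sketch: fast bulk-load path with a slow row-by-row fallback."""
import time
import logging

logger = logging.getLogger(__name__)


def fast_bulk_save(conn, table, rows) -> int:
    """Stand-in for a `LOAD DATA … INFILE`-style bulk loader (hypothetical)."""
    raise NotImplementedError


def slow_row_save(conn, table, rows) -> int:
    """Stand-in for an `executemany`-based row loader (hypothetical)."""
    raise NotImplementedError


def save_with_fallback(conn, table, build_rows) -> int:
    """Try the fast bulk path; fall back to plain inserts if it fails.

    `build_rows` is a zero-argument callable returning a fresh iterable of
    row dicts, so each attempt gets its own unconsumed generator; the real
    script achieves the same by rebuilding its data generator per attempt.
    """
    try:
        return fast_bulk_save(conn, table, build_rows())
    except Exception:  # pylint: disable=[broad-exception-caught]
        logger.debug("Bulk load failed; falling back to plain inserts.",
                     exc_info=True)
        time.sleep(60)  # mirrors the pause before the slow path in the diff
        return slow_row_save(conn, table, build_rows())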