diff options
Diffstat (limited to 'scripts')
-rw-r--r-- | scripts/cli_parser.py | 7 | ||||
-rw-r--r-- | scripts/insert_data.py | 4 | ||||
-rw-r--r-- | scripts/insert_samples.py | 35 | ||||
-rw-r--r-- | scripts/load_phenotypes_to_db.py | 521 | ||||
-rw-r--r-- | scripts/phenotypes_bulk_edit.py | 266 | ||||
-rw-r--r-- | scripts/process_rqtl2_bundle.py | 31 | ||||
-rw-r--r-- | scripts/qc.py | 4 | ||||
-rw-r--r-- | scripts/qc_on_rqtl2_bundle.py | 87 | ||||
-rw-r--r-- | scripts/qcapp_wsgi.py | 52 | ||||
-rw-r--r-- | scripts/redis_logger.py | 21 | ||||
-rw-r--r-- | scripts/rqtl2/bundleutils.py | 44 | ||||
-rw-r--r-- | scripts/rqtl2/cli_parser.py | 20 | ||||
-rw-r--r-- | scripts/rqtl2/entry.py | 64 | ||||
-rw-r--r-- | scripts/rqtl2/install_genotypes.py | 150 | ||||
-rw-r--r-- | scripts/rqtl2/install_phenos.py | 31 | ||||
-rw-r--r-- | scripts/rqtl2/phenotypes_qc.py | 516 | ||||
-rw-r--r-- | scripts/validate_file.py | 4 | ||||
-rw-r--r-- | scripts/worker.py | 4 |
18 files changed, 1674 insertions, 187 deletions
diff --git a/scripts/cli_parser.py b/scripts/cli_parser.py index 308ee4b..0c91c5e 100644 --- a/scripts/cli_parser.py +++ b/scripts/cli_parser.py @@ -19,6 +19,13 @@ def init_cli_parser(program: str, description: Optional[str] = None) -> Argument type=int, default=86400, help="How long to keep any redis keys around.") + parser.add_argument( + "--loglevel", + type=str, + default="INFO", + choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", + "debug", "info", "warning", "error", "critical"], + help="The severity of events to track with the logger.") return parser def add_global_data_arguments(parser: ArgumentParser) -> ArgumentParser: diff --git a/scripts/insert_data.py b/scripts/insert_data.py index 1465348..67038f8 100644 --- a/scripts/insert_data.py +++ b/scripts/insert_data.py @@ -10,12 +10,12 @@ from typing import Tuple, Iterator import MySQLdb as mdb from redis import Redis from MySQLdb.cursors import DictCursor +from gn_libs.mysqldb import database_connection from functional_tools import take from quality_control.file_utils import open_file -from qc_app.db_utils import database_connection -from qc_app.check_connections import check_db, check_redis +from uploader.check_connections import check_db, check_redis # Set up logging stderr_handler = logging.StreamHandler(stream=sys.stderr) diff --git a/scripts/insert_samples.py b/scripts/insert_samples.py index 8431462..742c4ae 100644 --- a/scripts/insert_samples.py +++ b/scripts/insert_samples.py @@ -3,14 +3,16 @@ import sys import logging import pathlib import argparse +import traceback import MySQLdb as mdb from redis import Redis +from gn_libs.mysqldb import database_connection -from qc_app.db_utils import database_connection -from qc_app.check_connections import check_db, check_redis -from qc_app.db import species_by_id, population_by_id -from qc_app.samples import ( +from uploader.check_connections import check_db, check_redis +from uploader.species.models import species_by_id +from uploader.population.models import population_by_id +from uploader.samples.models import ( save_samples_data, read_samples_file, cross_reference_samples) @@ -72,6 +74,7 @@ def insert_samples(conn: mdb.Connection,# pylint: disable=[too-many-arguments] print("Samples upload successfully completed.") return 0 + if __name__ == "__main__": def cli_args(): @@ -126,7 +129,7 @@ if __name__ == "__main__": def main(): """Run script to insert samples into the database.""" - + status_code = 1 # Exit with an Exception args = cli_args() check_db(args.databaseuri) check_redis(args.redisuri) @@ -136,13 +139,19 @@ if __name__ == "__main__": with (Redis.from_url(args.redisuri, decode_responses=True) as rconn, database_connection(args.databaseuri) as dbconn): - return insert_samples(dbconn, - rconn, - args.speciesid, - args.populationid, - args.samplesfile, - args.separator, - args.firstlineheading, - args.quotechar) + + try: + status_code = insert_samples(dbconn, + rconn, + args.speciesid, + args.populationid, + args.samplesfile, + args.separator, + args.firstlineheading, + args.quotechar) + except Exception as _exc: + print(traceback.format_exc(), file=sys.stderr) + + return status_code sys.exit(main()) diff --git a/scripts/load_phenotypes_to_db.py b/scripts/load_phenotypes_to_db.py new file mode 100644 index 0000000..08ee558 --- /dev/null +++ b/scripts/load_phenotypes_to_db.py @@ -0,0 +1,521 @@ +import sys +import uuid +import json +import logging +import argparse +import datetime +from pathlib import Path +from zipfile import ZipFile +from typing import Any, Union +from urllib.parse import urljoin +from functools import reduce, partial + +from MySQLdb.cursors import Cursor, DictCursor + +from gn_libs import jobs, mysqldb, sqlite3, monadic_requests as mrequests + +from r_qtl import r_qtl2 as rqtl2 +from uploader.species.models import species_by_id +from uploader.population.models import population_by_species_and_id +from uploader.samples.models import samples_by_species_and_population +from uploader.phenotypes.models import ( + dataset_by_id, + save_phenotypes_data, + create_new_phenotypes, + quick_save_phenotypes_data) +from uploader.publications.models import ( + create_new_publications, + fetch_publication_by_id) + +from scripts.rqtl2.bundleutils import build_line_joiner, build_line_splitter + +logging.basicConfig( + format="%(asctime)s — %(filename)s:%(lineno)s — %(levelname)s: %(message)s") +logger = logging.getLogger(__name__) + + + +def __replace_na_strings__(line, na_strings): + return ((None if value in na_strings else value) for value in line) + + +def save_phenotypes( + cursor: mysqldb.Connection, + control_data: dict[str, Any], + filesdir: Path +) -> tuple[dict, ...]: + """Read `phenofiles` and save the phenotypes therein.""" + ## TODO: Replace with something like this: ## + # phenofiles = control_data["phenocovar"] + control_data.get( + # "gn-metadata", {}).get("pheno", []) + # + # This is meant to load (and merge) data from the "phenocovar" and + # "gn-metadata -> pheno" files into a single collection of phenotypes. + phenofiles = tuple(filesdir.joinpath(_file) for _file in control_data["phenocovar"]) + if len(phenofiles) <= 0: + return tuple() + + if control_data["phenocovar_transposed"]: + logger.info("Undoing transposition of the files rows and columns.") + phenofiles = ( + rqtl2.transpose_csv_with_rename( + _file, + build_line_splitter(control_data), + build_line_joiner(control_data)) + for _file in phenofiles) + + _headers = rqtl2.read_csv_file_headers(phenofiles[0], + control_data["phenocovar_transposed"], + control_data["sep"], + control_data["comment.char"]) + return create_new_phenotypes( + cursor, + (dict(zip(_headers, + __replace_na_strings__(line, control_data["na.strings"]))) + for filecontent + in (rqtl2.read_csv_file(path, + separator=control_data["sep"], + comment_char=control_data["comment.char"]) + for path in phenofiles) + for idx, line in enumerate(filecontent) + if idx != 0)) + + +def __fetch_next_dataid__(conn: mysqldb.Connection) -> int: + """Fetch the next available DataId value from the database.""" + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute( + "SELECT MAX(DataId) AS CurrentMaxDataId FROM PublishXRef") + return int(cursor.fetchone()["CurrentMaxDataId"]) + 1 + + +def __row_to_dataitems__( + sample_row: dict, + dataidmap: dict, + pheno_name2id: dict[str, int], + samples: dict +) -> tuple[dict, ...]: + samplename = sample_row["id"] + + return ({ + "phenotype_id": dataidmap[pheno_name2id[phenoname]]["phenotype_id"], + "data_id": dataidmap[pheno_name2id[phenoname]]["data_id"], + "sample_name": samplename, + "sample_id": samples[samplename]["Id"], + "value": phenovalue + } for phenoname, phenovalue in sample_row.items() if phenoname != "id") + + +def __build_dataitems__( + filetype, + phenofiles, + control_data, + samples, + dataidmap, + pheno_name2id +): + _headers = rqtl2.read_csv_file_headers( + phenofiles[0], + False, # Any transposed files have been un-transposed by this point + control_data["sep"], + control_data["comment.char"]) + _filescontents = ( + rqtl2.read_csv_file(path, + separator=control_data["sep"], + comment_char=control_data["comment.char"]) + for path in phenofiles) + _linescontents = ( + __row_to_dataitems__( + dict(zip(("id",) + _headers[1:], + __replace_na_strings__(line, control_data["na.strings"]))), + dataidmap, + pheno_name2id, + samples) + for linenum, line in (enumline for filecontent in _filescontents + for enumline in enumerate(filecontent)) + if linenum > 0) + return (item for items in _linescontents + for item in items + if item["value"] is not None) + + +def save_numeric_data( + conn: mysqldb.Connection, + dataidmap: dict, + pheno_name2id: dict[str, int], + samples: tuple[dict, ...], + control_data: dict, + filesdir: Path, + filetype: str, + table: str +): + """Read data from files and save to the database.""" + phenofiles = tuple( + filesdir.joinpath(_file) for _file in control_data[filetype]) + if len(phenofiles) <= 0: + return tuple() + + if control_data[f"{filetype}_transposed"]: + logger.info("Undoing transposition of the files rows and columns.") + phenofiles = tuple( + rqtl2.transpose_csv_with_rename( + _file, + build_line_splitter(control_data), + build_line_joiner(control_data)) + for _file in phenofiles) + + try: + logger.debug("Attempt quick save with `LOAD … INFILE`.") + return quick_save_phenotypes_data( + conn, + table, + __build_dataitems__( + filetype, + phenofiles, + control_data, + samples, + dataidmap, + pheno_name2id), + filesdir) + except Exception as _exc: + logger.debug("Could not use `LOAD … INFILE`, using raw query", + exc_info=True) + import time;time.sleep(60) + return save_phenotypes_data( + conn, + table, + __build_dataitems__( + filetype, + phenofiles, + control_data, + samples, + dataidmap, + pheno_name2id)) + + +save_pheno_data = partial(save_numeric_data, + filetype="pheno", + table="PublishData") + + +save_phenotypes_se = partial(save_numeric_data, + filetype="phenose", + table="PublishSE") + + +save_phenotypes_n = partial(save_numeric_data, + filetype="phenonum", + table="NStrain") + + +def cross_reference_phenotypes_publications_and_data( + conn: mysqldb.Connection, xref_data: tuple[dict, ...] +): + """Crossreference the phenotypes, publication and data.""" + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute("SELECT MAX(Id) CurrentMaxId FROM PublishXRef") + _nextid = int(cursor.fetchone()["CurrentMaxId"]) + 1 + _params = tuple({**row, "xref_id": _id} + for _id, row in enumerate(xref_data, start=_nextid)) + cursor.executemany( + ("INSERT INTO PublishXRef(" + "Id, InbredSetId, PhenotypeId, PublicationId, DataId, comments" + ") " + "VALUES (" + "%(xref_id)s, %(population_id)s, %(phenotype_id)s, " + "%(publication_id)s, %(data_id)s, 'Upload of new data.'" + ")"), + _params) + return _params + return tuple() + + +def update_auth(authserver, token, species, population, dataset, xrefdata): + """Grant the user access to their data.""" + # TODO Call into the auth server to: + # 1. Link the phenotypes with a user group + # - fetch group: http://localhost:8081/auth/user/group + # - link data to group: http://localhost:8081/auth/data/link/phenotype + # - *might need code update in gn-auth: remove restriction, perhaps* + # 2. Create resource (perhaps?) + # - Get resource categories: http://localhost:8081/auth/resource/categories + # - Create a new resource: http://localhost:80host:8081/auth/resource/create + # - single resource for all phenotypes + # - resource name from user, species, population, dataset, datetime? + # - User will have "ownership" of resource by default + # 3. Link data to the resource: http://localhost:8081/auth/resource/data/link + # - Update code to allow linking multiple items in a single request + _tries = 0 # TODO use this to limit how many tries before quiting and bailing + _delay = 1 + headers = { + "Authorization": f"Bearer {token}", + "Content-Type": "application/json" + } + def authserveruri(endpoint): + return urljoin(authserver, endpoint) + + def __fetch_user_details__(): + logger.debug("… Fetching user details") + return mrequests.get( + authserveruri("/auth/user/"), + headers=headers + ) + + def __link_data__(user): + logger.debug("… linking uploaded data to user's group") + return mrequests.post( + authserveruri("/auth/data/link/phenotype"), + headers=headers, + json={ + "species_name": species["Name"], + "group_id": user["group"]["group_id"], + "selected": [ + { + "SpeciesId": species["SpeciesId"], + "InbredSetId": population["Id"], + "PublishFreezeId": dataset["Id"], + "dataset_name": dataset["Name"], + "dataset_fullname": dataset["FullName"], + "dataset_shortname": dataset["ShortName"], + "PublishXRefId": item["xref_id"] + } + for item in xrefdata + ], + "using-raw-ids": "on" + }).then(lambda ld_results: (user, ld_results)) + + def __fetch_phenotype_category_details__(user, linkeddata): + logger.debug("… fetching phenotype category details") + return mrequests.get( + authserveruri("/auth/resource/categories"), + headers=headers + ).then( + lambda categories: ( + user, + linkeddata, + next(category for category in categories + if category["resource_category_key"] == "phenotype")) + ) + + def __create_resource__(user, linkeddata, category): + logger.debug("… creating authorisation resource object") + now = datetime.datetime.now().isoformat() + return mrequests.post( + authserveruri("/auth/resource/create"), + headers=headers, + json={ + "resource_category": category["resource_category_id"], + "resource_name": (f"{user['email']}—{dataset['Name']}—{now}—" + f"{len(xrefdata)} phenotypes"), + "public": "off" + }).then(lambda cr_results: (user, linkeddata, cr_results)) + + def __attach_data_to_resource__(user, linkeddata, resource): + logger.debug("… attaching data to authorisation resource object") + return mrequests.post( + authserveruri("/auth/resource/data/link"), + headers=headers, + json={ + "dataset_type": "phenotype", + "resource_id": resource["resource_id"], + "data_link_ids": [ + item["data_link_id"] for item in linkeddata["traits"]] + }).then(lambda attc: (user, linkeddata, resource, attc)) + + def __handle_error__(resp): + logger.error("ERROR: Updating the authorisation for the data failed.") + logger.debug( + "ERROR: The response from the authorisation server was:\n\t%s", + resp.json()) + return 1 + + def __handle_success__(val): + logger.info( + "The authorisation for the data has been updated successfully.") + return 0 + + return __fetch_user_details__().then(__link_data__).then( + lambda result: __fetch_phenotype_category_details__(*result) + ).then( + lambda result: __create_resource__(*result) + ).then( + lambda result: __attach_data_to_resource__(*result) + ).either(__handle_error__, __handle_success__) + + +def load_data(conn: mysqldb.Connection, job: dict) -> int: + """Load the data attached in the given job.""" + _job_metadata = job["metadata"] + # Steps + # 0. Read data from the files: can be multiple files per type + # + _species = species_by_id(conn, int(_job_metadata["species_id"])) + _population = population_by_species_and_id( + conn, + _species["SpeciesId"], + int(_job_metadata["population_id"])) + _dataset = dataset_by_id( + conn, + _species["SpeciesId"], + _population["Id"], + int(_job_metadata["dataset_id"])) + # 1. Just retrive the publication: Don't create publications for now. + _publication = fetch_publication_by_id( + conn, int(_job_metadata.get("publication_id", "0"))) or {"Id": 0} + # 2. Save all new phenotypes: + # -> return phenotype IDs + bundle = Path(_job_metadata["bundle_file"]) + _control_data = rqtl2.control_data(bundle) + logger.info("Extracting the zipped bundle of files.") + _outdir = Path(bundle.parent, f"bundle_{bundle.stem}") + with ZipFile(str(bundle), "r") as zfile: + _files = rqtl2.extract(zfile, _outdir) + logger.info("Saving new phenotypes.") + _phenos = save_phenotypes(conn, _control_data, _outdir) + def __build_phenos_maps__(accumulator, current): + dataid, row = current + return ({ + **accumulator[0], + row["phenotype_id"]: { + "population_id": _population["Id"], + "phenotype_id": row["phenotype_id"], + "data_id": dataid, + "publication_id": _publication["Id"], + } + }, { + **accumulator[1], + row["id"]: row["phenotype_id"] + }) + dataidmap, pheno_name2id = reduce( + __build_phenos_maps__, + enumerate(_phenos, start=__fetch_next_dataid__(conn)), + ({},{})) + # 3. a. Fetch the strain names and IDS: create name->ID map + samples = { + row["Name"]: row + for row in samples_by_species_and_population( + conn, _species["SpeciesId"], _population["Id"])} + # b. Save all the data items (DataIds are vibes), return new IDs + logger.info("Saving new phenotypes data.") + _num_data_rows = save_pheno_data(conn=conn, + dataidmap=dataidmap, + pheno_name2id=pheno_name2id, + samples=samples, + control_data=_control_data, + filesdir=_outdir) + logger.info("Saved %s new phenotype data rows.", _num_data_rows) + # 4. Cross-reference Phenotype, Publication, and PublishData in PublishXRef + logger.info("Cross-referencing new phenotypes to their data and publications.") + _xrefs = cross_reference_phenotypes_publications_and_data( + conn, tuple(dataidmap.values())) + # 5. If standard errors and N exist, save them too + # (use IDs returned in `3. b.` above). + if _control_data.get("phenose"): + logger.info("Saving new phenotypes standard errors.") + _num_se_rows = save_phenotypes_se(conn=conn, + dataidmap=dataidmap, + pheno_name2id=pheno_name2id, + samples=samples, + control_data=_control_data, + filesdir=_outdir) + logger.info("Saved %s new phenotype standard error rows.", _num_se_rows) + + if _control_data.get("phenonum"): + logger.info("Saving new phenotypes sample counts.") + _num_n_rows = save_phenotypes_n(conn=conn, + dataidmap=dataidmap, + pheno_name2id=pheno_name2id, + samples=samples, + control_data=_control_data, + filesdir=_outdir) + logger.info("Saved %s new phenotype sample counts rows.", _num_n_rows) + + return (_species, _population, _dataset, _xrefs) + + +if __name__ == "__main__": + def parse_args(): + """Setup command-line arguments.""" + parser = argparse.ArgumentParser( + prog="load_phenotypes_to_db", + description="Process the phenotypes' data and load it into the database.") + parser.add_argument("db_uri", type=str, help="MariaDB/MySQL connection URL") + parser.add_argument( + "jobs_db_path", type=Path, help="Path to jobs' SQLite database.") + parser.add_argument("job_id", type=uuid.UUID, help="ID of the running job") + parser.add_argument( + "--log-level", + type=str, + help="Determines what is logged out.", + choices=("debug", "info", "warning", "error", "critical"), + default="info") + return parser.parse_args() + + def setup_logging(log_level: str): + """Setup logging for the script.""" + logger.setLevel(log_level) + logging.getLogger("uploader.phenotypes.models").setLevel(log_level) + + + def main(): + """Entry-point for this script.""" + args = parse_args() + setup_logging(args.log_level.upper()) + + with (mysqldb.database_connection(args.db_uri) as conn, + conn.cursor(cursorclass=DictCursor) as cursor, + sqlite3.connection(args.jobs_db_path) as jobs_conn): + job = jobs.job(jobs_conn, args.job_id) + + # Lock the PublishXRef/PublishData/PublishSE/NStrain here: Why? + # The `DataId` values are sequential, but not auto-increment + # Can't convert `PublishXRef`.`DataId` to AUTO_INCREMENT. + # `SELECT MAX(DataId) FROM PublishXRef;` + # How do you check for a table lock? + # https://oracle-base.com/articles/mysql/mysql-identify-locked-tables + # `SHOW OPEN TABLES LIKE 'Publish%';` + _db_tables_ = ( + "Species", + "InbredSet", + "Strain", + "StrainXRef", + "Publication", + "Phenotype", + "PublishXRef", + "PublishFreeze", + "PublishData", + "PublishSE", + "NStrain") + + logger.debug( + ("Locking database tables for the connection:" + + "".join("\n\t- %s" for _ in _db_tables_) + "\n"), + *_db_tables_) + cursor.execute(# Lock the tables to avoid race conditions + "LOCK TABLES " + ", ".join( + f"{_table} WRITE" for _table in _db_tables_)) + + db_results = load_data(conn, job) + jobs.update_metadata( + jobs_conn, + args.job_id, + "xref_ids", + json.dumps([xref["xref_id"] for xref in db_results[3]])) + + logger.info("Unlocking all database tables.") + cursor.execute("UNLOCK TABLES") + + # Update authorisations (break this down) — maybe loop until it works? + logger.info("Updating authorisation.") + _job_metadata = job["metadata"] + return update_auth(_job_metadata["authserver"], + _job_metadata["token"], + *db_results) + + + try: + sys.exit(main()) + except Exception as _exc: + logger.debug("Data loading failed… Halting!", + exc_info=True) + sys.exit(1) diff --git a/scripts/phenotypes_bulk_edit.py b/scripts/phenotypes_bulk_edit.py new file mode 100644 index 0000000..cee5f4e --- /dev/null +++ b/scripts/phenotypes_bulk_edit.py @@ -0,0 +1,266 @@ +import sys +import uuid +import logging +import argparse +from pathlib import Path +from typing import Iterator +from functools import reduce + +from MySQLdb.cursors import DictCursor + +from gn_libs import jobs, mysqldb, sqlite3 + +from uploader.phenotypes.models import phenotypes_data_by_ids +from uploader.phenotypes.misc import phenotypes_data_differences +from uploader.phenotypes.views import BULK_EDIT_COMMON_FIELDNAMES + +import uploader.publications.pubmed as pmed +from uploader.publications.misc import publications_differences +from uploader.publications.models import ( + update_publications, fetch_phenotype_publications) + +logging.basicConfig( + format="%(asctime)s — %(filename)s:%(lineno)s — %(levelname)s: %(message)s") +logger = logging.getLogger(__name__) + + +def check_ids(conn, ids: tuple[tuple[int, int], ...]) -> bool: + """Verify that all the `UniqueIdentifier` values are valid.""" + logger.info("Checking the 'UniqueIdentifier' values.") + with conn.cursor(cursorclass=DictCursor) as cursor: + paramstr = ",".join(["(%s, %s)"] * len(ids)) + cursor.execute( + "SELECT PhenotypeId AS phenotype_id, Id AS xref_id " + "FROM PublishXRef " + f"WHERE (PhenotypeId, Id) IN ({paramstr})", + tuple(item for row in ids for item in row)) + mysqldb.debug_query(cursor, logger) + found = tuple((row["phenotype_id"], row["xref_id"]) + for row in cursor.fetchall()) + + not_found = tuple(item for item in ids if item not in found) + if len(not_found) == 0: + logger.info("All 'UniqueIdentifier' are valid.") + return True + + for item in not_found: + logger.error(f"Invalid 'UniqueIdentifier' value: phId:%s::xrId:%s", item[0], item[1]) + + return False + + +def check_for_mandatory_fields(): + """Verify that mandatory fields have values.""" + pass + + +def __fetch_phenotypes__(conn, ids: tuple[int, ...]) -> tuple[dict, ...]: + """Fetch basic (non-numeric) phenotypes data from the database.""" + with conn.cursor(cursorclass=DictCursor) as cursor: + paramstr = ",".join(["%s"] * len(ids)) + cursor.execute(f"SELECT * FROM Phenotype WHERE Id IN ({paramstr}) " + "ORDER BY Id ASC", + ids) + return tuple(dict(row) for row in cursor.fetchall()) + + +def descriptions_differences(file_data, db_data) -> dict[str, str]: + """Compute differences in the descriptions.""" + logger.info("Computing differences in phenotype descriptions.") + assert len(file_data) == len(db_data), "The counts of phenotypes differ!" + description_columns = ("Pre_publication_description", + "Post_publication_description", + "Original_description", + "Pre_publication_abbreviation", + "Post_publication_abbreviation") + diff = tuple() + for file_row, db_row in zip(file_data, db_data): + assert file_row["phenotype_id"] == db_row["Id"] + inner_diff = { + key: file_row[key] + for key in description_columns + if not file_row[key] == db_row[key] + } + if bool(inner_diff): + diff = diff + ({ + "phenotype_id": file_row["phenotype_id"], + **inner_diff + },) + + return diff + + +def update_descriptions(): + """Update descriptions in the database""" + logger.info("Updating descriptions") + # Compute differences between db data and uploaded file + # Only run query for changed descriptions + pass + + +def link_publications(): + """Link phenotypes to relevant publications.""" + logger.info("Linking phenotypes to publications.") + # Create publication if PubMed_ID doesn't exist in db + pass + + +def update_values(): + """Update the phenotype values.""" + logger.info("Updating phenotypes values.") + # Compute differences between db data and uploaded file + # Only run query for changed data + pass + + +def parse_args(): + parser = argparse.ArgumentParser( + prog="Phenotypes Bulk-Edit Processor", + description="Process the bulk-edits to phenotype data and descriptions.") + parser.add_argument("db_uri", type=str, help="MariaDB/MySQL connection URL") + parser.add_argument( + "jobs_db_path", type=Path, help="Path to jobs' SQLite database.") + parser.add_argument("job_id", type=uuid.UUID, help="ID of the running job") + parser.add_argument( + "--log-level", + type=str, + help="Determines what is logged out.", + choices=("debug", "info", "warning", "error", "critical"), + default="info") + return parser.parse_args() + + +def read_file(filepath: Path) -> Iterator[str]: + """Read the file, one line at a time.""" + with filepath.open(mode="r", encoding="utf-8") as infile: + count = 0 + headers = None + for line in infile: + if line.startswith("#"): # ignore comments + continue; + + fields = line.strip().split("\t") + if count == 0: + headers = fields + count = count + 1 + continue + + _dict = dict(zip( + headers, + ((None if item.strip() == "" else item.strip()) + for item in fields))) + _pheno, _xref = _dict.pop("UniqueIdentifier").split("::") + _dict = { + key: ((float(val) if bool(val) else val) + if key not in BULK_EDIT_COMMON_FIELDNAMES + else val) + for key, val in _dict.items() + } + _dict["phenotype_id"] = int(_pheno.split(":")[1]) + _dict["xref_id"] = int(_xref.split(":")[1]) + if _dict["PubMed_ID"] is not None: + _dict["PubMed_ID"] = int(_dict["PubMed_ID"]) + + yield _dict + count = count + 1 + + +def run(conn, job): + """Process the data and update it.""" + file_contents = tuple(sorted(read_file(Path(job["metadata"]["edit-file"])), + key=lambda item: item["phenotype_id"])) + pheno_ids, pheno_xref_ids, pubmed_ids = reduce( + lambda coll, curr: ( + coll[0] + (curr["phenotype_id"],), + coll[1] + ((curr["phenotype_id"], curr["xref_id"]),), + coll[2].union(set([curr["PubMed_ID"]]))), + file_contents, + (tuple(), tuple(), set([None]))) + check_ids(conn, pheno_xref_ids) + check_for_mandatory_fields() + # stop running here if any errors are found. + + ### Compute differences + logger.info("Computing differences.") + # 1. Basic Phenotype data differences + # a. Descriptions differences + _desc_diff = descriptions_differences( + file_contents, __fetch_phenotypes__(conn, pheno_ids)) + logger.debug("DESCRIPTIONS DIFFERENCES: %s", _desc_diff) + + # b. Publications differences + _db_publications = fetch_phenotype_publications(conn, pheno_xref_ids) + logger.debug("DB PUBLICATIONS: %s", _db_publications) + + _pubmed_map = { + (int(row["PubMed_ID"]) if bool(row["PubMed_ID"]) else None): f"{row['phenotype_id']}::{row['xref_id']}" + for row in file_contents + } + _pub_id_map = { + f"{pub['PhenotypeId']}::{pub['xref_id']}": pub["PublicationId"] + for pub in _db_publications + } + + _new_publications = update_publications( + conn, tuple({ + **pub, "publication_id": _pub_id_map[_pubmed_map[pub["pubmed_id"]]] + } for pub in pmed.fetch_publications(tuple( + pubmed_id for pubmed_id in pubmed_ids + if pubmed_id not in + tuple(row["PubMed_ID"] for row in _db_publications))))) + _pub_diff = publications_differences( + file_contents, _db_publications, { + row["PubMed_ID" if "PubMed_ID" in row else "pubmed_id"]: row[ + "PublicationId" if "PublicationId" in row else "publication_id"] + for row in _db_publications + _new_publications}) + logger.debug("Publications diff: %s", _pub_diff) + # 2. Data differences + _db_pheno_data = phenotypes_data_by_ids(conn, tuple({ + "population_id": job["metadata"]["population-id"], + "phenoid": row[0], + "xref_id": row[1] + } for row in pheno_xref_ids)) + + data_diff = phenotypes_data_differences( + ({ + "phenotype_id": row["phenotype_id"], + "xref_id": row["xref_id"], + "data": { + key:val for key,val in row.items() + if key not in BULK_EDIT_COMMON_FIELDNAMES + [ + "phenotype_id", "xref_id"] + } + } for row in file_contents), + ({ + **row, + "PhenotypeId": row["Id"], + "data": { + dataitem["StrainName"]: dataitem + for dataitem in row["data"].values() + } + } for row in _db_pheno_data)) + logger.debug("Data differences: %s", data_diff) + ### END: Compute differences + update_descriptions() + link_publications() + update_values() + return 0 + + +def main(): + """Entry-point for this script.""" + args = parse_args() + logger.setLevel(args.log_level.upper()) + logger.debug("Arguments: %s", args) + + logging.getLogger("uploader.phenotypes.misc").setLevel(args.log_level.upper()) + logging.getLogger("uploader.phenotypes.models").setLevel(args.log_level.upper()) + logging.getLogger("uploader.publications.models").setLevel(args.log_level.upper()) + + with (mysqldb.database_connection(args.db_uri) as conn, + sqlite3.connection(args.jobs_db_path) as jobs_conn): + return run(conn, jobs.job(jobs_conn, args.job_id)) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/process_rqtl2_bundle.py b/scripts/process_rqtl2_bundle.py index 4da3936..8b7a0fb 100644 --- a/scripts/process_rqtl2_bundle.py +++ b/scripts/process_rqtl2_bundle.py @@ -2,6 +2,7 @@ import sys import uuid import json +import argparse import traceback from typing import Any from pathlib import Path @@ -10,16 +11,16 @@ from logging import Logger, getLogger, StreamHandler import MySQLdb as mdb from redis import Redis +from gn_libs.mysqldb import database_connection from functional_tools import take -import r_qtl.errors as rqe import r_qtl.r_qtl2 as rqtl2 import r_qtl.r_qtl2_qc as rqc +import r_qtl.exceptions as rqe -from qc_app import jobs -from qc_app.db_utils import database_connection -from qc_app.check_connections import check_db, check_redis +from uploader import jobs +from uploader.check_connections import check_db, check_redis from scripts.cli_parser import init_cli_parser from scripts.redis_logger import setup_redis_logger @@ -93,11 +94,14 @@ def process_bundle(dbconn: mdb.Connection, if has_geno_file(thejob): logger.info("Processing geno files.") genoexit = install_genotypes( + rconn, dbconn, - meta["speciesid"], - meta["populationid"], - meta["geno-dataset-id"], - Path(meta["rqtl2-bundle-file"]), + f"{rprefix}:{jobid}", + argparse.Namespace( + speciesid=meta["speciesid"], + populationid=meta["populationid"], + datasetid=meta["geno-dataset-id"], + rqtl2bundle=Path(meta["rqtl2-bundle-file"])), logger) if genoexit != 0: raise Exception("Processing 'geno' file failed.") @@ -108,11 +112,14 @@ def process_bundle(dbconn: mdb.Connection, if has_pheno_file(thejob): phenoexit = install_pheno_files( + rconn, dbconn, - meta["speciesid"], - meta["platformid"], - meta["probe-dataset-id"], - Path(meta["rqtl2-bundle-file"]), + f"{rprefix}:{jobid}", + argparse.Namespace( + speciesid=meta["speciesid"], + platformid=meta["platformid"], + dataset_id=meta["probe-dataset-id"], + rqtl2bundle=Path(meta["rqtl2-bundle-file"])), logger) if phenoexit != 0: raise Exception("Processing 'pheno' file failed.") diff --git a/scripts/qc.py b/scripts/qc.py index e8573a9..b00f4c1 100644 --- a/scripts/qc.py +++ b/scripts/qc.py @@ -5,14 +5,14 @@ import mimetypes from typing import Union, Callable from argparse import ArgumentParser +from gn_libs.mysqldb import database_connection + from functional_tools import take from quality_control.utils import make_progress_calculator from quality_control.errors import InvalidValue, DuplicateHeading from quality_control.parsing import FileType, strain_names, collect_errors -from qc_app.db_utils import database_connection - from .cli_parser import init_cli_parser diff --git a/scripts/qc_on_rqtl2_bundle.py b/scripts/qc_on_rqtl2_bundle.py index deef8fe..9f9248c 100644 --- a/scripts/qc_on_rqtl2_bundle.py +++ b/scripts/qc_on_rqtl2_bundle.py @@ -10,22 +10,24 @@ import multiprocessing as mproc from logging import Logger, getLogger, StreamHandler from typing import Union, Sequence, Callable, Iterator +import MySQLdb as mdb from redis import Redis +from gn_libs.mysqldb import database_connection from quality_control.errors import InvalidValue from quality_control.checks import decimal_points_error -from qc_app import jobs -from qc_app.check_connections import check_db, check_redis +from uploader import jobs +from uploader.check_connections import check_db, check_redis -from r_qtl import errors as rqe from r_qtl import r_qtl2 as rqtl2 from r_qtl import r_qtl2_qc as rqc +from r_qtl import exceptions as rqe from r_qtl import fileerrors as rqfe -from scripts.cli_parser import init_cli_parser from scripts.process_rqtl2_bundle import parse_job from scripts.redis_logger import setup_redis_logger +from scripts.cli_parser import init_cli_parser, add_global_data_arguments def dict2tuple(dct: dict) -> tuple: """Utility to convert items in dicts to pairs of tuples.""" @@ -103,13 +105,13 @@ def retrieve_errors_with_progress(rconn: Redis,#pylint: disable=[too-many-locals __update_processed__(value) rconn.hset(fqjobid, f"{filetype}-linecount", count) - except rqe.MissingFileError: + except rqe.MissingFileException: fname = cdata.get(filetype) yield rqfe.MissingFile(filetype, fname, ( f"The file '{fname}' does not exist in the bundle despite it being " f"listed under '{filetype}' in the control file.")) -def qc_geno_errors(rconn, fqjobid, zfile, logger) -> bool: +def qc_geno_errors(rconn, fqjobid, _dburi, _speciesid, zfile, logger) -> bool: """Check for errors in `geno` file(s).""" cdata = rqtl2.control_data(zfile) if "geno" in cdata: @@ -129,15 +131,32 @@ def qc_geno_errors(rconn, fqjobid, zfile, logger) -> bool: return False -def check_pheno_samples(zipfilepath: Union[str, Path], logger: Logger) -> tuple[ - Union[InvalidValue, rqfe.MissingFile], ...]: +def fetch_db_geno_samples(conn: mdb.Connection, speciesid: int) -> tuple[str, ...]: + """Fetch samples/cases/individuals from the database.""" + samples = set()# type: ignore[var-annotated] + with conn.cursor() as cursor: + cursor.execute("SELECT Name, Name2 from Strain WHERE SpeciesId=%s", + (speciesid,)) + rows = cursor.fetchall() or tuple() + for row in rows: + samples.update(tuple(row)) + + return tuple(item.strip() for item in samples if bool(item)) + + +def check_pheno_samples( + conn: mdb.Connection, + speciesid: int, + zipfilepath: Union[str, Path], + logger: Logger +) -> tuple[Union[InvalidValue, rqfe.MissingFile], ...]: """Check that samples in 'pheno' file exist in geno file.""" cdata = rqtl2.read_control_file(zipfilepath) genosamples = tuple( sample for perfilesamples in ( rqtl2.load_samples(zipfilepath, member, cdata["geno_transposed"]) for member in cdata["geno"]) - for sample in perfilesamples) + for sample in perfilesamples) + fetch_db_geno_samples(conn, speciesid) def __check_file__(member) -> tuple[InvalidValue, ...]: logger.info("Checking samples/cases in member file '%s' …", member) @@ -149,7 +168,9 @@ def check_pheno_samples(zipfilepath: Union[str, Path], logger: Logger) -> tuple[ errors = errors + (InvalidValue( member, "-", "-", sample, f"The individual/case/sample '{sample}' in file " - f"{member} does not exist in any of the 'geno' files."),) + f"{member} does not exist in either, any of the 'geno' " + "files provided in the bundle or the GeneNetwork database." + ),) logger.info("Found %s missing samples in member file '%s'.", len(errors), @@ -170,15 +191,21 @@ def check_pheno_samples(zipfilepath: Union[str, Path], logger: Logger) -> tuple[ return allerrors -def qc_pheno_errors(rconn, fqjobid, zfile, logger) -> bool: +def qc_pheno_errors(# pylint: disable=[too-many-arguments] + rconn, fqjobid, dburi, speciesid, zfile, logger) -> bool: """Check for errors in `pheno` file(s).""" cdata = rqtl2.control_data(zfile) if "pheno" in cdata: logger.info("Checking for errors in the 'pheno' file…") - perrs = check_pheno_samples(zfile.filename, logger) + tuple( - retrieve_errors_with_progress( - rconn,fqjobid, zfile, "pheno", - (partial(decimal_points_error, filename="pheno", mini=3),))) + perrs = tuple()# type: ignore[var-annotated] + with database_connection(dburi) as dbconn: + perrs = check_pheno_samples( + dbconn, speciesid, zfile.filename, logger) + tuple( + retrieve_errors_with_progress( + rconn,fqjobid, zfile, "pheno", + (partial(decimal_points_error, + filename="pheno", + mini=3),))) add_to_errors(rconn, fqjobid, "errors-generic", tuple( err for err in perrs if isinstance(err, rqfe.MissingFile))) add_to_errors(rconn, fqjobid, "errors-pheno", tuple( @@ -190,7 +217,8 @@ def qc_pheno_errors(rconn, fqjobid, zfile, logger) -> bool: return False -def qc_phenose_errors(rconn, fqjobid, zfile, logger) -> bool: +def qc_phenose_errors(# pylint: disable=[too-many-arguments] + rconn, fqjobid, _dburi, _speciesid, zfile, logger) -> bool: """Check for errors in `phenose` file(s).""" cdata = rqtl2.control_data(zfile) if "phenose" in cdata: @@ -209,7 +237,14 @@ def qc_phenose_errors(rconn, fqjobid, zfile, logger) -> bool: return False -def qc_phenocovar_errors(_rconn, _fqjobid, _zfile, _logger) -> bool: +def qc_phenocovar_errors( + _rconn, + _fqjobid, + _dburi, + _speciesid, + _zfile, + _logger +) -> bool: """Check for errors in `phenocovar` file(s).""" return False @@ -225,12 +260,20 @@ def run_qc(rconn: Redis, if qc_missing_files(rconn, fqjobid, zfile, logger): return 1 - def with_zipfile(rconn, fqjobid, filename, logger, func): + def with_zipfile(# pylint: disable=[too-many-arguments] + rconn, fqjobid, dbconn, speciesid, filename, logger, func + ): with ZipFile(filename, "r") as zfile: - return func(rconn, fqjobid, zfile, logger) + return func(rconn, fqjobid, dbconn, speciesid, zfile, logger) def buildargs(func): - return (rconn, fqjobid, jobmeta["rqtl2-bundle-file"], logger, func) + return (rconn, + fqjobid, + args.databaseuri, + args.speciesid, + jobmeta["rqtl2-bundle-file"], + logger, + func) processes = [ mproc.Process(target=with_zipfile, args=buildargs(qc_geno_errors,)), mproc.Process(target=with_zipfile, args=buildargs(qc_pheno_errors,)), @@ -263,8 +306,8 @@ def run_qc(rconn: Redis, if __name__ == "__main__": def main(): """Enter R/qtl2 bundle QC runner.""" - args = init_cli_parser( - "qc-on-rqtl2-bundle", "Run QC on R/qtl2 bundle.").parse_args() + args = add_global_data_arguments(init_cli_parser( + "qc-on-rqtl2-bundle", "Run QC on R/qtl2 bundle.")).parse_args() check_redis(args.redisuri) check_db(args.databaseuri) diff --git a/scripts/qcapp_wsgi.py b/scripts/qcapp_wsgi.py index 9fb63cb..fe77031 100644 --- a/scripts/qcapp_wsgi.py +++ b/scripts/qcapp_wsgi.py @@ -1,11 +1,36 @@ """Run the application""" +import os import sys from logging import getLogger, StreamHandler from flask import Flask -from qc_app import create_app -from qc_app.check_connections import check_db, check_redis +from uploader import create_app +from uploader.check_connections import check_db, check_redis + +def setup_logging(appl: Flask) -> Flask: + """Setup appropriate logging paradigm depending on environment.""" + # https://datatracker.ietf.org/doc/html/draft-coar-cgi-v11-03#section-4.1.17 + # https://wsgi.readthedocs.io/en/latest/proposals-2.0.html#making-some-keys-required + # https://peps.python.org/pep-3333/#id4 + software, *_version_and_comments = os.environ.get( + "SERVER_SOFTWARE", "").split('/') + if bool(software): + gunicorn_logger = getLogger("gunicorn.error") + appl.logger.handlers = gunicorn_logger.handlers + appl.logger.setLevel(gunicorn_logger.level)#pylint: disable=[no-member] + else: + loglevel = appl.config["LOG_LEVEL"].upper() + # Maybe call `logging.dictConfig(…)` here instead of all this stuff below + handler_stderr = StreamHandler(stream=sys.stderr) + appl.logger.addHandler(handler_stderr) + rootlogger = getLogger() + rootlogger.addHandler(handler_stderr) + rootlogger.setLevel(loglevel) + appl.logger.setLevel(loglevel) + + return appl + def check_and_build_app() -> Flask: """Setup the application for running.""" @@ -15,30 +40,11 @@ def check_and_build_app() -> Flask: # Check connections check_db(appl.config["SQL_URI"]) check_redis(appl.config["REDIS_URL"]) - return appl + return setup_logging(appl) -def setup_logging(appl: Flask): - """Setup application logging""" - loglevel = appl.config["LOG_LEVEL"].upper() - - # Maybe call `logging.dictConfig(…)` here instead of all this stuff below - handler_stderr = StreamHandler(stream=sys.stderr) - appl.logger.addHandler(handler_stderr) - - rootlogger = getLogger() - rootlogger.addHandler(handler_stderr) - rootlogger.setLevel(loglevel) - - appl.logger.setLevel(loglevel) app = check_and_build_app() -if __name__ != "__main__":# Running via gunicorn - gunicorn_logger = getLogger("gunicorn.error") - app.logger.handlers = gunicorn_logger.handlers - app.logger.setLevel(gunicorn_logger.level)#pylint: disable=[no-member] - -if __name__ == "__main__":# Running via flask +if __name__ == "__main__": # Run the app - setup_logging(app) app.run() diff --git a/scripts/redis_logger.py b/scripts/redis_logger.py index 2ae682b..d3fde5f 100644 --- a/scripts/redis_logger.py +++ b/scripts/redis_logger.py @@ -1,5 +1,6 @@ """Utilities to log to redis for our worker scripts.""" import logging +from typing import Optional from redis import Redis @@ -26,6 +27,26 @@ class RedisLogger(logging.Handler): self.redisconnection.rpush(self.messageslistname, self.format(record)) self.redisconnection.expire(self.messageslistname, self.expiry) +class RedisMessageListHandler(logging.Handler): + """Send messages to specified redis list.""" + def __init__(self, + rconn: Redis, + fullyqualifiedkey: str, + loglevel: int = logging.NOTSET, + expiry: Optional[int] = 86400): + super().__init__(loglevel) + self.redisconnection = rconn + self.fullyqualifiedkey = fullyqualifiedkey + self.expiry = expiry + + def emit(self, record): + """Log out to specified `fullyqualifiedkey`.""" + self.redisconnection.rpush(self.fullyqualifiedkey, self.format(record)) + if bool(self.expiry): + self.redisconnection.expire(self.fullyqualifiedkey, self.expiry) + else: + self.redisconnection.persist(self.fullyqualifiedkey) + def setup_redis_logger(rconn: Redis, fullyqualifiedjobid: str, job_messagelist: str, diff --git a/scripts/rqtl2/bundleutils.py b/scripts/rqtl2/bundleutils.py new file mode 100644 index 0000000..17faa7c --- /dev/null +++ b/scripts/rqtl2/bundleutils.py @@ -0,0 +1,44 @@ +"""Common utilities to operate in R/qtl2 bundles.""" +from typing import Union, Callable + +def build_line_splitter(cdata: dict) -> Callable[[str], tuple[Union[str, None], ...]]: + """Build and return a function to use to split data in the files. + + Parameters + ---------- + cdata: A dict holding the control information included with the R/qtl2 + bundle. + + Returns + ------- + A function that takes a string and return a tuple of strings. + """ + separator = cdata["sep"] + na_strings = cdata["na.strings"] + def __splitter__(line: str) -> tuple[Union[str, None], ...]: + return tuple( + item if item not in na_strings else None + for item in + (field.strip() for field in line.strip().split(separator))) + return __splitter__ + + +def build_line_joiner(cdata: dict) -> Callable[[tuple[Union[str, None], ...]], str]: + """Build and return a function to use to split data in the files. + + Parameters + ---------- + cdata: A dict holding the control information included with the R/qtl2 + bundle. + + Returns + ------- + A function that takes a string and return a tuple of strings. + """ + separator = cdata["sep"] + na_strings = cdata["na.strings"] + def __joiner__(row: tuple[Union[str, None], ...]) -> str: + return separator.join( + (na_strings[0] if item is None else item) + for item in row) + return __joiner__ diff --git a/scripts/rqtl2/cli_parser.py b/scripts/rqtl2/cli_parser.py index bcc7a4f..9bb60a3 100644 --- a/scripts/rqtl2/cli_parser.py +++ b/scripts/rqtl2/cli_parser.py @@ -2,12 +2,22 @@ from pathlib import Path from argparse import ArgumentParser -def add_common_arguments(parser: ArgumentParser) -> ArgumentParser: - """Add common arguments to the CLI parser.""" - parser.add_argument("datasetid", - type=int, - help="The dataset to which the data belongs.") +def add_bundle_argument(parser: ArgumentParser) -> ArgumentParser: + """Add the `rqtl2bundle` argument.""" parser.add_argument("rqtl2bundle", type=Path, help="Path to R/qtl2 bundle zip file.") return parser + + +def add_datasetid_argument(parser: ArgumentParser) -> ArgumentParser: + """Add the `datasetid` argument.""" + parser.add_argument("datasetid", + type=int, + help="The dataset to which the data belongs.") + return parser + + +def add_common_arguments(parser: ArgumentParser) -> ArgumentParser: + """Add common arguments to the CLI parser.""" + return add_bundle_argument(add_datasetid_argument(parser)) diff --git a/scripts/rqtl2/entry.py b/scripts/rqtl2/entry.py index 93fc130..e0e00e7 100644 --- a/scripts/rqtl2/entry.py +++ b/scripts/rqtl2/entry.py @@ -1,38 +1,58 @@ """Build common script-entry structure.""" -from logging import Logger +import sys +import logging from typing import Callable from argparse import Namespace +from logging import StreamHandler from redis import Redis from MySQLdb import Connection +from gn_libs.mysqldb import database_connection -from qc_app import jobs -from qc_app.db_utils import database_connection -from qc_app.check_connections import check_db, check_redis +from uploader import jobs +from uploader.check_connections import check_db, check_redis from scripts.redis_logger import setup_redis_logger -def build_main(args: Namespace, - run_fn: Callable[[Connection, Namespace], int], - logger: Logger, - loglevel: str = "INFO") -> Callable[[],int]: +def build_main( + args: Namespace, + run_fn: Callable[ + [Redis, Connection, str, Namespace, logging.Logger], + int + ], + logger: logging.Logger +) -> Callable[[],int]: """Build a function to be used as an entry-point for scripts.""" def main(): - check_db(args.databaseuri) - check_redis(args.redisuri) - if not args.rqtl2bundle.exists(): - logger.error("File not found: '%s'.", args.rqtl2bundle) - return 2 - with (Redis.from_url(args.redisuri, decode_responses=True) as rconn, database_connection(args.databaseuri) as dbconn): - fqjobid = jobs.job_key(jobs.jobsnamespace(), args.jobid) - logger.addHandler(setup_redis_logger( - rconn, - fqjobid, - f"{fqjobid}:log-messages", - args.redisexpiry)) - logger.setLevel(loglevel) - return run_fn(dbconn, args) + logger.setLevel(args.loglevel.upper()) + fqjobid = jobs.job_key(args.redisprefix, args.jobid) + + try: + rconn.hset(fqjobid, "status", "started") + logger.addHandler(setup_redis_logger( + rconn, + fqjobid, + f"{fqjobid}:log-messages", + args.redisexpiry)) + logger.addHandler(StreamHandler(stream=sys.stderr)) + + check_db(args.databaseuri) + check_redis(args.redisuri) + if not args.rqtl2bundle.exists(): + logger.error("File not found: '%s'.", args.rqtl2bundle) + return 2 + + returncode = run_fn(rconn, dbconn, fqjobid, args) + if returncode == 0: + rconn.hset(fqjobid, "status", "completed:success") + return returncode + rconn.hset(fqjobid, "status", "completed:error") + return returncode + except Exception as _exc:# pylint: disable=[broad-except] + logger.error("The process failed!", exc_info=True) + rconn.hset(fqjobid, "status", "completed:error") + return 4 return main diff --git a/scripts/rqtl2/install_genotypes.py b/scripts/rqtl2/install_genotypes.py index d0731a2..8762655 100644 --- a/scripts/rqtl2/install_genotypes.py +++ b/scripts/rqtl2/install_genotypes.py @@ -1,12 +1,13 @@ """Load genotypes from R/qtl2 bundle into the database.""" import sys +import argparse import traceback -from pathlib import Path from zipfile import ZipFile from functools import reduce from typing import Iterator, Optional -from logging import Logger, getLogger, StreamHandler +from logging import Logger, getLogger +from redis import Redis import MySQLdb as mdb from MySQLdb.cursors import DictCursor @@ -19,10 +20,15 @@ from scripts.rqtl2.entry import build_main from scripts.rqtl2.cli_parser import add_common_arguments from scripts.cli_parser import init_cli_parser, add_global_data_arguments -def insert_markers(dbconn: mdb.Connection, - speciesid: int, - markers: tuple[str, ...], - pmapdata: Optional[Iterator[dict]]) -> int: +__MODULE__ = "scripts.rqtl2.install_genotypes" + +def insert_markers( + dbconn: mdb.Connection, + speciesid: int, + markers: tuple[str, ...], + pmapdata: Optional[Iterator[dict]], + _logger: Logger +) -> int: """Insert genotype and genotype values into the database.""" mdata = reduce(#type: ignore[var-annotated] lambda acc, row: ({#type: ignore[arg-type, return-value] @@ -40,16 +46,20 @@ def insert_markers(dbconn: mdb.Connection, "VALUES (%(speciesid)s, %(marker)s, %(marker)s, %(chr)s, %(pos)s) " "ON DUPLICATE KEY UPDATE SpeciesId=SpeciesId", tuple({ - "speciesid": speciesid, - "marker": marker, - "chr": mdata.get(marker, {}).get("chr"), - "pos": mdata.get(marker, {}).get("pos") - } for marker in markers)) + (speciesid, marker): { + "speciesid": speciesid, + "marker": marker, + "chr": mdata.get(marker, {}).get("chr"), + "pos": mdata.get(marker, {}).get("pos") + } for marker in markers}.values())) return cursor.rowcount -def insert_individuals(dbconn: mdb.Connection, - speciesid: int, - individuals: tuple[str, ...]) -> int: +def insert_individuals( + dbconn: mdb.Connection, + speciesid: int, + individuals: tuple[str, ...], + _logger: Logger +) -> int: """Insert individuals/samples into the database.""" with dbconn.cursor() as cursor: cursor.executemany( @@ -60,10 +70,13 @@ def insert_individuals(dbconn: mdb.Connection, for individual in individuals)) return cursor.rowcount -def cross_reference_individuals(dbconn: mdb.Connection, - speciesid: int, - populationid: int, - individuals: tuple[str, ...]) -> int: +def cross_reference_individuals( + dbconn: mdb.Connection, + speciesid: int, + populationid: int, + individuals: tuple[str, ...], + _logger: Logger +) -> int: """Cross reference any inserted individuals.""" with dbconn.cursor(cursorclass=DictCursor) as cursor: paramstr = ", ".join(["%s"] * len(individuals)) @@ -79,11 +92,13 @@ def cross_reference_individuals(dbconn: mdb.Connection, tuple(ids)) return cursor.rowcount -def insert_genotype_data(dbconn: mdb.Connection, - speciesid: int, - genotypes: tuple[dict, ...], - individuals: tuple[str, ...]) -> tuple[ - int, tuple[dict, ...]]: +def insert_genotype_data( + dbconn: mdb.Connection, + speciesid: int, + genotypes: tuple[dict, ...], + individuals: tuple[str, ...], + _logger: Logger +) -> tuple[int, tuple[dict, ...]]: """Insert the genotype data values into the database.""" with dbconn.cursor(cursorclass=DictCursor) as cursor: paramstr = ", ".join(["%s"] * len(individuals)) @@ -119,11 +134,14 @@ def insert_genotype_data(dbconn: mdb.Connection, "markerid": row["markerid"] } for row in data) -def cross_reference_genotypes(dbconn: mdb.Connection, - speciesid: int, - datasetid: int, - dataids: tuple[dict, ...], - gmapdata: Optional[Iterator[dict]]) -> int: +def cross_reference_genotypes( + dbconn: mdb.Connection, + speciesid: int, + datasetid: int, + dataids: tuple[dict, ...], + gmapdata: Optional[Iterator[dict]], + _logger: Logger +) -> int: """Cross-reference the data to the relevant dataset.""" _rows, markers, mdata = reduce(#type: ignore[var-annotated] lambda acc, row: (#type: ignore[return-value,arg-type] @@ -139,31 +157,45 @@ def cross_reference_genotypes(dbconn: mdb.Connection, (tuple(), tuple(), {})) with dbconn.cursor(cursorclass=DictCursor) as cursor: - paramstr = ", ".join(["%s"] * len(markers)) - cursor.execute("SELECT Id, Name FROM Geno " - f"WHERE SpeciesId=%s AND Name IN ({paramstr})", - (speciesid,) + markers) - markersdict = {row["Id"]: row["Name"] for row in cursor.fetchall()} - cursor.executemany( + markersdict = {} + if len(markers) > 0: + paramstr = ", ".join(["%s"] * len(markers)) + insertparams = (speciesid,) + markers + selectquery = ("SELECT Id, Name FROM Geno " + f"WHERE SpeciesId=%s AND Name IN ({paramstr})") + _logger.debug( + "The select query was\n\t%s\n\nwith the parameters\n\t%s", + selectquery, + (speciesid,) + markers) + cursor.execute(selectquery, insertparams) + markersdict = {row["Id"]: row["Name"] for row in cursor.fetchall()} + + insertquery = ( "INSERT INTO GenoXRef(GenoFreezeId, GenoId, DataId, cM) " "VALUES(%(datasetid)s, %(markerid)s, %(dataid)s, %(pos)s) " - "ON DUPLICATE KEY UPDATE GenoFreezeId=GenoFreezeId", - tuple({ - **row, - "datasetid": datasetid, - "pos": mdata.get(markersdict.get( - row.get("markerid"), {}), {}).get("pos") - } for row in dataids)) + "ON DUPLICATE KEY UPDATE GenoFreezeId=GenoFreezeId") + insertparams = tuple({ + **row, + "datasetid": datasetid, + "pos": mdata.get(markersdict.get( + row.get("markerid"), "nosuchkey"), {}).get("pos") + } for row in dataids) + _logger.debug( + "The insert query was\n\t%s\n\nwith the parameters\n\t%s", + insertquery, insertparams) + cursor.executemany(insertquery, insertparams) return cursor.rowcount -def install_genotypes(#pylint: disable=[too-many-arguments, too-many-locals] +def install_genotypes(#pylint: disable=[too-many-locals] + rconn: Redis,#pylint: disable=[unused-argument] dbconn: mdb.Connection, - speciesid: int, - populationid: int, - datasetid: int, - rqtl2bundle: Path, - logger: Logger = getLogger()) -> int: + fullyqualifiedjobid: str,#pylint: disable=[unused-argument] + args: argparse.Namespace, + logger: Logger = getLogger(__name__) +) -> int: """Load any existing genotypes into the database.""" + (speciesid, populationid, datasetid, rqtl2bundle) = ( + args.speciesid, args.populationid, args.datasetid, args.rqtl2bundle) count = 0 with ZipFile(str(rqtl2bundle.absolute()), "r") as zfile: try: @@ -188,20 +220,22 @@ def install_genotypes(#pylint: disable=[too-many-arguments, too-many-locals] speciesid, tuple(key for key in batch[0].keys() if key != "id"), (rqtl2.file_data(zfile, "pmap", cdata) if "pmap" in cdata - else None)) + else None), + logger) individuals = tuple(row["id"] for row in batch) - insert_individuals(dbconn, speciesid, individuals) + insert_individuals(dbconn, speciesid, individuals, logger) cross_reference_individuals( - dbconn, speciesid, populationid, individuals) + dbconn, speciesid, populationid, individuals, logger) _num_rows, dataids = insert_genotype_data( - dbconn, speciesid, batch, individuals) + dbconn, speciesid, batch, individuals, logger) cross_reference_genotypes( dbconn, speciesid, datasetid, dataids, (rqtl2.file_data(zfile, "gmap", cdata) - if "gmap" in cdata else None)) + if "gmap" in cdata else None), + logger) count = count + len(batch) except rqtl2.InvalidFormat as exc: logger.error(str(exc)) @@ -223,15 +257,5 @@ if __name__ == "__main__": return parser.parse_args() - thelogger = getLogger("install_genotypes") - thelogger.addHandler(StreamHandler(stream=sys.stderr)) - main = build_main( - cli_args(), - lambda dbconn, args: install_genotypes(dbconn, - args.speciesid, - args.populationid, - args.datasetid, - args.rqtl2bundle), - thelogger, - "INFO") + main = build_main(cli_args(), install_genotypes, __MODULE__) sys.exit(main()) diff --git a/scripts/rqtl2/install_phenos.py b/scripts/rqtl2/install_phenos.py index b5cab8e..9059cd6 100644 --- a/scripts/rqtl2/install_phenos.py +++ b/scripts/rqtl2/install_phenos.py @@ -1,11 +1,12 @@ """Load pheno from R/qtl2 bundle into the database.""" import sys +import argparse import traceback -from pathlib import Path from zipfile import ZipFile from functools import reduce -from logging import Logger, getLogger, StreamHandler +from logging import Logger, getLogger +from redis import Redis import MySQLdb as mdb from MySQLdb.cursors import DictCursor @@ -18,6 +19,8 @@ from r_qtl import r_qtl2_qc as rqc from functional_tools import take +__MODULE__ = "scripts.rqtl2.install_phenos" + def insert_probesets(dbconn: mdb.Connection, platformid: int, phenos: tuple[str, ...]) -> int: @@ -93,14 +96,15 @@ def cross_reference_probeset_data(dbconn: mdb.Connection, } for row in dataids)) return cursor.rowcount -def install_pheno_files(#pylint: disable=[too-many-arguments, too-many-locals] +def install_pheno_files(#pylint: disable=[too-many-locals] + rconn: Redis,#pylint: disable=[unused-argument] dbconn: mdb.Connection, - speciesid: int, - platformid: int, - datasetid: int, - rqtl2bundle: Path, + fullyqualifiedjobid: str,#pylint: disable=[unused-argument] + args: argparse.Namespace, logger: Logger = getLogger()) -> int: """Load data in `pheno` files and other related files into the database.""" + (speciesid, platformid, datasetid, rqtl2bundle) = ( + args.speciesid, args.platformid, args.datasetid, args.rqtl2bundle) with ZipFile(str(rqtl2bundle), "r") as zfile: try: rqc.validate_bundle(zfile) @@ -155,16 +159,5 @@ if __name__ == "__main__": return parser.parse_args() - thelogger = getLogger("install_phenos") - thelogger.addHandler(StreamHandler(stream=sys.stderr)) - main = build_main( - cli_args(), - lambda dbconn, args: install_pheno_files(dbconn, - args.speciesid, - args.platformid, - args.datasetid, - args.rqtl2bundle, - thelogger), - thelogger, - "DEBUG") + main = build_main(cli_args(), install_pheno_files, __MODULE__) sys.exit(main()) diff --git a/scripts/rqtl2/phenotypes_qc.py b/scripts/rqtl2/phenotypes_qc.py new file mode 100644 index 0000000..5c89ca0 --- /dev/null +++ b/scripts/rqtl2/phenotypes_qc.py @@ -0,0 +1,516 @@ +"""Run quality control on phenotypes-specific files in the bundle.""" +import sys +import uuid +import json +import shutil +import logging +import tempfile +import contextlib +from pathlib import Path +from logging import Logger +from zipfile import ZipFile +from argparse import Namespace +import multiprocessing as mproc +from functools import reduce, partial +from typing import Union, Iterator, Callable, Optional, Sequence + +import MySQLdb as mdb +from redis import Redis + +from r_qtl import r_qtl2 as rqtl2 +from r_qtl import r_qtl2_qc as rqc +from r_qtl import exceptions as rqe +from r_qtl.fileerrors import InvalidValue + +from functional_tools import chain + +from quality_control.checks import decimal_places_pattern + +from uploader.files import sha256_digest_over_file +from uploader.samples.models import samples_by_species_and_population + +from scripts.rqtl2.entry import build_main +from scripts.redis_logger import RedisMessageListHandler +from scripts.rqtl2.cli_parser import add_bundle_argument +from scripts.cli_parser import init_cli_parser, add_global_data_arguments +from scripts.rqtl2.bundleutils import build_line_joiner, build_line_splitter + +__MODULE__ = "scripts.rqtl2.phenotypes_qc" +logging.basicConfig( + format=("%(asctime)s - %(levelname)s %(name)s: " + "(%(pathname)s: %(lineno)d) %(message)s")) +logger = logging.getLogger(__MODULE__) + +def validate(phenobundle: Path, logger: Logger) -> dict: + """Check that the bundle is generally valid""" + try: + rqc.validate_bundle(phenobundle) + except rqe.RQTLError as rqtlerr: + # logger.error("Bundle file validation failed!", exc_info=True) + return { + "skip": True, + "logger": logger, + "phenobundle": phenobundle, + "errors": (" ".join(rqtlerr.args),) + } + return { + "errors": tuple(), + "skip": False, + "phenobundle": phenobundle, + "logger": logger + } + + +def check_for_mandatory_pheno_keys( + phenobundle: Path, + logger: Logger, + **kwargs +) -> dict: + """Check that the mandatory keys exist for phenotypes.""" + if kwargs.get("skip", False): + return { + **kwargs, + "logger": logger, + "phenobundle": phenobundle + } + + _mandatory_keys = ("pheno", "phenocovar") + _cdata = rqtl2.read_control_file(phenobundle) + _errors = kwargs.get("errors", tuple()) + tuple( + f"Expected '{key}' file(s) are not declared in the bundle." + for key in _mandatory_keys if key not in _cdata.keys()) + return { + **kwargs, + "logger": logger, + "phenobundle": phenobundle, + "errors": _errors, + "skip": len(_errors) > 0 + } + + +def check_for_averages_files( + phenobundle: Path, + logger: Logger, + **kwargs +) -> dict: + """Check that averages files appear together""" + if kwargs.get("skip", False): + return { + **kwargs, + "logger": logger, + "phenobundle": phenobundle + } + + _together = (("phenose", "phenonum"), ("phenonum", "phenose")) + _cdata = rqtl2.read_control_file(phenobundle) + _errors = kwargs.get("errors", tuple()) + tuple( + f"'{first}' is defined in the control file but there is no " + f"corresponding '{second}'" + for first, second in _together + if ((first in _cdata.keys()) and (second not in _cdata.keys()))) + return { + **kwargs, + "logger": logger, + "phenobundle": phenobundle, + "errors": _errors, + "skip": len(_errors) > 0 + } + + +def extract_bundle( + bundle: Path, workdir: Path, jobid: uuid.UUID +) -> tuple[Path, tuple[Path, ...]]: + """Extract the bundle.""" + with ZipFile(bundle) as zfile: + extractiondir = workdir.joinpath( + f"{str(jobid)}-{sha256_digest_over_file(bundle)}-{bundle.name}") + return extractiondir, rqtl2.extract(zfile, extractiondir) + + +def undo_transpose(filetype: str, cdata: dict, extractiondir): + """Undo transposition of all files of type `filetype` in thebundle.""" + if len(cdata.get(filetype, [])) > 0 and cdata.get(f"{filetype}_transposed", False): + files = (extractiondir.joinpath(_file) for _file in cdata[filetype]) + for _file in files: + rqtl2.transpose_csv_with_rename( + _file, + build_line_splitter(cdata), + build_line_joiner(cdata)) + + +@contextlib.contextmanager +def redis_logger( + redisuri: str, loggername: str, filename: str, fqkey: str +) -> Iterator[logging.Logger]: + """Build a Redis message-list logger.""" + rconn = Redis.from_url(redisuri, decode_responses=True) + logger = logging.getLogger(loggername) + logger.propagate = False + handler = RedisMessageListHandler( + rconn, + fullyqualifiedkey(fqkey, filename))#type: ignore[arg-type] + handler.setFormatter(logging.getLogger().handlers[0].formatter) + logger.addHandler(handler) + try: + yield logger + finally: + rconn.close() + + +def push_error(rconn: Redis, fqkey: str, error: InvalidValue) -> InvalidValue: + """Persist the error in redis.""" + rconn.rpush(fqkey, json.dumps(error._asdict())) + return error + + +def file_fqkey(prefix: str, section: str, filepath: Path) -> str: + """Build a files fully-qualified key in a consistent manner""" + return f"{prefix}:{section}:{filepath.name}" + + +def qc_phenocovar_file( + filepath: Path, + redisuri, + fqkey: str, + separator: str, + comment_char: str): + """Check that `phenocovar` files are structured correctly.""" + with (redis_logger( + redisuri, + f"{__MODULE__}.qc_phenocovar_file", + filepath.name, + f"{fqkey}:logs") as logger, + Redis.from_url(redisuri, decode_responses=True) as rconn): + print("Running QC on file: ", filepath.name) + _csvfile = rqtl2.read_csv_file(filepath, separator, comment_char) + _headings = tuple(heading.lower() for heading in next(_csvfile)) + _errors: tuple[InvalidValue, ...] = tuple() + save_error = partial( + push_error, rconn, file_fqkey(fqkey, "errors", filepath)) + for heading in ("description", "units"): + if heading not in _headings: + _errors = (save_error(InvalidValue( + filepath.name, + "header row", + "-", + "-", + (f"File {filepath.name} is missing the {heading} heading " + "in the header line."))),) + + def collect_errors(errors_and_linecount, line): + _errs, _lc = errors_and_linecount + logger.info("Testing record '%s'", line[0]) + if len(line) != len(_headings): + _errs = _errs + (save_error(InvalidValue( + filepath.name, + line[0], + "-", + "-", + (f"Record {_lc} in file {filepath.name} has a different " + "number of columns than the number of headings"))),) + _line = dict(zip(_headings, line)) + if not bool(_line.get("description")): + _errs = _errs + ( + save_error(InvalidValue(filepath.name, + _line[_headings[0]], + "description", + _line.get("description"), + "The description is not provided!")),) + + rconn.hset(file_fqkey(fqkey, "metadata", filepath), + mapping={ + "status": "checking", + "linecount": _lc+1, + "total-errors": len(_errs) + }) + return _errs, _lc+1 + + _errors, _linecount = reduce(collect_errors, _csvfile, (_errors, 1)) + rconn.hset(file_fqkey(fqkey, "metadata", filepath), + mapping={ + "status": "completed", + "linecount": _linecount, + "total-errors": len(_errors) + }) + return {filepath.name: {"errors": _errors, "linecount": _linecount}} + + +def merge_dicts(*dicts): + """Merge multiple dicts into a single one.""" + return reduce(lambda merged, dct: {**merged, **dct}, dicts, {}) + + +def decimal_points_error(# pylint: disable=[too-many-arguments] + filename: str, + rowtitle: str, + coltitle: str, + cellvalue: str, + message: str, + decimal_places: int = 1 +) -> Optional[InvalidValue]: + """Returns an error if the value does not meet the checks.""" + if not bool(decimal_places_pattern(decimal_places).match(cellvalue)): + return InvalidValue(filename, rowtitle, coltitle, cellvalue, message) + return None + + +def integer_error( + filename: str, + rowtitle: str, + coltitle: str, + cellvalue: str, + message: str +) -> Optional[InvalidValue]: + """Returns an error if the value does not meet the checks.""" + try: + value = int(cellvalue) + if value <= 0: + raise ValueError("Must be a non-zero, positive number.") + return None + except ValueError as _verr: + return InvalidValue(filename, rowtitle, coltitle, cellvalue, message) + + +def qc_pheno_file(# pylint: disable=[too-many-locals, too-many-arguments] + filepath: Path, + redisuri: str, + fqkey: str, + samples: tuple[str, ...], + phenonames: tuple[str, ...], + separator: str, + comment_char: str, + na_strings: Sequence[str], + error_fn: Callable = decimal_points_error +): + """Run QC/QA on a `pheno` file.""" + with (redis_logger( + redisuri, + f"{__MODULE__}.qc_pheno_file", + filepath.name, + f"{fqkey}:logs") as logger, + Redis.from_url(redisuri, decode_responses=True) as rconn): + print("Running QC on file: ", filepath.name) + save_error = partial( + push_error, rconn, file_fqkey(fqkey, "errors", filepath)) + _csvfile = rqtl2.read_csv_file(filepath, separator, comment_char) + _headings: tuple[str, ...] = tuple( + # select lowercase for comparison purposes + heading.lower() for heading in next(_csvfile)) + _errors: tuple[InvalidValue, ...] = tuple() + + _absent = tuple(pheno for pheno in _headings[1:] if pheno + not in tuple( + # lower to have consistent case with headings for + # comparison + phe.lower() for phe in phenonames)) + if len(_absent) > 0: + _errors = _errors + (save_error(InvalidValue( + filepath.name, + "header row", + "-", + ", ".join(_absent), + ("The following phenotype names do not exist in any of the " + f"provided phenocovar files: ({', '.join(_absent)})"))),) + + def collect_errors(errors_and_linecount, line): + _errs, _lc = errors_and_linecount + logger.debug("Checking row %s", line[0]) + if line[0] not in samples: + _errs = _errs + (save_error(InvalidValue( + filepath.name, + line[0], + _headings[0], + line[0], + (f"The sample named '{line[0]}' does not exist in the database. " + "You will need to upload that first."))),) + + for field, value in zip(_headings[1:], line[1:]): + if value in na_strings: + continue + _err = error_fn( + filepath.name, + line[0], + field, + value) + _errs = _errs + ((save_error(_err),) if bool(_err) else tuple()) + + rconn.hset(file_fqkey(fqkey, "metadata", filepath), + mapping={ + "status": "checking", + "linecount": _lc+1, + "total-errors": len(_errs) + }) + return _errs, _lc+1 + + _errors, _linecount = reduce(collect_errors, _csvfile, (_errors, 1)) + rconn.hset(file_fqkey(fqkey, "metadata", filepath), + mapping={ + "status": "completed", + "linecount": _linecount, + "total-errors": len(_errors) + }) + return {filepath.name: {"errors": _errors, "linecount": _linecount}} + + +def phenotype_names(filepath: Path, + separator: str, + comment_char: str) -> tuple[str, ...]: + """Read phenotype names from `phenocovar` file.""" + return reduce(lambda tpl, line: tpl + (line[0],),#type: ignore[arg-type, return-value] + rqtl2.read_csv_file(filepath, separator, comment_char), + tuple())[1:] + +def fullyqualifiedkey( + prefix: str, + rest: Optional[str] = None +) -> Union[Callable[[str], str], str]: + """Compute fully qualified Redis key.""" + if not bool(rest): + return lambda _rest: f"{prefix}:{_rest}" + return f"{prefix}:{rest}" + +def run_qc(# pylint: disable=[too-many-locals] + rconn: Redis, + dbconn: mdb.Connection, + fullyqualifiedjobid: str, + args: Namespace +) -> int: + """Run quality control checks on the bundle.""" + print("Beginning the quality assurance checks.") + results = check_for_averages_files( + **check_for_mandatory_pheno_keys( + **validate(args.rqtl2bundle, logger))) + errors = results.get("errors", tuple()) + if len(errors) > 0: + logger.error("We found the following errors:\n%s", + "\n".join(f" - {error}" for error in errors)) + return 1 + # Run QC on actual values + # Steps: + # - Extract file to specific directory + extractiondir, *_bundlefiles = extract_bundle( + args.rqtl2bundle, args.workingdir, args.jobid) + + # - For every pheno, phenocovar, phenose, phenonum file, undo + # transposition where relevant + cdata = rqtl2.control_data(extractiondir) + with mproc.Pool(mproc.cpu_count() - 1) as pool: + pool.starmap( + undo_transpose, + ((ftype, cdata, extractiondir) + for ftype in ("pheno", "phenocovar", "phenose", "phenonum"))) + + # - Fetch samples/individuals from database. + print("Fetching samples/individuals from the database.") + samples = tuple(#type: ignore[var-annotated] + item for item in set(reduce( + lambda acc, item: acc + ( + item["Name"], item["Name2"], item["Symbol"], item["Alias"]), + samples_by_species_and_population( + dbconn, args.speciesid, args.populationid), + tuple())) + if bool(item)) + + # - Check that `description` and `units` is present in phenocovar for + # all phenotypes + rconn.hset(fullyqualifiedjobid, + "fully-qualified-keys:phenocovar", + json.dumps(tuple(f"{fullyqualifiedjobid}:phenocovar:{_file}" + for _file in cdata.get("phenocovar", [])))) + with mproc.Pool(mproc.cpu_count() - 1) as pool: + print("Check for errors in 'phenocovar' file(s).") + _phenocovar_qc_res = merge_dicts(*pool.starmap(qc_phenocovar_file, tuple( + (extractiondir.joinpath(_file), + args.redisuri, + f"{fullyqualifiedjobid}:phenocovar", + cdata["sep"], + cdata["comment.char"]) + for _file in cdata.get("phenocovar", [])))) + + # - Check all samples in pheno files exist in database + # - Check all phenotypes in pheno files exist in phenocovar files + # - Check all numeric values in pheno files + phenonames = tuple(set( + name for names in pool.starmap(phenotype_names, tuple( + (extractiondir.joinpath(_file), cdata["sep"], cdata["comment.char"]) + for _file in cdata.get("phenocovar", []))) + for name in names)) + + dec_err_fn = partial(decimal_points_error, message=( + "Expected a non-negative number with at least one decimal " + "place.")) + + print("Check for errors in 'pheno' file(s).") + _pheno_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple(( + extractiondir.joinpath(_file), + args.redisuri, + chain( + "pheno", + fullyqualifiedkey(args.jobid), + fullyqualifiedkey(args.redisprefix)), + samples, + phenonames, + cdata["sep"], + cdata["comment.char"], + cdata["na.strings"], + dec_err_fn + ) for _file in cdata.get("pheno", [])))) + + # - Check the 3 checks above for phenose and phenonum values too + # qc_phenose_files(…) + # qc_phenonum_files(…) + print("Check for errors in 'phenose' file(s).") + _phenose_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple(( + extractiondir.joinpath(_file), + args.redisuri, + chain( + "phenose", + fullyqualifiedkey(args.jobid), + fullyqualifiedkey(args.redisprefix)), + samples, + phenonames, + cdata["sep"], + cdata["comment.char"], + cdata["na.strings"], + dec_err_fn + ) for _file in cdata.get("phenose", [])))) + + print("Check for errors in 'phenonum' file(s).") + _phenonum_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple(( + extractiondir.joinpath(_file), + args.redisuri, + chain( + "phenonum", + fullyqualifiedkey(args.jobid), + fullyqualifiedkey(args.redisprefix)), + samples, + phenonames, + cdata["sep"], + cdata["comment.char"], + cdata["na.strings"], + partial(integer_error, message=( + "Expected a non-negative, non-zero integer value.")) + ) for _file in cdata.get("phenonum", [])))) + + # - Delete all extracted files + shutil.rmtree(extractiondir) + return 0 + + +if __name__ == "__main__": + def cli_args(): + """Process command-line arguments for `install_phenos`""" + parser = add_bundle_argument(add_global_data_arguments(init_cli_parser( + program="PhenotypesQC", + description=( + "Perform Quality Control checks on a phenotypes bundle file")))) + parser.add_argument( + "--workingdir", + default=f"{tempfile.gettempdir()}/phenotypes_qc", + help=("The directory where this script will put its intermediate " + "files."), + type=Path) + return parser.parse_args() + + main = build_main(cli_args(), run_qc, logger) + sys.exit(main()) diff --git a/scripts/validate_file.py b/scripts/validate_file.py index 0028795..52e55ec 100644 --- a/scripts/validate_file.py +++ b/scripts/validate_file.py @@ -8,12 +8,12 @@ from zipfile import ZipFile, is_zipfile import jsonpickle from redis import Redis from redis.exceptions import ConnectionError # pylint: disable=[redefined-builtin] +from gn_libs.mysqldb import database_connection from quality_control.utils import make_progress_calculator from quality_control.parsing import FileType, strain_names, collect_errors -from qc_app import jobs -from qc_app.db_utils import database_connection +from uploader import jobs from .cli_parser import init_cli_parser from .qc import add_file_validation_arguments diff --git a/scripts/worker.py b/scripts/worker.py index 0eb9ea5..91b0332 100644 --- a/scripts/worker.py +++ b/scripts/worker.py @@ -11,8 +11,8 @@ from tempfile import TemporaryDirectory from redis import Redis -from qc_app import jobs -from qc_app.check_connections import check_redis +from uploader import jobs +from uploader.check_connections import check_redis def parse_args(): "Parse the command-line arguments" |