diff options
Diffstat (limited to 'scripts')
-rw-r--r-- | scripts/cli_parser.py | 3 | ||||
-rw-r--r-- | scripts/insert_samples.py | 26 | ||||
-rw-r--r-- | scripts/load_phenotypes_to_db.py | 518 | ||||
-rw-r--r-- | scripts/phenotypes_bulk_edit.py | 266 | ||||
-rw-r--r-- | scripts/rqtl2/entry.py | 30 | ||||
-rw-r--r-- | scripts/rqtl2/phenotypes_qc.py | 29 |
6 files changed, 832 insertions, 40 deletions
diff --git a/scripts/cli_parser.py b/scripts/cli_parser.py index d42ae66..0c91c5e 100644 --- a/scripts/cli_parser.py +++ b/scripts/cli_parser.py @@ -23,7 +23,8 @@ def init_cli_parser(program: str, description: Optional[str] = None) -> Argument "--loglevel", type=str, default="INFO", - choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], + choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", + "debug", "info", "warning", "error", "critical"], help="The severity of events to track with the logger.") return parser diff --git a/scripts/insert_samples.py b/scripts/insert_samples.py index 1b0a052..742c4ae 100644 --- a/scripts/insert_samples.py +++ b/scripts/insert_samples.py @@ -3,6 +3,7 @@ import sys import logging import pathlib import argparse +import traceback import MySQLdb as mdb from redis import Redis @@ -73,6 +74,7 @@ def insert_samples(conn: mdb.Connection,# pylint: disable=[too-many-arguments] print("Samples upload successfully completed.") return 0 + if __name__ == "__main__": def cli_args(): @@ -127,7 +129,7 @@ if __name__ == "__main__": def main(): """Run script to insert samples into the database.""" - + status_code = 1 # Exit with an Exception args = cli_args() check_db(args.databaseuri) check_redis(args.redisuri) @@ -137,13 +139,19 @@ if __name__ == "__main__": with (Redis.from_url(args.redisuri, decode_responses=True) as rconn, database_connection(args.databaseuri) as dbconn): - return insert_samples(dbconn, - rconn, - args.speciesid, - args.populationid, - args.samplesfile, - args.separator, - args.firstlineheading, - args.quotechar) + + try: + status_code = insert_samples(dbconn, + rconn, + args.speciesid, + args.populationid, + args.samplesfile, + args.separator, + args.firstlineheading, + args.quotechar) + except Exception as _exc: + print(traceback.format_exc(), file=sys.stderr) + + return status_code sys.exit(main()) diff --git a/scripts/load_phenotypes_to_db.py b/scripts/load_phenotypes_to_db.py new file mode 100644 index 0000000..5ce37f3 --- /dev/null +++ b/scripts/load_phenotypes_to_db.py @@ -0,0 +1,518 @@ +import sys +import uuid +import json +import logging +import argparse +import datetime +from pathlib import Path +from zipfile import ZipFile +from typing import Any, Union +from urllib.parse import urljoin +from functools import reduce, partial + +from MySQLdb.cursors import Cursor, DictCursor + +from gn_libs import jobs, mysqldb, sqlite3, monadic_requests as mrequests + +from r_qtl import r_qtl2 as rqtl2 +from uploader.species.models import species_by_id +from uploader.population.models import population_by_species_and_id +from uploader.samples.models import samples_by_species_and_population +from uploader.phenotypes.models import ( + dataset_by_id, + save_phenotypes_data, + create_new_phenotypes, + quick_save_phenotypes_data) +from uploader.publications.models import ( + create_new_publications, + fetch_publication_by_id) + +from scripts.rqtl2.bundleutils import build_line_joiner, build_line_splitter + +logging.basicConfig( + format="%(asctime)s — %(filename)s:%(lineno)s — %(levelname)s: %(message)s") +logger = logging.getLogger(__name__) + + + +def __replace_na_strings__(line, na_strings): + return ((None if value in na_strings else value) for value in line) + + +def save_phenotypes( + cursor: mysqldb.Connection, + control_data: dict[str, Any], + filesdir: Path +) -> tuple[dict, ...]: + """Read `phenofiles` and save the phenotypes therein.""" + ## TODO: Replace with something like this: ## + # phenofiles = control_data["phenocovar"] + control_data.get( + # "gn-metadata", {}).get("pheno", []) + # + # This is meant to load (and merge) data from the "phenocovar" and + # "gn-metadata -> pheno" files into a single collection of phenotypes. + phenofiles = tuple(filesdir.joinpath(_file) for _file in control_data["phenocovar"]) + if len(phenofiles) <= 0: + return tuple() + + if control_data["phenocovar_transposed"]: + logger.info("Undoing transposition of the files rows and columns.") + phenofiles = ( + rqtl2.transpose_csv_with_rename( + _file, + build_line_splitter(control_data), + build_line_joiner(control_data)) + for _file in phenofiles) + + _headers = rqtl2.read_csv_file_headers(phenofiles[0], + control_data["phenocovar_transposed"], + control_data["sep"], + control_data["comment.char"]) + return create_new_phenotypes( + cursor, + (dict(zip(_headers, + __replace_na_strings__(line, control_data["na.strings"]))) + for filecontent + in (rqtl2.read_csv_file(path, + separator=control_data["sep"], + comment_char=control_data["comment.char"]) + for path in phenofiles) + for idx, line in enumerate(filecontent) + if idx != 0)) + + +def __fetch_next_dataid__(conn: mysqldb.Connection) -> int: + """Fetch the next available DataId value from the database.""" + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute( + "SELECT MAX(DataId) AS CurrentMaxDataId FROM PublishXRef") + return int(cursor.fetchone()["CurrentMaxDataId"]) + 1 + + +def __row_to_dataitems__( + sample_row: dict, + dataidmap: dict, + pheno_name2id: dict[str, int], + samples: dict +) -> tuple[dict, ...]: + samplename = sample_row["id"] + + return ({ + "phenotype_id": dataidmap[pheno_name2id[phenoname]]["phenotype_id"], + "data_id": dataidmap[pheno_name2id[phenoname]]["data_id"], + "sample_name": samplename, + "sample_id": samples[samplename]["Id"], + "value": phenovalue + } for phenoname, phenovalue in sample_row.items() if phenoname != "id") + + +def __build_dataitems__( + filetype, + phenofiles, + control_data, + samples, + dataidmap, + pheno_name2id +): + _headers = rqtl2.read_csv_file_headers( + phenofiles[0], + False, # Any transposed files have been un-transposed by this point + control_data["sep"], + control_data["comment.char"]) + _filescontents = ( + rqtl2.read_csv_file(path, + separator=control_data["sep"], + comment_char=control_data["comment.char"]) + for path in phenofiles) + _linescontents = ( + __row_to_dataitems__( + dict(zip(("id",) + _headers[1:], + __replace_na_strings__(line, control_data["na.strings"]))), + dataidmap, + pheno_name2id, + samples) + for linenum, line in (enumline for filecontent in _filescontents + for enumline in enumerate(filecontent)) + if linenum > 0) + return (item for items in _linescontents + for item in items + if item["value"] is not None) + + +def save_numeric_data( + conn: mysqldb.Connection, + dataidmap: dict, + pheno_name2id: dict[str, int], + samples: tuple[dict, ...], + control_data: dict, + filesdir: Path, + filetype: str, + table: str +): + """Read data from files and save to the database.""" + phenofiles = tuple( + filesdir.joinpath(_file) for _file in control_data[filetype]) + if len(phenofiles) <= 0: + return tuple() + + if control_data[f"{filetype}_transposed"]: + logger.info("Undoing transposition of the files rows and columns.") + phenofiles = tuple( + rqtl2.transpose_csv_with_rename( + _file, + build_line_splitter(control_data), + build_line_joiner(control_data)) + for _file in phenofiles) + + try: + logger.debug("Attempt quick save with `LOAD … INFILE`.") + return quick_save_phenotypes_data( + conn, + table, + __build_dataitems__( + filetype, + phenofiles, + control_data, + samples, + dataidmap, + pheno_name2id), + filesdir) + except Exception as _exc: + logger.debug("Could not use `LOAD … INFILE`, using raw query", + exc_info=True) + import time;time.sleep(60) + return save_phenotypes_data( + conn, + table, + __build_dataitems__( + filetype, + phenofiles, + control_data, + samples, + dataidmap, + pheno_name2id)) + + +save_pheno_data = partial(save_numeric_data, + filetype="pheno", + table="PublishData") + + +save_phenotypes_se = partial(save_numeric_data, + filetype="phenose", + table="PublishSE") + + +save_phenotypes_n = partial(save_numeric_data, + filetype="phenonum", + table="NStrain") + + +def cross_reference_phenotypes_publications_and_data( + conn: mysqldb.Connection, xref_data: tuple[dict, ...] +): + """Crossreference the phenotypes, publication and data.""" + with conn.cursor(cursorclass=DictCursor) as cursor: + cursor.execute("SELECT MAX(Id) CurrentMaxId FROM PublishXRef") + _nextid = int(cursor.fetchone()["CurrentMaxId"]) + 1 + _params = tuple({**row, "xref_id": _id} + for _id, row in enumerate(xref_data, start=_nextid)) + cursor.executemany( + ("INSERT INTO PublishXRef(" + "Id, InbredSetId, PhenotypeId, PublicationId, DataId, comments" + ") " + "VALUES (" + "%(xref_id)s, %(population_id)s, %(phenotype_id)s, " + "%(publication_id)s, %(data_id)s, 'Upload of new data.'" + ")"), + _params) + return _params + return tuple() + + +def update_auth(authserver, token, species, population, dataset, xrefdata): + """Grant the user access to their data.""" + # TODO Call into the auth server to: + # 1. Link the phenotypes with a user group + # - fetch group: http://localhost:8081/auth/user/group + # - link data to group: http://localhost:8081/auth/data/link/phenotype + # - *might need code update in gn-auth: remove restriction, perhaps* + # 2. Create resource (perhaps?) + # - Get resource categories: http://localhost:8081/auth/resource/categories + # - Create a new resource: http://localhost:80host:8081/auth/resource/create + # - single resource for all phenotypes + # - resource name from user, species, population, dataset, datetime? + # - User will have "ownership" of resource by default + # 3. Link data to the resource: http://localhost:8081/auth/resource/data/link + # - Update code to allow linking multiple items in a single request + _tries = 0 # TODO use this to limit how many tries before quiting and bailing + _delay = 1 + headers = { + "Authorization": f"Bearer {token}", + "Content-Type": "application/json" + } + def authserveruri(endpoint): + return urljoin(authserver, endpoint) + + def __fetch_user_details__(): + logger.debug("… Fetching user details") + return mrequests.get( + authserveruri("/auth/user/"), + headers=headers + ) + + def __link_data__(user): + logger.debug("… linking uploaded data to user's group") + return mrequests.post( + authserveruri("/auth/data/link/phenotype"), + headers=headers, + json={ + "species_name": species["Name"], + "group_id": user["group"]["group_id"], + "selected": [ + { + "SpeciesId": species["SpeciesId"], + "InbredSetId": population["Id"], + "PublishFreezeId": dataset["Id"], + "dataset_name": dataset["Name"], + "dataset_fullname": dataset["FullName"], + "dataset_shortname": dataset["ShortName"], + "PublishXRefId": item["xref_id"] + } + for item in xrefdata + ], + "using-raw-ids": "on" + }).then(lambda ld_results: (user, ld_results)) + + def __fetch_phenotype_category_details__(user, linkeddata): + logger.debug("… fetching phenotype category details") + return mrequests.get( + authserveruri("/auth/resource/categories"), + headers=headers + ).then( + lambda categories: ( + user, + linkeddata, + next(category for category in categories + if category["resource_category_key"] == "phenotype")) + ) + + def __create_resource__(user, linkeddata, category): + logger.debug("… creating authorisation resource object") + now = datetime.datetime.now().isoformat() + return mrequests.post( + authserveruri("/auth/resource/create"), + headers=headers, + json={ + "resource_category": category["resource_category_id"], + "resource_name": (f"{user['email']}—{dataset['Name']}—{now}—" + f"{len(xrefdata)} phenotypes"), + "public": "off" + }).then(lambda cr_results: (user, linkeddata, cr_results)) + + def __attach_data_to_resource__(user, linkeddata, resource): + logger.debug("… attaching data to authorisation resource object") + return mrequests.post( + authserveruri("/auth/resource/data/link"), + headers=headers, + json={ + "dataset_type": "phenotype", + "resource_id": resource["resource_id"], + "data_link_ids": [ + item["data_link_id"] for item in linkeddata["traits"]] + }).then(lambda attc: (user, linkeddata, resource, attc)) + + def __handle_error__(resp): + logger.error("ERROR: Updating the authorisation for the data failed.") + logger.debug( + "ERROR: The response from the authorisation server was:\n\t%s", + resp.json()) + return 1 + + def __handle_success__(val): + logger.info( + "The authorisation for the data has been updated successfully.") + return 0 + + return __fetch_user_details__().then(__link_data__).then( + lambda result: __fetch_phenotype_category_details__(*result) + ).then( + lambda result: __create_resource__(*result) + ).then( + lambda result: __attach_data_to_resource__(*result) + ).either(__handle_error__, __handle_success__) + + +def load_data(conn: mysqldb.Connection, job: dict) -> int: + """Load the data attached in the given job.""" + _job_metadata = job["metadata"] + # Steps + # 0. Read data from the files: can be multiple files per type + # + _species = species_by_id(conn, int(_job_metadata["species_id"])) + _population = population_by_species_and_id( + conn, + _species["SpeciesId"], + int(_job_metadata["population_id"])) + _dataset = dataset_by_id( + conn, + _species["SpeciesId"], + _population["Id"], + int(_job_metadata["dataset_id"])) + # 1. Just retrive the publication: Don't create publications for now. + _publication = fetch_publication_by_id( + conn, int(_job_metadata.get("publication_id", "0"))) or {"Id": 0} + # 2. Save all new phenotypes: + # -> return phenotype IDs + bundle = Path(_job_metadata["bundle_file"]) + _control_data = rqtl2.control_data(bundle) + logger.info("Extracting the zipped bundle of files.") + _outdir = Path(bundle.parent, f"bundle_{bundle.stem}") + with ZipFile(str(bundle), "r") as zfile: + _files = rqtl2.extract(zfile, _outdir) + logger.info("Saving new phenotypes.") + _phenos = save_phenotypes(conn, _control_data, _outdir) + def __build_phenos_maps__(accumulator, current): + dataid, row = current + return ({ + **accumulator[0], + row["phenotype_id"]: { + "population_id": _population["Id"], + "phenotype_id": row["phenotype_id"], + "data_id": dataid, + "publication_id": _publication["Id"], + } + }, { + **accumulator[1], + row["id"]: row["phenotype_id"] + }) + dataidmap, pheno_name2id = reduce( + __build_phenos_maps__, + enumerate(_phenos, start=__fetch_next_dataid__(conn)), + ({},{})) + # 3. a. Fetch the strain names and IDS: create name->ID map + samples = { + row["Name"]: row + for row in samples_by_species_and_population( + conn, _species["SpeciesId"], _population["Id"])} + # b. Save all the data items (DataIds are vibes), return new IDs + logger.info("Saving new phenotypes data.") + _num_data_rows = save_pheno_data(conn=conn, + dataidmap=dataidmap, + pheno_name2id=pheno_name2id, + samples=samples, + control_data=_control_data, + filesdir=_outdir) + logger.info("Saved %s new phenotype data rows.", _num_data_rows) + # 4. Cross-reference Phenotype, Publication, and PublishData in PublishXRef + logger.info("Cross-referencing new phenotypes to their data and publications.") + _xrefs = cross_reference_phenotypes_publications_and_data( + conn, tuple(dataidmap.values())) + # 5. If standard errors and N exist, save them too + # (use IDs returned in `3. b.` above). + logger.info("Saving new phenotypes standard errors.") + _num_se_rows = save_phenotypes_se(conn=conn, + dataidmap=dataidmap, + pheno_name2id=pheno_name2id, + samples=samples, + control_data=_control_data, + filesdir=_outdir) + logger.info("Saved %s new phenotype standard error rows.", _num_se_rows) + + logger.info("Saving new phenotypes sample counts.") + _num_n_rows = save_phenotypes_n(conn=conn, + dataidmap=dataidmap, + pheno_name2id=pheno_name2id, + samples=samples, + control_data=_control_data, + filesdir=_outdir) + logger.info("Saved %s new phenotype sample counts rows.", _num_n_rows) + return (_species, _population, _dataset, _xrefs) + + +if __name__ == "__main__": + def parse_args(): + """Setup command-line arguments.""" + parser = argparse.ArgumentParser( + prog="load_phenotypes_to_db", + description="Process the phenotypes' data and load it into the database.") + parser.add_argument("db_uri", type=str, help="MariaDB/MySQL connection URL") + parser.add_argument( + "jobs_db_path", type=Path, help="Path to jobs' SQLite database.") + parser.add_argument("job_id", type=uuid.UUID, help="ID of the running job") + parser.add_argument( + "--log-level", + type=str, + help="Determines what is logged out.", + choices=("debug", "info", "warning", "error", "critical"), + default="info") + return parser.parse_args() + + def setup_logging(log_level: str): + """Setup logging for the script.""" + logger.setLevel(log_level) + logging.getLogger("uploader.phenotypes.models").setLevel(log_level) + + + def main(): + """Entry-point for this script.""" + args = parse_args() + setup_logging(args.log_level.upper()) + + with (mysqldb.database_connection(args.db_uri) as conn, + conn.cursor(cursorclass=DictCursor) as cursor, + sqlite3.connection(args.jobs_db_path) as jobs_conn): + job = jobs.job(jobs_conn, args.job_id) + + # Lock the PublishXRef/PublishData/PublishSE/NStrain here: Why? + # The `DataId` values are sequential, but not auto-increment + # Can't convert `PublishXRef`.`DataId` to AUTO_INCREMENT. + # `SELECT MAX(DataId) FROM PublishXRef;` + # How do you check for a table lock? + # https://oracle-base.com/articles/mysql/mysql-identify-locked-tables + # `SHOW OPEN TABLES LIKE 'Publish%';` + _db_tables_ = ( + "Species", + "InbredSet", + "Strain", + "StrainXRef", + "Publication", + "Phenotype", + "PublishXRef", + "PublishFreeze", + "PublishData", + "PublishSE", + "NStrain") + + logger.debug( + ("Locking database tables for the connection:" + + "".join("\n\t- %s" for _ in _db_tables_) + "\n"), + *_db_tables_) + cursor.execute(# Lock the tables to avoid race conditions + "LOCK TABLES " + ", ".join( + f"{_table} WRITE" for _table in _db_tables_)) + + db_results = load_data(conn, job) + jobs.update_metadata( + jobs_conn, + args.job_id, + "xref_ids", + json.dumps([xref["xref_id"] for xref in db_results[3]])) + + logger.info("Unlocking all database tables.") + cursor.execute("UNLOCK TABLES") + + # Update authorisations (break this down) — maybe loop until it works? + logger.info("Updating authorisation.") + _job_metadata = job["metadata"] + return update_auth(_job_metadata["authserver"], + _job_metadata["token"], + *db_results) + + + try: + sys.exit(main()) + except Exception as _exc: + logger.debug("Data loading failed… Halting!", + exc_info=True) + sys.exit(1) diff --git a/scripts/phenotypes_bulk_edit.py b/scripts/phenotypes_bulk_edit.py new file mode 100644 index 0000000..cee5f4e --- /dev/null +++ b/scripts/phenotypes_bulk_edit.py @@ -0,0 +1,266 @@ +import sys +import uuid +import logging +import argparse +from pathlib import Path +from typing import Iterator +from functools import reduce + +from MySQLdb.cursors import DictCursor + +from gn_libs import jobs, mysqldb, sqlite3 + +from uploader.phenotypes.models import phenotypes_data_by_ids +from uploader.phenotypes.misc import phenotypes_data_differences +from uploader.phenotypes.views import BULK_EDIT_COMMON_FIELDNAMES + +import uploader.publications.pubmed as pmed +from uploader.publications.misc import publications_differences +from uploader.publications.models import ( + update_publications, fetch_phenotype_publications) + +logging.basicConfig( + format="%(asctime)s — %(filename)s:%(lineno)s — %(levelname)s: %(message)s") +logger = logging.getLogger(__name__) + + +def check_ids(conn, ids: tuple[tuple[int, int], ...]) -> bool: + """Verify that all the `UniqueIdentifier` values are valid.""" + logger.info("Checking the 'UniqueIdentifier' values.") + with conn.cursor(cursorclass=DictCursor) as cursor: + paramstr = ",".join(["(%s, %s)"] * len(ids)) + cursor.execute( + "SELECT PhenotypeId AS phenotype_id, Id AS xref_id " + "FROM PublishXRef " + f"WHERE (PhenotypeId, Id) IN ({paramstr})", + tuple(item for row in ids for item in row)) + mysqldb.debug_query(cursor, logger) + found = tuple((row["phenotype_id"], row["xref_id"]) + for row in cursor.fetchall()) + + not_found = tuple(item for item in ids if item not in found) + if len(not_found) == 0: + logger.info("All 'UniqueIdentifier' are valid.") + return True + + for item in not_found: + logger.error(f"Invalid 'UniqueIdentifier' value: phId:%s::xrId:%s", item[0], item[1]) + + return False + + +def check_for_mandatory_fields(): + """Verify that mandatory fields have values.""" + pass + + +def __fetch_phenotypes__(conn, ids: tuple[int, ...]) -> tuple[dict, ...]: + """Fetch basic (non-numeric) phenotypes data from the database.""" + with conn.cursor(cursorclass=DictCursor) as cursor: + paramstr = ",".join(["%s"] * len(ids)) + cursor.execute(f"SELECT * FROM Phenotype WHERE Id IN ({paramstr}) " + "ORDER BY Id ASC", + ids) + return tuple(dict(row) for row in cursor.fetchall()) + + +def descriptions_differences(file_data, db_data) -> dict[str, str]: + """Compute differences in the descriptions.""" + logger.info("Computing differences in phenotype descriptions.") + assert len(file_data) == len(db_data), "The counts of phenotypes differ!" + description_columns = ("Pre_publication_description", + "Post_publication_description", + "Original_description", + "Pre_publication_abbreviation", + "Post_publication_abbreviation") + diff = tuple() + for file_row, db_row in zip(file_data, db_data): + assert file_row["phenotype_id"] == db_row["Id"] + inner_diff = { + key: file_row[key] + for key in description_columns + if not file_row[key] == db_row[key] + } + if bool(inner_diff): + diff = diff + ({ + "phenotype_id": file_row["phenotype_id"], + **inner_diff + },) + + return diff + + +def update_descriptions(): + """Update descriptions in the database""" + logger.info("Updating descriptions") + # Compute differences between db data and uploaded file + # Only run query for changed descriptions + pass + + +def link_publications(): + """Link phenotypes to relevant publications.""" + logger.info("Linking phenotypes to publications.") + # Create publication if PubMed_ID doesn't exist in db + pass + + +def update_values(): + """Update the phenotype values.""" + logger.info("Updating phenotypes values.") + # Compute differences between db data and uploaded file + # Only run query for changed data + pass + + +def parse_args(): + parser = argparse.ArgumentParser( + prog="Phenotypes Bulk-Edit Processor", + description="Process the bulk-edits to phenotype data and descriptions.") + parser.add_argument("db_uri", type=str, help="MariaDB/MySQL connection URL") + parser.add_argument( + "jobs_db_path", type=Path, help="Path to jobs' SQLite database.") + parser.add_argument("job_id", type=uuid.UUID, help="ID of the running job") + parser.add_argument( + "--log-level", + type=str, + help="Determines what is logged out.", + choices=("debug", "info", "warning", "error", "critical"), + default="info") + return parser.parse_args() + + +def read_file(filepath: Path) -> Iterator[str]: + """Read the file, one line at a time.""" + with filepath.open(mode="r", encoding="utf-8") as infile: + count = 0 + headers = None + for line in infile: + if line.startswith("#"): # ignore comments + continue; + + fields = line.strip().split("\t") + if count == 0: + headers = fields + count = count + 1 + continue + + _dict = dict(zip( + headers, + ((None if item.strip() == "" else item.strip()) + for item in fields))) + _pheno, _xref = _dict.pop("UniqueIdentifier").split("::") + _dict = { + key: ((float(val) if bool(val) else val) + if key not in BULK_EDIT_COMMON_FIELDNAMES + else val) + for key, val in _dict.items() + } + _dict["phenotype_id"] = int(_pheno.split(":")[1]) + _dict["xref_id"] = int(_xref.split(":")[1]) + if _dict["PubMed_ID"] is not None: + _dict["PubMed_ID"] = int(_dict["PubMed_ID"]) + + yield _dict + count = count + 1 + + +def run(conn, job): + """Process the data and update it.""" + file_contents = tuple(sorted(read_file(Path(job["metadata"]["edit-file"])), + key=lambda item: item["phenotype_id"])) + pheno_ids, pheno_xref_ids, pubmed_ids = reduce( + lambda coll, curr: ( + coll[0] + (curr["phenotype_id"],), + coll[1] + ((curr["phenotype_id"], curr["xref_id"]),), + coll[2].union(set([curr["PubMed_ID"]]))), + file_contents, + (tuple(), tuple(), set([None]))) + check_ids(conn, pheno_xref_ids) + check_for_mandatory_fields() + # stop running here if any errors are found. + + ### Compute differences + logger.info("Computing differences.") + # 1. Basic Phenotype data differences + # a. Descriptions differences + _desc_diff = descriptions_differences( + file_contents, __fetch_phenotypes__(conn, pheno_ids)) + logger.debug("DESCRIPTIONS DIFFERENCES: %s", _desc_diff) + + # b. Publications differences + _db_publications = fetch_phenotype_publications(conn, pheno_xref_ids) + logger.debug("DB PUBLICATIONS: %s", _db_publications) + + _pubmed_map = { + (int(row["PubMed_ID"]) if bool(row["PubMed_ID"]) else None): f"{row['phenotype_id']}::{row['xref_id']}" + for row in file_contents + } + _pub_id_map = { + f"{pub['PhenotypeId']}::{pub['xref_id']}": pub["PublicationId"] + for pub in _db_publications + } + + _new_publications = update_publications( + conn, tuple({ + **pub, "publication_id": _pub_id_map[_pubmed_map[pub["pubmed_id"]]] + } for pub in pmed.fetch_publications(tuple( + pubmed_id for pubmed_id in pubmed_ids + if pubmed_id not in + tuple(row["PubMed_ID"] for row in _db_publications))))) + _pub_diff = publications_differences( + file_contents, _db_publications, { + row["PubMed_ID" if "PubMed_ID" in row else "pubmed_id"]: row[ + "PublicationId" if "PublicationId" in row else "publication_id"] + for row in _db_publications + _new_publications}) + logger.debug("Publications diff: %s", _pub_diff) + # 2. Data differences + _db_pheno_data = phenotypes_data_by_ids(conn, tuple({ + "population_id": job["metadata"]["population-id"], + "phenoid": row[0], + "xref_id": row[1] + } for row in pheno_xref_ids)) + + data_diff = phenotypes_data_differences( + ({ + "phenotype_id": row["phenotype_id"], + "xref_id": row["xref_id"], + "data": { + key:val for key,val in row.items() + if key not in BULK_EDIT_COMMON_FIELDNAMES + [ + "phenotype_id", "xref_id"] + } + } for row in file_contents), + ({ + **row, + "PhenotypeId": row["Id"], + "data": { + dataitem["StrainName"]: dataitem + for dataitem in row["data"].values() + } + } for row in _db_pheno_data)) + logger.debug("Data differences: %s", data_diff) + ### END: Compute differences + update_descriptions() + link_publications() + update_values() + return 0 + + +def main(): + """Entry-point for this script.""" + args = parse_args() + logger.setLevel(args.log_level.upper()) + logger.debug("Arguments: %s", args) + + logging.getLogger("uploader.phenotypes.misc").setLevel(args.log_level.upper()) + logging.getLogger("uploader.phenotypes.models").setLevel(args.log_level.upper()) + logging.getLogger("uploader.publications.models").setLevel(args.log_level.upper()) + + with (mysqldb.database_connection(args.db_uri) as conn, + sqlite3.connection(args.jobs_db_path) as jobs_conn): + return run(conn, jobs.job(jobs_conn, args.job_id)) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/rqtl2/entry.py b/scripts/rqtl2/entry.py index 327ed2c..e0e00e7 100644 --- a/scripts/rqtl2/entry.py +++ b/scripts/rqtl2/entry.py @@ -20,27 +20,23 @@ def build_main( [Redis, Connection, str, Namespace, logging.Logger], int ], - loggername: str + logger: logging.Logger ) -> Callable[[],int]: """Build a function to be used as an entry-point for scripts.""" def main(): - try: - logging.basicConfig( - format=( - "%(asctime)s - %(levelname)s %(name)s: " - "(%(pathname)s: %(lineno)d) %(message)s"), - level=args.loglevel) - logger = logging.getLogger(loggername) - with (Redis.from_url(args.redisuri, decode_responses=True) as rconn, - database_connection(args.databaseuri) as dbconn): - fqjobid = jobs.job_key(args.redisprefix, args.jobid) + with (Redis.from_url(args.redisuri, decode_responses=True) as rconn, + database_connection(args.databaseuri) as dbconn): + logger.setLevel(args.loglevel.upper()) + fqjobid = jobs.job_key(args.redisprefix, args.jobid) + + try: rconn.hset(fqjobid, "status", "started") logger.addHandler(setup_redis_logger( rconn, fqjobid, f"{fqjobid}:log-messages", args.redisexpiry)) - logger.addHandler(StreamHandler(stream=sys.stdout)) + logger.addHandler(StreamHandler(stream=sys.stderr)) check_db(args.databaseuri) check_redis(args.redisuri) @@ -48,15 +44,15 @@ def build_main( logger.error("File not found: '%s'.", args.rqtl2bundle) return 2 - returncode = run_fn(rconn, dbconn, fqjobid, args, logger) + returncode = run_fn(rconn, dbconn, fqjobid, args) if returncode == 0: rconn.hset(fqjobid, "status", "completed:success") return returncode rconn.hset(fqjobid, "status", "completed:error") return returncode - except Exception as _exc:# pylint: disable=[broad-except] - logger.error("The process failed!", exc_info=True) - rconn.hset(fqjobid, "status", "completed:error") - return 4 + except Exception as _exc:# pylint: disable=[broad-except] + logger.error("The process failed!", exc_info=True) + rconn.hset(fqjobid, "status", "completed:error") + return 4 return main diff --git a/scripts/rqtl2/phenotypes_qc.py b/scripts/rqtl2/phenotypes_qc.py index 76ecb8d..5c89ca0 100644 --- a/scripts/rqtl2/phenotypes_qc.py +++ b/scripts/rqtl2/phenotypes_qc.py @@ -36,6 +36,10 @@ from scripts.cli_parser import init_cli_parser, add_global_data_arguments from scripts.rqtl2.bundleutils import build_line_joiner, build_line_splitter __MODULE__ = "scripts.rqtl2.phenotypes_qc" +logging.basicConfig( + format=("%(asctime)s - %(levelname)s %(name)s: " + "(%(pathname)s: %(lineno)d) %(message)s")) +logger = logging.getLogger(__MODULE__) def validate(phenobundle: Path, logger: Logger) -> dict: """Check that the bundle is generally valid""" @@ -177,7 +181,7 @@ def qc_phenocovar_file( filepath.name, f"{fqkey}:logs") as logger, Redis.from_url(redisuri, decode_responses=True) as rconn): - logger.info("Running QC on file: %s", filepath.name) + print("Running QC on file: ", filepath.name) _csvfile = rqtl2.read_csv_file(filepath, separator, comment_char) _headings = tuple(heading.lower() for heading in next(_csvfile)) _errors: tuple[InvalidValue, ...] = tuple() @@ -205,12 +209,12 @@ def qc_phenocovar_file( (f"Record {_lc} in file {filepath.name} has a different " "number of columns than the number of headings"))),) _line = dict(zip(_headings, line)) - if not bool(_line["description"]): + if not bool(_line.get("description")): _errs = _errs + ( save_error(InvalidValue(filepath.name, _line[_headings[0]], "description", - _line["description"], + _line.get("description"), "The description is not provided!")),) rconn.hset(file_fqkey(fqkey, "metadata", filepath), @@ -285,7 +289,7 @@ def qc_pheno_file(# pylint: disable=[too-many-locals, too-many-arguments] filepath.name, f"{fqkey}:logs") as logger, Redis.from_url(redisuri, decode_responses=True) as rconn): - logger.info("Running QC on file: %s", filepath.name) + print("Running QC on file: ", filepath.name) save_error = partial( push_error, rconn, file_fqkey(fqkey, "errors", filepath)) _csvfile = rqtl2.read_csv_file(filepath, separator, comment_char) @@ -369,11 +373,10 @@ def run_qc(# pylint: disable=[too-many-locals] rconn: Redis, dbconn: mdb.Connection, fullyqualifiedjobid: str, - args: Namespace, - logger: Logger + args: Namespace ) -> int: """Run quality control checks on the bundle.""" - logger.debug("Beginning the quality assurance checks.") + print("Beginning the quality assurance checks.") results = check_for_averages_files( **check_for_mandatory_pheno_keys( **validate(args.rqtl2bundle, logger))) @@ -398,7 +401,7 @@ def run_qc(# pylint: disable=[too-many-locals] for ftype in ("pheno", "phenocovar", "phenose", "phenonum"))) # - Fetch samples/individuals from database. - logger.debug("Fetching samples/individuals from the database.") + print("Fetching samples/individuals from the database.") samples = tuple(#type: ignore[var-annotated] item for item in set(reduce( lambda acc, item: acc + ( @@ -415,7 +418,7 @@ def run_qc(# pylint: disable=[too-many-locals] json.dumps(tuple(f"{fullyqualifiedjobid}:phenocovar:{_file}" for _file in cdata.get("phenocovar", [])))) with mproc.Pool(mproc.cpu_count() - 1) as pool: - logger.debug("Check for errors in 'phenocovar' file(s).") + print("Check for errors in 'phenocovar' file(s).") _phenocovar_qc_res = merge_dicts(*pool.starmap(qc_phenocovar_file, tuple( (extractiondir.joinpath(_file), args.redisuri, @@ -437,7 +440,7 @@ def run_qc(# pylint: disable=[too-many-locals] "Expected a non-negative number with at least one decimal " "place.")) - logger.debug("Check for errors in 'pheno' file(s).") + print("Check for errors in 'pheno' file(s).") _pheno_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple(( extractiondir.joinpath(_file), args.redisuri, @@ -456,7 +459,7 @@ def run_qc(# pylint: disable=[too-many-locals] # - Check the 3 checks above for phenose and phenonum values too # qc_phenose_files(…) # qc_phenonum_files(…) - logger.debug("Check for errors in 'phenose' file(s).") + print("Check for errors in 'phenose' file(s).") _phenose_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple(( extractiondir.joinpath(_file), args.redisuri, @@ -472,7 +475,7 @@ def run_qc(# pylint: disable=[too-many-locals] dec_err_fn ) for _file in cdata.get("phenose", [])))) - logger.debug("Check for errors in 'phenonum' file(s).") + print("Check for errors in 'phenonum' file(s).") _phenonum_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple(( extractiondir.joinpath(_file), args.redisuri, @@ -509,5 +512,5 @@ if __name__ == "__main__": type=Path) return parser.parse_args() - main = build_main(cli_args(), run_qc, __MODULE__) + main = build_main(cli_args(), run_qc, logger) sys.exit(main()) |