"""Delete phenotypes.""" import sys import logging from pathlib import Path from typing import Optional from urllib.parse import urljoin from argparse import Namespace, ArgumentParser import requests from MySQLdb.cursors import DictCursor, BaseCursor from gn_libs.mysqldb import database_connection from uploader.phenotypes.models import delete_phenotypes from scripts.cli.logging import setup_logging from scripts.cli.options import (add_logging, add_mariadb_uri, add_population_id) logger = logging.getLogger(__name__) def read_xref_ids_file(filepath: Optional[Path]) -> tuple[int, ...]: """Read the phenotypes' cross-reference IDS from file.""" if filepath is None: return tuple() _ids: tuple[int, ...] = tuple() with filepath.open(mode="r") as infile: try: _ids += (int(infile.readline().strip()),) except TypeError: pass return _ids def fetch_all_xref_ids( cursor: BaseCursor, population_id: int) -> tuple[int, ...]: """Fetch all cross-reference IDs.""" cursor.execute("SELECT Id FROM PublishXRef WHERE InbredSetId=%s", (population_id,)) return tuple(int(row["Id"]) for row in cursor.fetchall()) def update_auth( auth_details: tuple[str, str], species_id: int, population_id: int, dataset_id: int, xref_ids: tuple[int, ...] = tuple() ): """Update the authorisation server: remove items to delete.""" authserver, token = auth_details resp = requests.post( urljoin(authserver, (f"/auth/data/phenotypes/{species_id}/{population_id}" f"/{dataset_id}/delete")), timeout=(9.13, 20), headers={ "Authorization": f"Bearer {token}", "Content-Type": "application/json" }, json={"xref_ids": xref_ids}) resp.raise_for_status() def delete_the_phenotypes( cursor: BaseCursor, population_id: int, xref_ids: tuple[int, ...] = tuple()) -> int: """Process and delete the phenotypes.""" delete_phenotypes(cursor, population_id, xref_ids) return 0 if __name__ == "__main__": def parse_args() -> Namespace: """Parse CLI arguments.""" parser = add_logging( add_population_id( add_mariadb_uri( ArgumentParser( prog="delete-phenotypes", description=( "Script to delete phenotypes from the database."))))) parser.add_argument( "dataset_id", metavar="DATASET-ID", type=int, help="The dataset identifier for phenotypes to delete.") parser.add_argument( "auth_server_uri", metavar="AUTH-SERVER-URI", type=str, help="URI to the authorisation server.") parser.add_argument( "auth_token", metavar="AUTH-TOKEN", type=str, help=("Token to use to update the authorisation system with the " "deletions done.")) parser.add_argument( "--xref_ids_file", metavar="XREF-IDS-FILE", type=Path, help=("Path to a file with phenotypes cross-reference IDs to " "delete.")) parser.add_argument( "--delete-all", action="store_true", help=("If no 'XREF-IDS-FILE' is provided, this flag determines " "whether or not all the phenotypes for the given population " "will be deleted.")) return parser.parse_args() def main(): """The `delete-phenotypes` script's entry point.""" args = parse_args() setup_logging(logger, args.log_level.upper(), tuple()) with (database_connection(args.db_uri) as conn, conn.cursor(cursorclass=DictCursor) as cursor): xref_ids = read_xref_ids_file(args.xref_ids_file) try: assert not (len(xref_ids) > 0 and args.delete_all) xref_ids = (fetch_all_xref_ids(cursor, args.population_id) if args.delete_all else xref_ids) if len(xref_ids) == 0: print("No cross-reference IDs were provided. Aborting.") return 0 update_auth((args.auth_server_uri, args.auth_token), args.species_id, args.population_id, args.dataset_id, xref_ids) delete_phenotypes(cursor, args.population_id, xref_ids=xref_ids) return 0 except AssertionError: logger.error( "'DELETE-ALL' and 'XREF-IDS' are mutually exclusive. " "If you specify the list of XREF-IDS (in a file) to delete " "and also specify to 'DELETE-ALL' phenotypes in the " "population, we have no way of knowing what it is you want.") return 1 except Exception as _exc:# pylint: disable=[broad-exception-caught] logger.debug("Failed while attempting to delete phenotypes.", exc_info=True) return 1 sys.exit(main())