From cdd4dc456e56bb4eb055e1cb7f2518d45fb3bfb9 Mon Sep 17 00:00:00 2001
From: Frederick Muriuki Muriithi
Date: Sat, 20 Jan 2024 09:57:23 +0300
Subject: Fetch sample/case names from database

Fetch the sample/case names from the database rather than from a static
file in the repository.

Issue: https://issues.genenetwork.org/issues/quality-control/read-samples-from-database-by-species
---
 scripts/validate_file.py | 65 +++++++++++++++++++++++++-----------------------
 1 file changed, 34 insertions(+), 31 deletions(-)

(limited to 'scripts/validate_file.py')

diff --git a/scripts/validate_file.py b/scripts/validate_file.py
index 9f0a561..4b4fc0c 100644
--- a/scripts/validate_file.py
+++ b/scripts/validate_file.py
@@ -11,16 +11,20 @@ from redis.exceptions import ConnectionError # pylint: disable=[redefined-builti
 from quality_control.utils import make_progress_calculator
 from quality_control.parsing import FileType, strain_names, collect_errors
 
-from .qc import cli_argument_parser
+from qc_app.db_utils import database_connection
 
-def make_progress_indicator(
-        redis_connection: Redis, job_id: str,
-        progress_calc_fn: Callable) -> Callable:
+from .cli_parser import init_cli_parser
+from .qc import add_file_validation_arguments
+
+
+def make_progress_indicator(redis_connection: Redis,
+                            jobid: str,
+                            progress_calc_fn: Callable) -> Callable:
     """Make function that will compute the progress and update redis"""
     def __indicator__(linenumber, linetext):
         progress = progress_calc_fn(linenumber, linetext)
-        redis_connection.hset(name=job_id, mapping=progress._asdict())
+        redis_connection.hset(name=str(jobid), mapping=progress._asdict())
 
         return progress
 
@@ -32,13 +36,9 @@ def cli_args_valid(args):
         print(f"The file '{args.filepath}' does not exist.", file=sys.stderr)
         return None
 
-    if not os.path.exists(args.strainsfile):
-        print(f"The file '{args.strainsfile}' does not exist.", file=sys.stderr)
-        return None
-
     try:
-        conn = Redis.from_url(args.redisurl) # pylint: disable=[unused-variable]
-    except ConnectionError as conn_err: # pylint: disable=[unused-variable]
+        _conn = Redis.from_url(args.redisuri)
+    except ConnectionError as _conn_err:
         print(traceback.format_exc(), file=sys.stderr)
         return None
 
@@ -46,28 +46,31 @@ def cli_args_valid(args):
 
 def process_cli_arguments():
     """Setup command-line parser"""
-    parser = cli_argument_parser()
-    parser.prog = "worker"
-    parser.add_argument(
-        "redisurl", default="redis:///", help="URL to the redis server")
-    parser.add_argument("job_id", help="The id of the job being processed")
+    parser = init_cli_parser(
+        "validate-file",
+        ("Verify that the file with the expression data conforms to "
+         "expectations."))
+    parser.add_argument("speciesid",
+                        type=int,
+                        help="Species for which the data is to be processed.")
+    parser = add_file_validation_arguments(parser)
     return cli_args_valid(parser.parse_args())
 
-def stream_error(redis_conn, job_id, error):
+def stream_error(redis_conn, jobid, error):
     """Update redis with the most current error(s) found"""
     errors = jsonpickle.decode(
-        redis_conn.hget(job_id, key="errors") or jsonpickle.encode(tuple()))
+        redis_conn.hget(str(jobid), key="errors") or jsonpickle.encode(tuple()))
     redis_conn.hset(
-        job_id, key="errors", value=jsonpickle.encode(errors + (error,)))
+        str(jobid), key="errors", value=jsonpickle.encode(errors + (error,)))
 
-def make_user_aborted(redis_conn, job_id):
+def make_user_aborted(redis_conn, jobid):
     """Mkae function that checks whether the user aborted the process"""
     def __aborted__():
         user_aborted = bool(int(
-            redis_conn.hget(name=job_id, key="user_aborted") or "0"))
+            redis_conn.hget(name=str(jobid), key="user_aborted") or "0"))
         if user_aborted:
-            redis_conn.hset(name=job_id, key="status", value="aborted")
+            redis_conn.hset(name=str(jobid), key="status", value="aborted")
         return user_aborted
     return __aborted__
 
@@ -84,36 +87,36 @@ def main():
         print("Quiting due to errors!", file=sys.stderr)
         return 1
 
-    with Redis.from_url(args.redisurl) as redis_conn:
+    with (Redis.from_url(args.redisuri) as redis_conn,
+          database_connection(args.databaseuri) as dbconn):
         progress_calculator = make_progress_calculator(
             get_zipfile_size(args.filepath) if is_zipfile(args.filepath)
             else os.stat(args.filepath).st_size)
         progress_indicator = make_progress_indicator(
-            redis_conn, args.job_id, progress_calculator)
+            redis_conn, args.jobid, progress_calculator)
 
         count = args.count
         filepath = args.filepath
         filetype = (
             FileType.AVERAGE if args.filetype == "average"
             else FileType.STANDARD_ERROR)
-        strains = strain_names(args.strainsfile)
+        strains = strain_names(dbconn, args.speciesid)
 
+        redis_conn.hset(name=str(args.jobid), key="status", value="Processing")
         redis_conn.hset(
-            name=args.job_id, key="status", value="Processing")
-        redis_conn.hset(
-            name=args.job_id, key="message", value="Collecting errors")
+            name=str(args.jobid), key="message", value="Collecting errors")
 
         error_count = 0
         for error in collect_errors(
                 filepath, filetype, strains, progress_indicator,
-                make_user_aborted(redis_conn, args.job_id)):
-            stream_error(redis_conn, args.job_id, error)
+                make_user_aborted(redis_conn, args.jobid)):
+            stream_error(redis_conn, args.jobid, error)
 
             if count > 0:
                 error_count = error_count + 1
                 if error_count >= count:
                     break
 
-        redis_conn.hset(name=args.job_id, key="status", value="success")
+        redis_conn.hset(name=str(args.jobid), key="status", value="success")
 
         return 0
--
cgit v1.2.3
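
Note: the patch relies on quality_control.parsing.strain_names now taking a
database connection and a species id instead of a path to a strains file. The
sketch below shows roughly what such a species-scoped lookup could look like;
it is an illustration only, not the repository's actual implementation. The
DB-API style connection (as yielded by qc_app.db_utils.database_connection),
the fetch_strain_names name, and the `Strain` table with `Name` and
`SpeciesId` columns are assumptions made for the example.

    # Illustrative sketch only -- not the strain_names() shipped in
    # quality_control/parsing.py. Assumes a DB-API connection and a
    # `Strain` table with `Name` and `SpeciesId` columns.
    from typing import Sequence

    def fetch_strain_names(dbconn, speciesid: int) -> Sequence[str]:
        """Fetch the sample/case (strain) names for the given species."""
        cursor = dbconn.cursor()
        try:
            cursor.execute(
                "SELECT Name FROM Strain WHERE SpeciesId = %s",
                (speciesid,))
            # Each row is a 1-tuple holding a strain name.
            return tuple(row[0] for row in cursor.fetchall())
        finally:
            cursor.close()

A lookup of this shape returns the sample/case names for a single species,
which is what `strains = strain_names(dbconn, args.speciesid)` in main()
above passes to collect_errors() in place of the names previously read from
the static strains file.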