diff options
Diffstat (limited to 'uploader/jobs.py')
-rw-r--r-- | uploader/jobs.py | 45 |
1 files changed, 41 insertions, 4 deletions
diff --git a/uploader/jobs.py b/uploader/jobs.py index 21889da..5968c03 100644 --- a/uploader/jobs.py +++ b/uploader/jobs.py @@ -1,6 +1,8 @@ """Handle jobs""" import os import sys +import uuid +import json import shlex import subprocess from uuid import UUID, uuid4 @@ -10,7 +12,9 @@ from typing import Union, Optional from redis import Redis from flask import current_app as app -JOBS_PREFIX = "JOBS" +from functional_tools import take + +JOBS_PREFIX = "jobs" class JobNotFound(Exception): """Raised if we try to retrieve a non-existent job.""" @@ -37,7 +41,8 @@ def error_filename(jobid, error_dir): "Compute the path of the file where errors will be dumped." return f"{error_dir}/job_{jobid}.error" -def initialise_job(# pylint: disable=[too-many-arguments] +def initialise_job( + # pylint: disable=[too-many-arguments, too-many-positional-arguments] rconn: Redis, rprefix: str, jobid: str, command: list, job_type: str, ttl_seconds: int = 86400, extra_meta: Optional[dict] = None) -> dict: "Initialise a job 'object' and put in on redis" @@ -50,7 +55,8 @@ def initialise_job(# pylint: disable=[too-many-arguments] name=job_key(rprefix, jobid), time=timedelta(seconds=ttl_seconds)) return the_job -def build_file_verification_job(#pylint: disable=[too-many-arguments] +def build_file_verification_job( + #pylint: disable=[too-many-arguments, too-many-positional-arguments] redis_conn: Redis, dburi: str, redisuri: str, @@ -73,7 +79,8 @@ def build_file_verification_job(#pylint: disable=[too-many-arguments] "filename": os.path.basename(filepath), "percent": 0 }) -def data_insertion_job(# pylint: disable=[too-many-arguments] +def data_insertion_job( + # pylint: disable=[too-many-arguments, too-many-positional-arguments] redis_conn: Redis, filepath: str, filetype: str, totallines: int, speciesid: int, platformid: int, datasetid: int, databaseuri: str, redisuri: str, ttl_seconds: int) -> dict: @@ -128,3 +135,33 @@ def update_stdout_stderr(rconn: Redis, contents = thejob.get(stream, '') new_contents = contents + bytes_read.decode("utf-8") rconn.hset(name=job_key(rprefix, jobid), key=stream, value=new_contents) + + +def job_errors( + rconn: Redis, + prefix: str, + job_id: Union[str, uuid.UUID], + count: int = 100 +) -> list: + """Fetch job errors""" + return take( + ( + json.loads(error) + for key in rconn.keys(f"{prefix}:{str(job_id)}:*:errors:*") + for error in rconn.lrange(key, 0, -1)), + count) + + +def job_files_metadata( + rconn: Redis, + prefix: str, + job_id: Union[str, uuid.UUID] +) -> dict: + """Get the metadata for specific job file.""" + return { + key.split(":")[-1]: { + **rconn.hgetall(key), + "filetype": key.split(":")[-3] + } + for key in rconn.keys(f"{prefix}:{str(job_id)}:*:metadata*") + } |