about summary refs log tree commit diff
path: root/uploader/jobs.py
diff options
context:
space:
mode:
Diffstat (limited to 'uploader/jobs.py')
-rw-r--r--  uploader/jobs.py  45
1 file changed, 41 insertions, 4 deletions
diff --git a/uploader/jobs.py b/uploader/jobs.py
index 21889da..5968c03 100644
--- a/uploader/jobs.py
+++ b/uploader/jobs.py
@@ -1,6 +1,8 @@
"""Handle jobs"""
import os
import sys
+import uuid
+import json
import shlex
import subprocess
from uuid import UUID, uuid4
@@ -10,7 +12,9 @@ from typing import Union, Optional
from redis import Redis
from flask import current_app as app
-JOBS_PREFIX = "JOBS"
+from functional_tools import take
+
+JOBS_PREFIX = "jobs"
class JobNotFound(Exception):
"""Raised if we try to retrieve a non-existent job."""
@@ -37,7 +41,8 @@ def error_filename(jobid, error_dir):
"Compute the path of the file where errors will be dumped."
return f"{error_dir}/job_{jobid}.error"
-def initialise_job(# pylint: disable=[too-many-arguments]
+def initialise_job(
+ # pylint: disable=[too-many-arguments, too-many-positional-arguments]
rconn: Redis, rprefix: str, jobid: str, command: list, job_type: str,
ttl_seconds: int = 86400, extra_meta: Optional[dict] = None) -> dict:
"Initialise a job 'object' and put in on redis"
@@ -50,7 +55,8 @@ def initialise_job(# pylint: disable=[too-many-arguments]
name=job_key(rprefix, jobid), time=timedelta(seconds=ttl_seconds))
return the_job
-def build_file_verification_job(#pylint: disable=[too-many-arguments]
+def build_file_verification_job(
+ #pylint: disable=[too-many-arguments, too-many-positional-arguments]
redis_conn: Redis,
dburi: str,
redisuri: str,
@@ -73,7 +79,8 @@ def build_file_verification_job(#pylint: disable=[too-many-arguments]
"filename": os.path.basename(filepath), "percent": 0
})
-def data_insertion_job(# pylint: disable=[too-many-arguments]
+def data_insertion_job(
+ # pylint: disable=[too-many-arguments, too-many-positional-arguments]
redis_conn: Redis, filepath: str, filetype: str, totallines: int,
speciesid: int, platformid: int, datasetid: int, databaseuri: str,
redisuri: str, ttl_seconds: int) -> dict:
@@ -128,3 +135,33 @@ def update_stdout_stderr(rconn: Redis,
contents = thejob.get(stream, '')
new_contents = contents + bytes_read.decode("utf-8")
rconn.hset(name=job_key(rprefix, jobid), key=stream, value=new_contents)
+
+
+def job_errors(
+ rconn: Redis,
+ prefix: str,
+ job_id: Union[str, uuid.UUID],
+ count: int = 100
+) -> list:
+ """Fetch job errors"""
+ return take(
+ (
+ json.loads(error)
+ for key in rconn.keys(f"{prefix}:{str(job_id)}:*:errors:*")
+ for error in rconn.lrange(key, 0, -1)),
+ count)
+
+
+def job_files_metadata(
+ rconn: Redis,
+ prefix: str,
+ job_id: Union[str, uuid.UUID]
+) -> dict:
+ """Get the metadata for specific job file."""
+ return {
+ key.split(":")[-1]: {
+ **rconn.hgetall(key),
+ "filetype": key.split(":")[-3]
+ }
+ for key in rconn.keys(f"{prefix}:{str(job_id)}:*:metadata*")
+ }