aboutsummaryrefslogtreecommitdiff
path: root/uploader/jobs.py
diff options
context:
space:
mode:
Diffstat (limited to 'uploader/jobs.py')
-rw-r--r--uploader/jobs.py130
1 files changed, 130 insertions, 0 deletions
diff --git a/uploader/jobs.py b/uploader/jobs.py
new file mode 100644
index 0000000..4a3fc80
--- /dev/null
+++ b/uploader/jobs.py
@@ -0,0 +1,130 @@
+"""Handle jobs"""
+import os
+import sys
+import shlex
+import subprocess
+from uuid import UUID, uuid4
+from datetime import timedelta
+from typing import Union, Optional
+
+from redis import Redis
+from flask import current_app as app
+
+JOBS_PREFIX = "jobs"
+
+class JobNotFound(Exception):
+ """Raised if we try to retrieve a non-existent job."""
+
+def jobsnamespace():
+ """
+ Return the jobs namespace prefix. It depends on app configuration.
+
+ Calling this function outside of an application context will cause an
+ exception to be raised. It is mostly a convenience utility to use within the
+ application.
+ """
+ return f"{app.config['GNQC_REDIS_PREFIX']}:{JOBS_PREFIX}"
+
+def job_key(namespaceprefix: str, jobid: Union[str, UUID]) -> str:
+ """Build the key by appending it to the namespace prefix."""
+ return f"{namespaceprefix}:{jobid}"
+
+def raise_jobnotfound(rprefix:str, jobid: Union[str,UUID]):
+ """Utility to raise a `NoSuchJobError`"""
+ raise JobNotFound(f"Could not retrieve job '{jobid}' from '{rprefix}.")
+
+def error_filename(jobid, error_dir):
+ "Compute the path of the file where errors will be dumped."
+ return f"{error_dir}/job_{jobid}.error"
+
+def initialise_job(# pylint: disable=[too-many-arguments]
+ rconn: Redis, rprefix: str, jobid: str, command: list, job_type: str,
+ ttl_seconds: int = 86400, extra_meta: Optional[dict] = None) -> dict:
+ "Initialise a job 'object' and put in on redis"
+ the_job = {
+ "jobid": jobid, "command": shlex.join(command), "status": "pending",
+ "percent": 0, "job-type": job_type, **(extra_meta or {})
+ }
+ rconn.hset(job_key(rprefix, jobid), mapping=the_job)
+ rconn.expire(
+ name=job_key(rprefix, jobid), time=timedelta(seconds=ttl_seconds))
+ return the_job
+
+def build_file_verification_job(#pylint: disable=[too-many-arguments]
+ redis_conn: Redis,
+ dburi: str,
+ redisuri: str,
+ speciesid: int,
+ filepath: str,
+ filetype: str,
+ ttl_seconds: int):
+ "Build a file verification job"
+ jobid = str(uuid4())
+ command = [
+ sys.executable, "-m", "scripts.validate_file",
+ dburi, redisuri, jobsnamespace(), jobid,
+ "--redisexpiry", str(ttl_seconds),
+ str(speciesid), filetype, filepath,
+ ]
+ return initialise_job(
+ redis_conn, jobsnamespace(), jobid, command, "file-verification",
+ ttl_seconds, {
+ "filetype": filetype,
+ "filename": os.path.basename(filepath), "percent": 0
+ })
+
+def data_insertion_job(# pylint: disable=[too-many-arguments]
+ redis_conn: Redis, filepath: str, filetype: str, totallines: int,
+ speciesid: int, platformid: int, datasetid: int, databaseuri: str,
+ redisuri: str, ttl_seconds: int) -> dict:
+ "Build a data insertion job"
+ jobid = str(uuid4())
+ command = [
+ sys.executable, "-m", "scripts.insert_data", filetype, filepath,
+ speciesid, platformid, datasetid, databaseuri, redisuri
+ ]
+ return initialise_job(
+ redis_conn, jobsnamespace(), jobid, command, "data-insertion",
+ ttl_seconds, {
+ "filename": os.path.basename(filepath), "filetype": filetype,
+ "totallines": totallines
+ })
+
+def launch_job(the_job: dict, redisurl: str, error_dir):
+ """Launch a job in the background"""
+ if not os.path.exists(error_dir):
+ os.mkdir(error_dir)
+
+ jobid = the_job["jobid"]
+ with open(error_filename(jobid, error_dir),
+ "w",
+ encoding="utf-8") as errorfile:
+ subprocess.Popen( # pylint: disable=[consider-using-with]
+ [sys.executable, "-m", "scripts.worker", redisurl, jobsnamespace(),
+ jobid],
+ stderr=errorfile,
+ env={"PYTHONPATH": ":".join(sys.path)})
+
+ return the_job
+
+def job(rconn: Redis, rprefix: str, jobid: Union[str,UUID]):
+ "Retrieve the job"
+ thejob = (rconn.hgetall(job_key(rprefix, jobid)) or
+ raise_jobnotfound(rprefix, jobid))
+ return thejob
+
+def update_status(
+ rconn: Redis, rprefix: str, jobid: Union[str, UUID], status: str):
+ """Update status of job in redis."""
+ rconn.hset(name=job_key(rprefix, jobid), key="status", value=status)
+
+def update_stdout_stderr(rconn: Redis,
+ rprefix: str,
+ jobid: Union[str, UUID],
+ bytes_read: bytes,
+ stream: str):
+ "Update the stdout/stderr keys according to the value of `stream`."
+ thejob = job(rconn, rprefix, jobid)
+ contents = thejob.get(stream, '')
+ new_contents = contents + bytes_read.decode("utf-8")
+ rconn.hset(name=job_key(rprefix, jobid), key=stream, value=new_contents)