about summary refs log tree commit diff
path: root/uploader/jobs.py
diff options
context:
space:
mode:
Diffstat (limited to 'uploader/jobs.py')
-rw-r--r-- uploader/jobs.py 130
1 files changed, 130 insertions, 0 deletions
diff --git a/uploader/jobs.py b/uploader/jobs.py
new file mode 100644
index 0000000..21889da
--- /dev/null
+++ b/uploader/jobs.py
@@ -0,0 +1,130 @@
+"""Handle jobs"""
+import os
+import sys
+import shlex
+import subprocess
+from uuid import UUID, uuid4
+from datetime import timedelta
+from typing import Union, Optional
+
+from redis import Redis
+from flask import current_app as app
+
+# Namespace segment that `jobsnamespace()` appends to the configured
+# `GNQC_REDIS_PREFIX` to build the prefix for job keys.
+JOBS_PREFIX = "JOBS"
+
+class JobNotFound(Exception):
+    """Error signalling that a requested job could not be retrieved."""
+
+def jobsnamespace():
+    """
+    Compute the namespace prefix for jobs.
+
+    The result has the form `<GNQC_REDIS_PREFIX>:<JOBS_PREFIX>`, where the first
+    part is read from the application's configuration.
+
+    The configuration is reached through flask's `current_app`, so this is meant
+    as a convenience for code running inside an application context; calling it
+    elsewhere will raise an exception.
+    """
+    configured_prefix = app.config['GNQC_REDIS_PREFIX']
+    return f"{configured_prefix}:{JOBS_PREFIX}"
+
+def job_key(namespaceprefix: str, jobid: Union[str, UUID]) -> str:
+    """Return the job's key: the namespace prefix and the job id joined by ':'."""
+    return "{}:{}".format(namespaceprefix, jobid)
+
+def raise_jobnotfound(rprefix:str, jobid: Union[str,UUID]):
+    """
+    Utility to raise a `JobNotFound` exception.
+
+    Being a function, it can be used where a `raise` statement cannot, e.g. on
+    the right-hand side of an `or` expression. It never returns normally.
+
+    rprefix: the namespace prefix in which the job was looked up.
+    jobid: the identifier of the job that could not be retrieved.
+
+    Raises `JobNotFound` unconditionally.
+    """
+    # The closing quote after the prefix was missing in the original message.
+    raise JobNotFound(f"Could not retrieve job '{jobid}' from '{rprefix}'.")
+
+def error_filename(jobid, error_dir):
+    "Build the path, under `error_dir`, of the file that receives a job's errors."
+    basename = f"job_{jobid}.error"
+    return f"{error_dir}/{basename}"
+
+def initialise_job(# pylint: disable=[too-many-arguments]
+        rconn: Redis, rprefix: str, jobid: str, command: list, job_type: str,
+        ttl_seconds: int = 86400, extra_meta: Optional[dict] = None) -> dict:
+    """
+    Create a job 'object', save it on redis and set its expiry.
+
+    The job is stored as a hash under the key built from `rprefix` and `jobid`.
+    `command` is stored as a single shell-quoted string. The job starts out with
+    a status of "pending" and a "percent" of 0. Any keys in `extra_meta` are
+    added last, so they override the default fields of the same name.
+
+    Returns the dict that was saved to redis.
+    """
+    rediskey = job_key(rprefix, jobid)
+    job_details = {
+        "jobid": jobid,
+        "command": shlex.join(command),
+        "status": "pending",
+        "percent": 0,
+        "job-type": job_type,
+    }
+    if extra_meta:
+        job_details.update(extra_meta)
+
+    rconn.hset(rediskey, mapping=job_details)
+    rconn.expire(name=rediskey, time=timedelta(seconds=ttl_seconds))
+    return job_details
+
+def build_file_verification_job(#pylint: disable=[too-many-arguments]
+        redis_conn: Redis,
+        dburi: str,
+        redisuri: str,
+        speciesid: int,
+        filepath: str,
+        filetype: str,
+        ttl_seconds: int):
+    """
+    Set up a job that verifies an uploaded file.
+
+    A fresh job id is generated, and the command is built to run the
+    `scripts.validate_file` module with the current python interpreter. The job
+    is then initialised on redis with the "file-verification" job type.
+
+    Returns the job dict produced by `initialise_job`.
+    """
+    namespace = jobsnamespace()
+    jobid = str(uuid4())
+    script = [sys.executable, "-m", "scripts.validate_file"]
+    positional_head = [dburi, redisuri, namespace, jobid]
+    options = ["--redisexpiry", str(ttl_seconds)]
+    positional_tail = [str(speciesid), filetype, filepath]
+    metadata = {
+        "filetype": filetype,
+        "filename": os.path.basename(filepath),
+        "percent": 0,
+    }
+    return initialise_job(
+        redis_conn,
+        namespace,
+        jobid,
+        script + positional_head + options + positional_tail,
+        "file-verification",
+        ttl_seconds,
+        metadata)
+
+def data_insertion_job(# pylint: disable=[too-many-arguments]
+        redis_conn: Redis, filepath: str, filetype: str, totallines: int,
+        speciesid: int, platformid: int, datasetid: int, databaseuri: str,
+        redisuri: str, ttl_seconds: int) -> dict:
+    """
+    Build a data insertion job.
+
+    A fresh job id is generated, and the command is built to run the
+    `scripts.insert_data` module with the current python interpreter. The job
+    is then initialised on redis with the "data-insertion" job type, recording
+    the file's base name, its type and its total number of lines.
+
+    Returns the job dict produced by `initialise_job`.
+    """
+    jobid = str(uuid4())
+    # `initialise_job` passes the command to `shlex.join`, which only accepts
+    # strings, so the integer identifiers must be converted before use.
+    command = [
+        sys.executable, "-m", "scripts.insert_data", filetype, filepath,
+        str(speciesid), str(platformid), str(datasetid), databaseuri, redisuri
+    ]
+    return initialise_job(
+        redis_conn, jobsnamespace(), jobid, command, "data-insertion",
+        ttl_seconds, {
+            "filename": os.path.basename(filepath), "filetype": filetype,
+            "totallines": totallines
+        })
+
+def launch_job(the_job: dict, redisurl: str, error_dir):
+    """
+    Launch a job in the background.
+
+    Starts the `scripts.worker` module in a new process, passing it the redis
+    URL, the jobs namespace and the job's id. The worker's standard error is
+    sent to the job's error file inside `error_dir`. The process is not waited
+    on.
+
+    the_job: the job dict; only its "jobid" entry is read here.
+    redisurl: the redis URL handed to the worker.
+    error_dir: directory for the error files; created if it does not exist.
+
+    Returns `the_job` unchanged.
+    """
+    # `exist_ok` avoids the race between checking for the directory and creating
+    # it, and `makedirs` also copes with missing parent directories.
+    os.makedirs(error_dir, exist_ok=True)
+
+    jobid = the_job["jobid"]
+    with open(error_filename(jobid, error_dir),
+              "w",
+              encoding="utf-8") as errorfile:
+        # NOTE(review): `env` replaces the worker's entire environment, so only
+        # PYTHONPATH is passed on -- confirm that this is intended.
+        subprocess.Popen( # pylint: disable=[consider-using-with]
+            [sys.executable, "-m", "scripts.worker", redisurl, jobsnamespace(),
+             jobid],
+            stderr=errorfile,
+            env={"PYTHONPATH": os.pathsep.join(sys.path)})
+
+    return the_job
+
+def job(rconn: Redis, rprefix: str, jobid: Union[str,UUID]):
+    "Fetch the job's hash from redis; raise `JobNotFound` if nothing is stored."
+    details = rconn.hgetall(job_key(rprefix, jobid))
+    if details:
+        return details
+    return raise_jobnotfound(rprefix, jobid)
+
+def update_status(
+        rconn: Redis, rprefix: str, jobid: Union[str, UUID], status: str):
+    """Set the "status" field of the job's hash in redis to `status`."""
+    rediskey = job_key(rprefix, jobid)
+    rconn.hset(name=rediskey, key="status", value=status)
+
+def update_stdout_stderr(rconn: Redis,
+                         rprefix: str,
+                         jobid: Union[str, UUID],
+                         bytes_read: bytes,
+                         stream: str):
+    """
+    Append newly read output to the job field named by `stream`.
+
+    The job is fetched first (so `JobNotFound` is raised for a missing job),
+    then `bytes_read`, decoded as UTF-8, is added to the end of whatever the
+    field already holds (an empty string if the field is absent) and the result
+    is written back.
+    """
+    existing = job(rconn, rprefix, jobid).get(stream, '')
+    appended = existing + bytes_read.decode("utf-8")
+    rconn.hset(name=job_key(rprefix, jobid), key=stream, value=appended)