about summary refs log tree commit diff
path: root/qc_app/jobs.py
diff options
context:
space:
mode:
Diffstat (limited to 'qc_app/jobs.py')
-rw-r--r--qc_app/jobs.py130
1 files changed, 0 insertions, 130 deletions
diff --git a/qc_app/jobs.py b/qc_app/jobs.py
deleted file mode 100644
index 21889da..0000000
--- a/qc_app/jobs.py
+++ /dev/null
@@ -1,130 +0,0 @@
-"""Handle jobs"""
-import os
-import sys
-import shlex
-import subprocess
-from uuid import UUID, uuid4
-from datetime import timedelta
-from typing import Union, Optional
-
-from redis import Redis
-from flask import current_app as app
-
-JOBS_PREFIX = "JOBS"
-
-class JobNotFound(Exception):
-    """Raised if we try to retrieve a non-existent job."""
-
-def jobsnamespace():
-    """
-    Return the jobs namespace prefix. It depends on app configuration.
-
-    Calling this function outside of an application context will cause an
-    exception to be raised. It is mostly a convenience utility to use within the
-    application.
-    """
-    return f"{app.config['GNQC_REDIS_PREFIX']}:{JOBS_PREFIX}"
-
-def job_key(namespaceprefix: str, jobid: Union[str, UUID]) -> str:
-    """Build the key by appending it to the namespace prefix."""
-    return f"{namespaceprefix}:{jobid}"
-
-def raise_jobnotfound(rprefix:str, jobid: Union[str,UUID]):
-    """Utility to raise a `NoSuchJobError`"""
-    raise JobNotFound(f"Could not retrieve job '{jobid}' from '{rprefix}.")
-
-def error_filename(jobid, error_dir):
-    "Compute the path of the file where errors will be dumped."
-    return f"{error_dir}/job_{jobid}.error"
-
-def initialise_job(# pylint: disable=[too-many-arguments]
-        rconn: Redis, rprefix: str, jobid: str, command: list, job_type: str,
-        ttl_seconds: int = 86400, extra_meta: Optional[dict] = None) -> dict:
-    "Initialise a job 'object' and put in on redis"
-    the_job = {
-        "jobid": jobid, "command": shlex.join(command), "status": "pending",
-        "percent": 0, "job-type": job_type, **(extra_meta or {})
-    }
-    rconn.hset(job_key(rprefix, jobid), mapping=the_job)
-    rconn.expire(
-        name=job_key(rprefix, jobid), time=timedelta(seconds=ttl_seconds))
-    return the_job
-
-def build_file_verification_job(#pylint: disable=[too-many-arguments]
-        redis_conn: Redis,
-        dburi: str,
-        redisuri: str,
-        speciesid: int,
-        filepath: str,
-        filetype: str,
-        ttl_seconds: int):
-    "Build a file verification job"
-    jobid = str(uuid4())
-    command = [
-        sys.executable, "-m", "scripts.validate_file",
-        dburi, redisuri, jobsnamespace(), jobid,
-        "--redisexpiry", str(ttl_seconds),
-        str(speciesid), filetype, filepath,
-    ]
-    return initialise_job(
-        redis_conn, jobsnamespace(), jobid, command, "file-verification",
-        ttl_seconds, {
-            "filetype": filetype,
-            "filename": os.path.basename(filepath), "percent": 0
-        })
-
-def data_insertion_job(# pylint: disable=[too-many-arguments]
-        redis_conn: Redis, filepath: str, filetype: str, totallines: int,
-        speciesid: int, platformid: int, datasetid: int, databaseuri: str,
-        redisuri: str, ttl_seconds: int) -> dict:
-    "Build a data insertion job"
-    jobid = str(uuid4())
-    command = [
-        sys.executable, "-m", "scripts.insert_data", filetype, filepath,
-        speciesid, platformid, datasetid, databaseuri, redisuri
-    ]
-    return initialise_job(
-        redis_conn, jobsnamespace(), jobid, command, "data-insertion",
-        ttl_seconds, {
-            "filename": os.path.basename(filepath), "filetype": filetype,
-            "totallines": totallines
-        })
-
-def launch_job(the_job: dict, redisurl: str, error_dir):
-    """Launch a job in the background"""
-    if not os.path.exists(error_dir):
-        os.mkdir(error_dir)
-
-    jobid = the_job["jobid"]
-    with open(error_filename(jobid, error_dir),
-              "w",
-              encoding="utf-8") as errorfile:
-        subprocess.Popen( # pylint: disable=[consider-using-with]
-            [sys.executable, "-m", "scripts.worker", redisurl, jobsnamespace(),
-             jobid],
-            stderr=errorfile,
-            env={"PYTHONPATH": ":".join(sys.path)})
-
-    return the_job
-
-def job(rconn: Redis, rprefix: str, jobid: Union[str,UUID]):
-    "Retrieve the job"
-    thejob = (rconn.hgetall(job_key(rprefix, jobid)) or
-              raise_jobnotfound(rprefix, jobid))
-    return thejob
-
-def update_status(
-        rconn: Redis, rprefix: str, jobid: Union[str, UUID], status: str):
-    """Update status of job in redis."""
-    rconn.hset(name=job_key(rprefix, jobid), key="status", value=status)
-
-def update_stdout_stderr(rconn: Redis,
-                         rprefix: str,
-                         jobid: Union[str, UUID],
-                         bytes_read: bytes,
-                         stream: str):
-    "Update the stdout/stderr keys according to the value of `stream`."
-    thejob = job(rconn, rprefix, jobid)
-    contents = thejob.get(stream, '')
-    new_contents = contents + bytes_read.decode("utf-8")
-    rconn.hset(name=job_key(rprefix, jobid), key=stream, value=new_contents)