aboutsummaryrefslogtreecommitdiff
path: root/qc_app/jobs.py
diff options
context:
space:
mode:
Diffstat (limited to 'qc_app/jobs.py')
-rw-r--r--qc_app/jobs.py130
1 files changed, 0 insertions, 130 deletions
diff --git a/qc_app/jobs.py b/qc_app/jobs.py
deleted file mode 100644
index 21889da..0000000
--- a/qc_app/jobs.py
+++ /dev/null
@@ -1,130 +0,0 @@
-"""Handle jobs"""
-import os
-import sys
-import shlex
-import subprocess
-from uuid import UUID, uuid4
-from datetime import timedelta
-from typing import Union, Optional
-
-from redis import Redis
-from flask import current_app as app
-
-JOBS_PREFIX = "JOBS"
-
-class JobNotFound(Exception):
- """Raised if we try to retrieve a non-existent job."""
-
-def jobsnamespace():
- """
- Return the jobs namespace prefix. It depends on app configuration.
-
- Calling this function outside of an application context will cause an
- exception to be raised. It is mostly a convenience utility to use within the
- application.
- """
- return f"{app.config['GNQC_REDIS_PREFIX']}:{JOBS_PREFIX}"
-
-def job_key(namespaceprefix: str, jobid: Union[str, UUID]) -> str:
- """Build the key by appending it to the namespace prefix."""
- return f"{namespaceprefix}:{jobid}"
-
-def raise_jobnotfound(rprefix:str, jobid: Union[str,UUID]):
- """Utility to raise a `NoSuchJobError`"""
- raise JobNotFound(f"Could not retrieve job '{jobid}' from '{rprefix}.")
-
-def error_filename(jobid, error_dir):
- "Compute the path of the file where errors will be dumped."
- return f"{error_dir}/job_{jobid}.error"
-
-def initialise_job(# pylint: disable=[too-many-arguments]
- rconn: Redis, rprefix: str, jobid: str, command: list, job_type: str,
- ttl_seconds: int = 86400, extra_meta: Optional[dict] = None) -> dict:
- "Initialise a job 'object' and put in on redis"
- the_job = {
- "jobid": jobid, "command": shlex.join(command), "status": "pending",
- "percent": 0, "job-type": job_type, **(extra_meta or {})
- }
- rconn.hset(job_key(rprefix, jobid), mapping=the_job)
- rconn.expire(
- name=job_key(rprefix, jobid), time=timedelta(seconds=ttl_seconds))
- return the_job
-
-def build_file_verification_job(#pylint: disable=[too-many-arguments]
- redis_conn: Redis,
- dburi: str,
- redisuri: str,
- speciesid: int,
- filepath: str,
- filetype: str,
- ttl_seconds: int):
- "Build a file verification job"
- jobid = str(uuid4())
- command = [
- sys.executable, "-m", "scripts.validate_file",
- dburi, redisuri, jobsnamespace(), jobid,
- "--redisexpiry", str(ttl_seconds),
- str(speciesid), filetype, filepath,
- ]
- return initialise_job(
- redis_conn, jobsnamespace(), jobid, command, "file-verification",
- ttl_seconds, {
- "filetype": filetype,
- "filename": os.path.basename(filepath), "percent": 0
- })
-
-def data_insertion_job(# pylint: disable=[too-many-arguments]
- redis_conn: Redis, filepath: str, filetype: str, totallines: int,
- speciesid: int, platformid: int, datasetid: int, databaseuri: str,
- redisuri: str, ttl_seconds: int) -> dict:
- "Build a data insertion job"
- jobid = str(uuid4())
- command = [
- sys.executable, "-m", "scripts.insert_data", filetype, filepath,
- speciesid, platformid, datasetid, databaseuri, redisuri
- ]
- return initialise_job(
- redis_conn, jobsnamespace(), jobid, command, "data-insertion",
- ttl_seconds, {
- "filename": os.path.basename(filepath), "filetype": filetype,
- "totallines": totallines
- })
-
-def launch_job(the_job: dict, redisurl: str, error_dir):
- """Launch a job in the background"""
- if not os.path.exists(error_dir):
- os.mkdir(error_dir)
-
- jobid = the_job["jobid"]
- with open(error_filename(jobid, error_dir),
- "w",
- encoding="utf-8") as errorfile:
- subprocess.Popen( # pylint: disable=[consider-using-with]
- [sys.executable, "-m", "scripts.worker", redisurl, jobsnamespace(),
- jobid],
- stderr=errorfile,
- env={"PYTHONPATH": ":".join(sys.path)})
-
- return the_job
-
-def job(rconn: Redis, rprefix: str, jobid: Union[str,UUID]):
- "Retrieve the job"
- thejob = (rconn.hgetall(job_key(rprefix, jobid)) or
- raise_jobnotfound(rprefix, jobid))
- return thejob
-
-def update_status(
- rconn: Redis, rprefix: str, jobid: Union[str, UUID], status: str):
- """Update status of job in redis."""
- rconn.hset(name=job_key(rprefix, jobid), key="status", value=status)
-
-def update_stdout_stderr(rconn: Redis,
- rprefix: str,
- jobid: Union[str, UUID],
- bytes_read: bytes,
- stream: str):
- "Update the stdout/stderr keys according to the value of `stream`."
- thejob = job(rconn, rprefix, jobid)
- contents = thejob.get(stream, '')
- new_contents = contents + bytes_read.decode("utf-8")
- rconn.hset(name=job_key(rprefix, jobid), key=stream, value=new_contents)