aboutsummaryrefslogtreecommitdiff
path: root/qc_app/jobs.py
blob: a8257a360657c847ace169039d084e2a1a13c8a9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
"""Handle jobs"""
import os
import sys
import shlex
import subprocess
from typing import Union
from uuid import UUID, uuid4
from datetime import timedelta

from redis import Redis

class JobNotFound(Exception):
    """Signal that no job could be retrieved for a requested job id."""

def raise_jobnotfound(jobid: Union[str,UUID]):
    """Raise `JobNotFound` for the job identified by `jobid`.

    This function never returns normally; it exists so that the failure can
    be triggered from within an expression.
    """
    message = f"Could not retrieve job '{jobid}'."
    raise JobNotFound(message)

def error_filename(job_id, error_dir):
    """Build the path, inside `error_dir`, of the error dump file for `job_id`."""
    dump_name = f"job_{job_id}.error"
    return f"{error_dir}/{dump_name}"

def initialise_job(# pylint: disable=[too-many-arguments]
        redis_conn: Redis, job_id: str, command: list, job_type: str,
        ttl_seconds: int, extra_meta: dict) -> dict:
    """Initialise a job 'object' and put it on redis.

    The job is a flat mapping stored as a redis hash, and the hash is set to
    expire `ttl_seconds` seconds after it is stored.

    Parameters:
      redis_conn: connection used to store the job.
      job_id: identifier of the job; used as the redis key.
      command: argument vector of the command the job represents. Every item
        is converted with `str` and the items are shell-quoted and joined into
        a single string.
      job_type: label stored under the "job-type" key.
      ttl_seconds: lifetime of the redis key, in seconds.
      extra_meta: additional key/value pairs merged into the job. Keys that
        clash with the standard ones override them.

    Returns the mapping that was stored on redis.
    """
    the_job = {
        "job_id": job_id,
        # `shlex.join` only accepts strings, while callers may legitimately
        # pass numbers (e.g. database identifiers) in the argument vector.
        "command": shlex.join(str(arg) for arg in command),
        "status": "pending",
        "percent": 0,
        "job-type": job_type,
        **extra_meta
    }
    # Use the id from the final mapping: `extra_meta` may have overridden it.
    redis_conn.hset(name=the_job["job_id"], mapping=the_job)
    redis_conn.expire(name=the_job["job_id"], time=timedelta(seconds=ttl_seconds))
    return the_job

def build_file_verification_job(
        redis_conn: Redis, filepath: str, filetype: str, redisurl: str,
        ttl_seconds: int):
    """Create and store a "file-verification" job for the file at `filepath`.

    A fresh UUID is generated as the job id. The job's command runs the
    `scripts.validate_file` module with the current Python interpreter,
    passing it the file type, the file path, the redis URL and the job id.

    Returns the job mapping produced by `initialise_job`.
    """
    new_job_id = str(uuid4())
    metadata = {
        "filetype": filetype,
        "filename": os.path.basename(filepath),
        "percent": 0
    }
    validator_argv = [
        sys.executable,
        "-m",
        "scripts.validate_file",
        filetype,
        filepath,
        redisurl,
        new_job_id,
    ]
    return initialise_job(
        redis_conn, new_job_id, validator_argv, "file-verification",
        ttl_seconds, metadata)

def data_insertion_job(# pylint: disable=[too-many-arguments]
        redis_conn: Redis, filepath: str, filetype: str, totallines: int,
        speciesid: int, platformid: int, datasetid: int, databaseuri: str,
        redisuri: str, ttl_seconds: int) -> dict:
    """Create and store a "data-insertion" job for the file at `filepath`.

    A fresh UUID is generated as the job id. The job's command runs the
    `scripts.insert_data` module with the current Python interpreter, passing
    it the file type, the file path, the species, platform and dataset
    identifiers, the database URI and the redis URI.

    `totallines` is not part of the command; it is recorded in the job's
    metadata alongside the file's base name and type.

    Returns the job mapping produced by `initialise_job`.
    """
    command = [
        sys.executable, "-m", "scripts.insert_data", filetype, filepath,
        # The command is shell-quoted and joined with `shlex.join`, which only
        # accepts strings, so the integer identifiers are converted here.
        str(speciesid), str(platformid), str(datasetid), databaseuri, redisuri
    ]
    return initialise_job(
        redis_conn, str(uuid4()), command, "data-insertion", ttl_seconds, {
            "filename": os.path.basename(filepath), "filetype": filetype,
            "totallines": totallines
        })

def launch_job(the_job: dict, redisurl: str, error_dir):
    """Launch a job in the background.

    Starts the `scripts.worker` module in a child process, using the current
    Python interpreter and passing it `redisurl` and the job's id. The child's
    standard error is redirected to the file computed by `error_filename`
    inside `error_dir`, which is created if it does not exist yet.

    The child is not waited for. Returns `the_job` unchanged.
    """
    # `exist_ok=True` avoids the check-then-create race between concurrent
    # launches, and `makedirs` also creates any missing parent directories.
    os.makedirs(error_dir, exist_ok=True)

    job_id = the_job["job_id"]
    with open(error_filename(job_id, error_dir),
              "w",
              encoding="utf-8") as errorfile:
        # The child keeps its own handle on the error file, so closing ours
        # when the `with` block exits does not affect it.
        # NOTE(review): the child's environment contains ONLY `PYTHONPATH`;
        # nothing else is inherited from this process. Confirm that this is
        # intentional.
        subprocess.Popen( # pylint: disable=[consider-using-with]
            [sys.executable, "-m", "scripts.worker", redisurl, job_id],
            stderr=errorfile,
            env={"PYTHONPATH": ":".join(sys.path)})

    return the_job

def job(redis_conn, job_id: Union[str,UUID]):
    """Fetch the job stored on redis under `job_id`.

    The id is converted to a string before the lookup. If redis returns
    nothing for that key, `raise_jobnotfound` is called, which raises
    `JobNotFound`.
    """
    found = redis_conn.hgetall(str(job_id))
    if not found:
        raise_jobnotfound(job_id)
    return found