1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
"""Handle jobs"""
import os
import shlex
import subprocess
from uuid import uuid4
from datetime import timedelta
from redis import Redis
def error_filename(job_id, error_dir):
"Compute the path of the file where errors will be dumped."
return f"{error_dir}/job_{job_id}.error"
def launch_job(# pylint: disable=[too-many-arguments]
redis_conn: Redis, filepath: str, filetype, redisurl, error_dir,
ttl_seconds: int):
"""Launch a job in the background"""
job_id = str(uuid4())
command = [
"python3", "-m", "scripts.worker", filetype, filepath, redisurl, job_id]
the_job = {
"job_id": job_id, "command": shlex.join(command), "status": "pending",
"filename": os.path.basename(filepath), "percent": 0,
"filetype": filetype
}
redis_conn.hset(name=the_job["job_id"], mapping=the_job)
redis_conn.expire(name=the_job["job_id"], time=timedelta(seconds=ttl_seconds))
if not os.path.exists(error_dir):
os.mkdir(error_dir)
with open(error_filename(job_id, error_dir),
"w",
encoding="utf-8") as errorfile:
subprocess.Popen(command, stderr=errorfile) # pylint: disable=[consider-using-with]
return the_job
def job(redis_conn, job_id: str):
"Retrieve the job"
return redis_conn.hgetall(job_id)
|