1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
import jsonpickle
import sqlite3
from rq import Queue
from rq.job import Job
from redis import Redis
from flask import current_app as app
def enqueue_job(delayed_fn: str, *args, **kwargs):
"""Add job to queue"""
with Redis.from_url(app.config["REDIS_URL"]) as rconn:
job = Job.create(
delayed_fn, args, **{
key: val for key, val in kwargs.items()
if key != "additional_jobs_meta"
},
connection = rconn,
timeout = "2h",
result_ttl = "5h",
failure_ttl = "5h"
)
queue = Queue("qcapp_queue", connection=rconn)
queue.enqueue_job(job)
return job
def job(job_id: str):
"Retrieve the job"
with Redis.from_url(app.config["REDIS_URL"]) as rconn:
queue = Queue("qcapp_queue", connection=rconn)
job = queue.fetch_job(job_id)
job.refresh()
return job
def create_jobs_table(dbpath: str):
"""Create sqlite3 table to track job progress"""
conn = sqlite3.connect(dbpath)
cursor = conn.cursor()
cursor.execute("CREATE TABLE IF NOT EXISTS jobs(job_id TEXT, job_meta TEXT)")
cursor.close()
conn.close()
def retrieve_meta(conn, job_id: str):
"""Retrieve the job's metadata."""
meta = {}
try:
cursor = conn.cursor()
cursor.execute(
"SELECT * FROM jobs WHERE job_id=:job_id", {"job_id": job_id})
db_row = cursor.fetchone()
# meta = json.loads(db_row [1]) if db_row else {}
meta = jsonpickle.decode(db_row [1]) if db_row else {}
except Exception as exc:
cursor.close()
raise exc
finally:
cursor.close()
return meta
def update_meta(conn, job_id: str, **meta):
"""Update the job's metadata."""
try:
cursor = conn.cursor()
old_meta = retrieve_meta(conn, job_id)
meta = {**old_meta, **meta}
query = "UPDATE jobs SET job_meta = :meta WHERE job_id = :job_id"
if not bool(old_meta):
query = "INSERT INTO jobs VALUES (:job_id, :meta)"
cursor.execute(
query, {"job_id": job_id, "meta": jsonpickle.encode(meta)})
conn.commit()
except Exception as exc:
cursor.close()
raise exc
finally:
cursor.close()
return meta
|