aboutsummaryrefslogtreecommitdiff
path: root/qc_app/jobs.py
blob: afea419b840b30cc7d241be164182774a62de693 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import jsonpickle
import sqlite3

from rq import Queue
from rq.job import Job
from redis import Redis
from flask import current_app as app

def enqueue_job(delayed_fn: str, *args, **kwargs):
    """Add job to queue"""
    with Redis.from_url(app.config["REDIS_URL"]) as rconn:
        job = Job.create(
            delayed_fn, args, **{
                key: val for key, val in kwargs.items()
                if key != "additional_jobs_meta"
            },
            connection = rconn,
            timeout = "2h",
            result_ttl = "5h",
            failure_ttl = "5h"
        )

        queue = Queue("qcapp_queue", connection=rconn)
        queue.enqueue_job(job)

    return job

def job(job_id: str):
    "Retrieve the job"
    with Redis.from_url(app.config["REDIS_URL"]) as rconn:
        queue = Queue("qcapp_queue", connection=rconn)
        job = queue.fetch_job(job_id)
        job.refresh()

    return job

def create_jobs_table(dbpath: str):
    """Create sqlite3 table to track job progress"""
    conn = sqlite3.connect(dbpath)
    cursor = conn.cursor()
    cursor.execute("CREATE TABLE IF NOT EXISTS jobs(job_id TEXT, job_meta TEXT)")
    cursor.close()
    conn.close()

def retrieve_meta(conn, job_id: str):
    """Retrieve the job's metadata."""
    meta = {}
    try:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT * FROM jobs WHERE job_id=:job_id", {"job_id": job_id})
        db_row = cursor.fetchone()
        # meta = json.loads(db_row [1]) if db_row else {}
        meta = jsonpickle.decode(db_row [1]) if db_row else {}
    except Exception as exc:
        cursor.close()
        raise exc
    finally:
        cursor.close()

    return meta

def update_meta(conn, job_id: str, **meta):
    """Update the job's metadata."""
    try:
        cursor = conn.cursor()
        old_meta = retrieve_meta(conn, job_id)
        meta = {**old_meta, **meta}
        query = "UPDATE jobs SET job_meta = :meta WHERE job_id = :job_id"
        if not bool(old_meta):
            query = "INSERT INTO jobs VALUES (:job_id, :meta)"

        cursor.execute(
            query, {"job_id": job_id, "meta": jsonpickle.encode(meta)})
        conn.commit()
    except Exception as exc:
        cursor.close()
        raise exc
    finally:
        cursor.close()

    return meta