aboutsummaryrefslogtreecommitdiff
path: root/qc_app/jobs.py
blob: 908c2445fee42719217e8aee708727c3d70fc29b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from rq import Queue
from rq.job import Job
from redis import Redis
from flask import current_app as app

def enqueue_job(delayed_fn, *args, **kwargs):
    """Create an RQ job for `delayed_fn` and put it on the "qcapp_queue" queue.

    The positional `args` are handed to `Job.create` as the job's arguments.
    Every keyword argument except `additional_jobs_meta` is forwarded to
    `Job.create` as a keyword argument of that call.

    The job's meta is initialised with `status="enqueued"` and `progress=0`.
    When `additional_jobs_meta` is given, its items are then written into the
    meta as well, so they override those two defaults on a key clash.

    Returns the job that was created and enqueued.
    """
    meta_key = "additional_jobs_meta"
    # The meta overrides are consumed here and must not reach `Job.create`.
    create_options = {
        name: value for name, value in kwargs.items() if name != meta_key}

    with Redis.from_url(app.config["REDIS_URL"]) as rconn:
        new_job = Job.create(
            delayed_fn, args, **create_options, connection=rconn)

        new_job.meta["status"] = "enqueued"
        new_job.meta["progress"] = 0
        if meta_key in kwargs:
            # Applied after the defaults so caller-supplied values win.
            new_job.meta.update(kwargs[meta_key].items())

        # Persist the meta before the job becomes visible on the queue.
        new_job.save_meta()

        Queue("qcapp_queue", connection=rconn).enqueue_job(new_job)

    return new_job

def job(job_id):
    """Fetch the job identified by `job_id` from the "qcapp_queue" queue.

    Returns the job, refreshed from Redis, or `None` when the queue cannot
    provide a job for `job_id`.
    """
    with Redis.from_url(app.config["REDIS_URL"]) as rconn:
        queue = Queue("qcapp_queue", connection=rconn)
        fetched = queue.fetch_job(job_id)
        # `fetch_job` hands back None when it finds no matching job; calling
        # `refresh()` unconditionally would fail with an opaque
        # AttributeError on NoneType, so only refresh a real job.
        if fetched is not None:
            fetched.refresh()

    return fetched

def update_meta(stale_job, **kwargs):
    """Merge meta values into the stored copy of `stale_job` and save them.

    The job is fetched again from the "qcapp_queue" queue using
    `stale_job.get_id()`, then refreshed. Its meta is updated from three
    sources, where later ones win on a key clash:

    1. the meta carried by `stale_job`,
    2. the meta of the freshly fetched job,
    3. the keyword arguments given to this function.

    Returns the freshly fetched job with the merged meta saved.

    NOTE(review): if the job can no longer be fetched, `fetch_job` presumably
    returns None and the `refresh()` call below raises AttributeError.
    Confirm that callers expect this.
    """
    with Redis.from_url(app.config["REDIS_URL"]) as rconn:
        fresh_job = Queue("qcapp_queue", connection=rconn).fetch_job(
            stale_job.get_id())
        fresh_job.refresh()

        # Build the merged view first; precedence is stale < fresh < kwargs.
        merged_meta = dict(stale_job.meta)
        merged_meta.update(fresh_job.meta)
        merged_meta.update(kwargs)

        fresh_job.meta.update(merged_meta)
        fresh_job.save_meta()

    return fresh_job