from rq import Queue from rq.job import Job from redis import Redis from flask import current_app as app def enqueue_job(delayed_fn, *args, **kwargs): """Add job to queue""" with Redis.from_url(app.config["REDIS_URL"]) as rconn: job = Job.create( delayed_fn, args, **{ key: val for key, val in kwargs.items() if key != "additional_jobs_meta" }, connection = rconn) job.meta["status"] = "enqueued" job.meta["progress"] = 0 if "additional_jobs_meta" in kwargs: for key, val in kwargs["additional_jobs_meta"].items(): job.meta[key] = val job.save_meta() queue = Queue("qcapp_queue", connection=rconn) queue.enqueue_job(job) return job def job(job_id): "Retrieve the job" with Redis.from_url(app.config["REDIS_URL"]) as rconn: queue = Queue("qcapp_queue", connection=rconn) job = queue.fetch_job(job_id) job.refresh() return job def update_meta(rconn, stale_job, **kwargs): """Update the job's metadata.""" job = Job.fetch(stale_job.get_id(), connection=rconn) job.refresh() meta_dict = {**stale_job.meta, **job.meta, **kwargs} for key, val in meta_dict.items(): job.meta[key] = val job.save_meta() return job