import sqlite3

import jsonpickle
from rq import Queue
from rq.job import Job
from redis import Redis
from flask import current_app as app


def enqueue_job(delayed_fn: str, *args, **kwargs) -> Job:
    """Create a job for ``delayed_fn`` and put it on the "qcapp_queue" queue.

    Args:
        delayed_fn: The function reference handed to ``Job.create``.
        *args: Positional arguments for the job, passed to ``Job.create`` as
            its ``args`` value.
        **kwargs: Extra keyword arguments forwarded to ``Job.create``. The
            ``additional_jobs_meta`` key, if present, is dropped first.

    Returns:
        The job object that was created and enqueued.

    Raises:
        TypeError: If ``kwargs`` contains ``connection``, ``timeout``,
            ``result_ttl`` or ``failure_ttl``, since those are set here.
    """
    # NOTE(review): these keywords go to Job.create() itself, not to its
    # ``kwargs=`` parameter -- confirm that this is the intended behaviour.
    create_options = {
        key: val
        for key, val in kwargs.items()
        if key != "additional_jobs_meta"
    }
    with Redis.from_url(app.config["REDIS_URL"]) as rconn:
        new_job = Job.create(
            delayed_fn,
            args,
            **create_options,
            connection=rconn,
            timeout="2h",
            result_ttl="5h",
            failure_ttl="5h",
        )
        Queue("qcapp_queue", connection=rconn).enqueue_job(new_job)
    return new_job


def job(job_id: str):
    """Retrieve the job with the given id from the "qcapp_queue" queue.

    Args:
        job_id: Identifier of the job to look up.

    Returns:
        The refreshed job, or ``None`` when the queue lookup finds no job.
    """
    with Redis.from_url(app.config["REDIS_URL"]) as rconn:
        fetched = Queue("qcapp_queue", connection=rconn).fetch_job(job_id)
        # fetch_job() may return None; calling refresh() on it would raise
        # AttributeError, so only refresh a job that was actually found.
        if fetched is not None:
            fetched.refresh()
        return fetched


def create_jobs_table(dbpath: str) -> None:
    """Create the sqlite3 ``jobs`` table used to track job progress.

    The table is only created if it does not already exist.

    Args:
        dbpath: Path handed to ``sqlite3.connect``.
    """
    conn = sqlite3.connect(dbpath)
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(
                "CREATE TABLE IF NOT EXISTS jobs(job_id TEXT, job_meta TEXT)")
        finally:
            cursor.close()
    finally:
        # Close the connection even when the statement fails.
        conn.close()


def retrieve_meta(conn, job_id: str):
    """Retrieve the job's metadata.

    Args:
        conn: Open database connection holding the ``jobs`` table.
        job_id: Identifier of the job whose metadata is wanted.

    Returns:
        The decoded metadata of the first matching row, or an empty dict when
        no row exists for ``job_id``.
    """
    # Acquire the cursor outside the ``try`` so that a failure here is not
    # masked by an UnboundLocalError raised from the cleanup code.
    cursor = conn.cursor()
    try:
        cursor.execute(
            "SELECT * FROM jobs WHERE job_id=:job_id", {"job_id": job_id})
        db_row = cursor.fetchone()
    finally:
        cursor.close()
    # NOTE(review): jsonpickle.decode can instantiate arbitrary objects, so
    # the jobs database must only ever contain trusted data.
    return jsonpickle.decode(db_row[1]) if db_row else {}


def update_meta(conn, job_id: str, **meta) -> dict:
    """Update the job's metadata.

    The stored metadata is merged with ``meta`` (new values win), encoded with
    jsonpickle, written back and committed.

    Args:
        conn: Open database connection holding the ``jobs`` table.
        job_id: Identifier of the job to update.
        **meta: Metadata entries to add or overwrite.

    Returns:
        The merged metadata that was stored.
    """
    merged = {**retrieve_meta(conn, job_id), **meta}
    params = {"job_id": job_id, "meta": jsonpickle.encode(merged)}
    cursor = conn.cursor()
    try:
        cursor.execute(
            "UPDATE jobs SET job_meta = :meta WHERE job_id = :job_id", params)
        # Decide between UPDATE and INSERT on whether a row exists, not on
        # whether the stored metadata is empty: a row holding ``{}`` must be
        # updated rather than duplicated.
        if cursor.rowcount == 0:
            cursor.execute("INSERT INTO jobs VALUES (:job_id, :meta)", params)
        conn.commit()
    finally:
        cursor.close()
    return merged