From 2abe10ea8ac059d7cab83895bb5d2ec6d4a6ce1c Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Thu, 19 May 2022 15:26:15 +0300 Subject: Update Web-UI: use new error collection paradigm - README.org: document how to run scripts manually - manifest.scm: remove python-rq as a dependency - qc_app/jobs.py: rework job launching and processing - qc_app/parse.py: use reworked job processing - qc_app/templates/job_progress.html: display progress correctly - qc_app/templates/parse_results.html: display final results - scripts/worker.py: new worker script --- qc_app/jobs.py | 96 ++++++++++++++-------------------------------------------- 1 file changed, 23 insertions(+), 73 deletions(-) (limited to 'qc_app/jobs.py') diff --git a/qc_app/jobs.py b/qc_app/jobs.py index afea419..4d3dba6 100644 --- a/qc_app/jobs.py +++ b/qc_app/jobs.py @@ -1,82 +1,32 @@ -import jsonpickle -import sqlite3 +import os +import shlex +import subprocess +from uuid import uuid4 -from rq import Queue -from rq.job import Job from redis import Redis -from flask import current_app as app -def enqueue_job(delayed_fn: str, *args, **kwargs): - """Add job to queue""" - with Redis.from_url(app.config["REDIS_URL"]) as rconn: - job = Job.create( - delayed_fn, args, **{ - key: val for key, val in kwargs.items() - if key != "additional_jobs_meta" - }, - connection = rconn, - timeout = "2h", - result_ttl = "5h", - failure_ttl = "5h" - ) +def error_filename(job_id, error_dir): + return f"{error_dir}/job_{job_id}.error" - queue = Queue("qcapp_queue", connection=rconn) - queue.enqueue_job(job) +def launch_job(redis_conn: Redis, filepath, filetype, redisurl, error_dir): + """Launch a job in the background""" + job_id = str(uuid4()) + command = [ + "python3", "-m" "scripts.worker", filetype, filepath, redisurl, job_id] + job = { + "job_id": job_id, "command": shlex.join(command), "status": "pending", + "filename": os.path.basename(filepath), "percent": 0 + } + redis_conn.hset(name=job["job_id"], mapping=job) - return job + if not os.path.exists(error_dir): + os.mkdir(error_dir) -def job(job_id: str): - "Retrieve the job" - with Redis.from_url(app.config["REDIS_URL"]) as rconn: - queue = Queue("qcapp_queue", connection=rconn) - job = queue.fetch_job(job_id) - job.refresh() + with open(error_filename(job_id, error_dir), "w") as errorfile: + subprocess.Popen(command, stderr=errorfile) return job -def create_jobs_table(dbpath: str): - """Create sqlite3 table to track job progress""" - conn = sqlite3.connect(dbpath) - cursor = conn.cursor() - cursor.execute("CREATE TABLE IF NOT EXISTS jobs(job_id TEXT, job_meta TEXT)") - cursor.close() - conn.close() - -def retrieve_meta(conn, job_id: str): - """Retrieve the job's metadata.""" - meta = {} - try: - cursor = conn.cursor() - cursor.execute( - "SELECT * FROM jobs WHERE job_id=:job_id", {"job_id": job_id}) - db_row = cursor.fetchone() - # meta = json.loads(db_row [1]) if db_row else {} - meta = jsonpickle.decode(db_row [1]) if db_row else {} - except Exception as exc: - cursor.close() - raise exc - finally: - cursor.close() - - return meta - -def update_meta(conn, job_id: str, **meta): - """Update the job's metadata.""" - try: - cursor = conn.cursor() - old_meta = retrieve_meta(conn, job_id) - meta = {**old_meta, **meta} - query = "UPDATE jobs SET job_meta = :meta WHERE job_id = :job_id" - if not bool(old_meta): - query = "INSERT INTO jobs VALUES (:job_id, :meta)" - - cursor.execute( - query, {"job_id": job_id, "meta": jsonpickle.encode(meta)}) - conn.commit() - except Exception as exc: - cursor.close() - raise exc - finally: - cursor.close() - - return meta +def job(redis_conn, job_id: str): + "Retrieve the job" + return redis_conn.hgetall(job_id) -- cgit v1.2.3