diff options
author | Frederick Muriuki Muriithi | 2022-05-19 15:26:15 +0300 |
---|---|---|
committer | Frederick Muriuki Muriithi | 2022-05-19 15:30:20 +0300 |
commit | 2abe10ea8ac059d7cab83895bb5d2ec6d4a6ce1c (patch) | |
tree | 9e0fdcc010925a9f6d5674d41991b6185aea54e3 /qc_app/jobs.py | |
parent | 27f6e9e28f2a3244bdd00336cf918de97b2ceed6 (diff) | |
download | gn-uploader-2abe10ea8ac059d7cab83895bb5d2ec6d4a6ce1c.tar.gz |
Update Web-UI: use new error collection paradigm
- README.org: document how to run scripts manually
- manifest.scm: remove python-rq as a dependency
- qc_app/jobs.py: rework job launching and processing
- qc_app/parse.py: use reworked job processing
- qc_app/templates/job_progress.html: display progress correctly
- qc_app/templates/parse_results.html: display final results
- scripts/worker.py: new worker script
Diffstat (limited to 'qc_app/jobs.py')
-rw-r--r-- | qc_app/jobs.py | 96 |
1 files changed, 23 insertions, 73 deletions
diff --git a/qc_app/jobs.py b/qc_app/jobs.py index afea419..4d3dba6 100644 --- a/qc_app/jobs.py +++ b/qc_app/jobs.py @@ -1,82 +1,32 @@ -import jsonpickle -import sqlite3 +import os +import shlex +import subprocess +from uuid import uuid4 -from rq import Queue -from rq.job import Job from redis import Redis -from flask import current_app as app -def enqueue_job(delayed_fn: str, *args, **kwargs): - """Add job to queue""" - with Redis.from_url(app.config["REDIS_URL"]) as rconn: - job = Job.create( - delayed_fn, args, **{ - key: val for key, val in kwargs.items() - if key != "additional_jobs_meta" - }, - connection = rconn, - timeout = "2h", - result_ttl = "5h", - failure_ttl = "5h" - ) +def error_filename(job_id, error_dir): + return f"{error_dir}/job_{job_id}.error" - queue = Queue("qcapp_queue", connection=rconn) - queue.enqueue_job(job) +def launch_job(redis_conn: Redis, filepath, filetype, redisurl, error_dir): + """Launch a job in the background""" + job_id = str(uuid4()) + command = [ + "python3", "-m" "scripts.worker", filetype, filepath, redisurl, job_id] + job = { + "job_id": job_id, "command": shlex.join(command), "status": "pending", + "filename": os.path.basename(filepath), "percent": 0 + } + redis_conn.hset(name=job["job_id"], mapping=job) - return job + if not os.path.exists(error_dir): + os.mkdir(error_dir) -def job(job_id: str): - "Retrieve the job" - with Redis.from_url(app.config["REDIS_URL"]) as rconn: - queue = Queue("qcapp_queue", connection=rconn) - job = queue.fetch_job(job_id) - job.refresh() + with open(error_filename(job_id, error_dir), "w") as errorfile: + subprocess.Popen(command, stderr=errorfile) return job -def create_jobs_table(dbpath: str): - """Create sqlite3 table to track job progress""" - conn = sqlite3.connect(dbpath) - cursor = conn.cursor() - cursor.execute("CREATE TABLE IF NOT EXISTS jobs(job_id TEXT, job_meta TEXT)") - cursor.close() - conn.close() - -def retrieve_meta(conn, job_id: str): - """Retrieve the job's metadata.""" - meta = {} - try: - cursor = conn.cursor() - cursor.execute( - "SELECT * FROM jobs WHERE job_id=:job_id", {"job_id": job_id}) - db_row = cursor.fetchone() - # meta = json.loads(db_row [1]) if db_row else {} - meta = jsonpickle.decode(db_row [1]) if db_row else {} - except Exception as exc: - cursor.close() - raise exc - finally: - cursor.close() - - return meta - -def update_meta(conn, job_id: str, **meta): - """Update the job's metadata.""" - try: - cursor = conn.cursor() - old_meta = retrieve_meta(conn, job_id) - meta = {**old_meta, **meta} - query = "UPDATE jobs SET job_meta = :meta WHERE job_id = :job_id" - if not bool(old_meta): - query = "INSERT INTO jobs VALUES (:job_id, :meta)" - - cursor.execute( - query, {"job_id": job_id, "meta": jsonpickle.encode(meta)}) - conn.commit() - except Exception as exc: - cursor.close() - raise exc - finally: - cursor.close() - - return meta +def job(redis_conn, job_id: str): + "Retrieve the job" + return redis_conn.hgetall(job_id) |