From 19c376c4b60592f4bba0e26952faa3a71b6f5641 Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Fri, 5 Aug 2022 10:10:56 +0300 Subject: Update job status. Display stdout and stderr outputs * Display the status of the job, as it is running * Display STDERR output if an error occurs * Display STDOUT output as job is running and on successful completion of the job --- scripts/insert_data.py | 5 ++++- scripts/worker.py | 15 +++++++++++---- 2 files changed, 15 insertions(+), 5 deletions(-) (limited to 'scripts') diff --git a/scripts/insert_data.py b/scripts/insert_data.py index 6491b06..7bf20f1 100644 --- a/scripts/insert_data.py +++ b/scripts/insert_data.py @@ -122,7 +122,9 @@ def __format_query__(query, params): for param in params) values_str = ", ".join( f"('{__param_str__(value_tup)}')" for value_tup in values) - return f"{query[:idx]} VALUES{values_str};" + insert_str = query[:idx].replace( + "INSERT INTO ", "INSERT INTO\n\t") + return f"{insert_str}\nVALUES\n\t{values_str};" def insert_means( filepath: str, speciesid: int, datasetid: int, dbconn: mdb.Connection, @@ -149,6 +151,7 @@ def insert_means( if not bool(means): break print(__format_query__(means_query, means)) + print() print(__format_query__(xref_query, means)) cursor.executemany(means_query, means) cursor.executemany(xref_query, means) diff --git a/scripts/worker.py b/scripts/worker.py index 03751d2..391f522 100644 --- a/scripts/worker.py +++ b/scripts/worker.py @@ -27,13 +27,16 @@ def parse_args(): return args -def update_stdout_stderr(bytes_read, stream: str, rconn, job_id): +def update_stdout_stderr(rconn, job_id, bytes_read, stream: str): "Update the stdout/stderr keys according to the value of `stream`." job = jobs.job(rconn, job_id) contents = job.get(stream, '') new_contents = contents + bytes_read.decode("utf-8") rconn.hset(name=job_id, key=stream, value=new_contents) +def update_status(rconn, job_id, status): + rconn.hset(name=job_id, key="status", value=status) + def run_job(job, rconn): "Run the actual job." job_id = job["job_id"] @@ -45,18 +48,22 @@ def run_job(job, rconn): shlex.split(job["command"]), stdout=subprocess.PIPE, stderr=tmpfl) as process: while process.poll() is None: + update_status(rconn, job_id, "running") update_stdout_stderr( - process.stdout.read1(), "stdout", rconn, job_id) + rconn, job_id, process.stdout.read1(), "stdout") sleep(1) with open(stderrpath, "rb") as stderr: - update_stdout_stderr(stderr.read(), "stderr", rconn, job_id) + stderr_content = stderr.read() + update_stdout_stderr(rconn, job_id, stderr_content, "stderr") + update_status(rconn, job_id, ("error" if bool(stderr_content) else "success")) os.remove(stderrpath) return process.poll() except Exception as exc:# pylint: disable=[broad-except,unused-variable] + update_status(rconn, job_id, "error") update_stdout_stderr( - traceback.format_exc().encode("utf-8"), "stderr", rconn, job_id) + rconn, job_id, traceback.format_exc().encode("utf-8"), "stderr") print(traceback.format_exc(), file=sys.stderr) sys.exit(4) -- cgit v1.2.3