about summary refs log tree commit diff
path: root/scripts
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2022-08-05 10:10:56 +0300
committerFrederick Muriuki Muriithi2022-08-05 10:10:56 +0300
commit19c376c4b60592f4bba0e26952faa3a71b6f5641 (patch)
tree0916ef296a57bb450102093429422953d92c009e /scripts
parent3da6848381b6103fbb58eeab8d7051cba0bded58 (diff)
downloadgn-uploader-19c376c4b60592f4bba0e26952faa3a71b6f5641.tar.gz
Update job status. Display stdout and stderr outputs
* Display the status of the job, as it is running
* Display STDERR output if an error occurs
* Display STDOUT output as job is running and on successful completion
  of the job
Diffstat (limited to 'scripts')
-rw-r--r--scripts/insert_data.py5
-rw-r--r--scripts/worker.py15
2 files changed, 15 insertions, 5 deletions
diff --git a/scripts/insert_data.py b/scripts/insert_data.py
index 6491b06..7bf20f1 100644
--- a/scripts/insert_data.py
+++ b/scripts/insert_data.py
@@ -122,7 +122,9 @@ def __format_query__(query, params):
         for param in params)
     values_str = ", ".join(
         f"('{__param_str__(value_tup)}')" for value_tup in values)
-    return f"{query[:idx]} VALUES{values_str};"
+    insert_str = query[:idx].replace(
+        "INSERT INTO ", "INSERT INTO\n\t")
+    return f"{insert_str}\nVALUES\n\t{values_str};"
 
 def insert_means(
         filepath: str, speciesid: int, datasetid: int, dbconn: mdb.Connection,
@@ -149,6 +151,7 @@ def insert_means(
             if not bool(means):
                 break
             print(__format_query__(means_query, means))
+            print()
             print(__format_query__(xref_query, means))
             cursor.executemany(means_query, means)
             cursor.executemany(xref_query, means)
diff --git a/scripts/worker.py b/scripts/worker.py
index 03751d2..391f522 100644
--- a/scripts/worker.py
+++ b/scripts/worker.py
@@ -27,13 +27,16 @@ def parse_args():
 
     return args
 
-def update_stdout_stderr(bytes_read, stream: str, rconn, job_id):
+def update_stdout_stderr(rconn, job_id, bytes_read, stream: str):
     "Update the stdout/stderr keys according to the value of `stream`."
     job = jobs.job(rconn, job_id)
     contents = job.get(stream, '')
     new_contents = contents + bytes_read.decode("utf-8")
     rconn.hset(name=job_id, key=stream, value=new_contents)
 
+def update_status(rconn, job_id, status):
+    rconn.hset(name=job_id, key="status", value=status)
+
 def run_job(job, rconn):
     "Run the actual job."
     job_id = job["job_id"]
@@ -45,18 +48,22 @@ def run_job(job, rconn):
                         shlex.split(job["command"]), stdout=subprocess.PIPE,
                         stderr=tmpfl) as process:
                     while process.poll() is None:
+                        update_status(rconn, job_id, "running")
                         update_stdout_stderr(
-                            process.stdout.read1(), "stdout", rconn, job_id)
+                            rconn, job_id, process.stdout.read1(), "stdout")
                         sleep(1)
 
             with open(stderrpath, "rb") as stderr:
-                update_stdout_stderr(stderr.read(), "stderr", rconn, job_id)
+                stderr_content = stderr.read()
+                update_stdout_stderr(rconn, job_id, stderr_content, "stderr")
+                update_status(rconn, job_id, ("error" if bool(stderr_content) else "success"))
 
             os.remove(stderrpath)
         return process.poll()
     except Exception as exc:# pylint: disable=[broad-except,unused-variable]
+        update_status(rconn, job_id, "error")
         update_stdout_stderr(
-            traceback.format_exc().encode("utf-8"), "stderr", rconn, job_id)
+            rconn, job_id, traceback.format_exc().encode("utf-8"), "stderr")
         print(traceback.format_exc(), file=sys.stderr)
         sys.exit(4)