aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gn_libs/jobs/__init__.py6
-rw-r--r--gn_libs/jobs/jobs.py23
2 files changed, 28 insertions, 1 deletions
diff --git a/gn_libs/jobs/__init__.py b/gn_libs/jobs/__init__.py
index a1a5dd6..6f400ef 100644
--- a/gn_libs/jobs/__init__.py
+++ b/gn_libs/jobs/__init__.py
@@ -1,5 +1,9 @@
from .migrations import run_migrations
-from .jobs import job, launch_job, initialise_job
+from .jobs import (job,
+ launch_job,
+ initialise_job,
+ push_to_stream,
+ update_metadata)
def init_app(flask_app):
"""Initialise the migrations for flask"""
diff --git a/gn_libs/jobs/jobs.py b/gn_libs/jobs/jobs.py
index e2a6b00..a31c036 100644
--- a/gn_libs/jobs/jobs.py
+++ b/gn_libs/jobs/jobs.py
@@ -134,3 +134,26 @@ def update_metadata(conn: DbConnection, job_id: Union[str, uuid.UUID], key: str,
"key": key,
"value": value
})
+
+
+def push_to_stream(conn: DbConnection, job_id: Union[str, uuid.UUID], stream_name: str, content: str):
+ """Initialise, and keep adding content to the stream from the provided content."""
+ with _cursor(conn) as cursor:
+ cursor.execute("SELECT * FROM jobs_standard_outputs "
+ "WHERE job_id=:job_id AND output_stream=:stream",
+ {
+ "job_id": str(job_id),
+ "stream": stream_name
+ })
+ result = cursor.fetchone()
+ new_content = ((bool(result) and result["value"]) or "") + content
+ cursor.execute(
+ "INSERT INTO jobs_standard_outputs(job_id, output_stream, value) "
+ "VALUES(:job_id, :stream, :content) "
+ "ON CONFLICT (job_id, output_stream) DO UPDATE "
+ "SET value=:content",
+ {
+ "job_id": str(job_id),
+ "stream": stream_name,
+ "content": new_content
+ })