diff options
-rw-r--r-- | gn_libs/jobs/__init__.py | 6 | ||||
-rw-r--r-- | gn_libs/jobs/jobs.py | 23 |
2 files changed, 28 insertions, 1 deletions
diff --git a/gn_libs/jobs/__init__.py b/gn_libs/jobs/__init__.py index a1a5dd6..6f400ef 100644 --- a/gn_libs/jobs/__init__.py +++ b/gn_libs/jobs/__init__.py @@ -1,5 +1,9 @@ from .migrations import run_migrations -from .jobs import job, launch_job, initialise_job +from .jobs import (job, + launch_job, + initialise_job, + push_to_stream, + update_metadata) def init_app(flask_app): """Initialise the migrations for flask""" diff --git a/gn_libs/jobs/jobs.py b/gn_libs/jobs/jobs.py index e2a6b00..a31c036 100644 --- a/gn_libs/jobs/jobs.py +++ b/gn_libs/jobs/jobs.py @@ -134,3 +134,26 @@ def update_metadata(conn: DbConnection, job_id: Union[str, uuid.UUID], key: str, "key": key, "value": value }) + + +def push_to_stream(conn: DbConnection, job_id: Union[str, uuid.UUID], stream_name: str, content: str): + """Initialise, and keep adding content to the stream from the provided content.""" + with _cursor(conn) as cursor: + cursor.execute("SELECT * FROM jobs_standard_outputs " + "WHERE job_id=:job_id AND output_stream=:stream", + { + "job_id": str(job_id), + "stream": stream_name + }) + result = cursor.fetchone() + new_content = ((bool(result) and result["value"]) or "") + content + cursor.execute( + "INSERT INTO jobs_standard_outputs(job_id, output_stream, value) " + "VALUES(:job_id, :stream, :content) " + "ON CONFLICT (job_id, output_stream) DO UPDATE " + "SET value=:content", + { + "job_id": str(job_id), + "stream": stream_name, + "content": new_content + }) |