# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
# pylint: disable=protected-access
import json
import logging
import os
import re
import subprocess
import sys
import time
from typing import Any, Dict, Iterable, List, Optional, TextIO, Union
from azure.ai.ml._artifacts._artifact_utilities import get_datastore_info, list_logs_in_datastore
from azure.ai.ml._restclient.runhistory.models import Run, RunDetails, TypedAssetReference
from azure.ai.ml._restclient.v2022_02_01_preview.models import DataType
from azure.ai.ml._restclient.v2022_02_01_preview.models import JobType as RestJobType
from azure.ai.ml._restclient.v2022_02_01_preview.models import ModelType
from azure.ai.ml._restclient.v2022_10_01.models import JobBase
from azure.ai.ml._utils._http_utils import HttpPipeline
from azure.ai.ml._utils.utils import create_requests_pipeline_with_retry, download_text_from_url
from azure.ai.ml.constants._common import GitProperties
from azure.ai.ml.constants._job.job import JobLogPattern, JobType
from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, JobException
from azure.ai.ml.operations._dataset_dataplane_operations import DatasetDataplaneOperations
from azure.ai.ml.operations._datastore_operations import DatastoreOperations
from azure.ai.ml.operations._model_dataplane_operations import ModelDataplaneOperations
from azure.ai.ml.operations._run_history_constants import JobStatus, RunHistoryConstants
from azure.ai.ml.operations._run_operations import RunOperations
STATUS_KEY = "status"
module_logger = logging.getLogger(__name__)
def _get_sorted_filtered_logs(
logs_iterable: Iterable[str],
job_type: str,
processed_logs: Optional[Dict[str, int]] = None,
only_streamable: bool = True,
) -> List[str]:
"""Filters log file names, sorts, and returns list starting with where we left off last iteration.
:param logs_iterable: An iterable of log paths.
:type logs_iterable: Iterable[str]
:param job_type: the job type to filter log files
:type job_type: str
:param processed_logs: dictionary tracking the state of how many lines of each file have been written out
:type processed_logs: dict[str, int]
:param only_streamable: Whether to only get streamable logs
:type only_streamable: bool
:return: List of logs to continue from
:rtype: list[str]
"""
processed_logs = processed_logs if processed_logs else {}
# First, attempt to read logs in new Common Runtime form
output_logs_pattern = (
JobLogPattern.COMMON_RUNTIME_STREAM_LOG_PATTERN
if only_streamable
else JobLogPattern.COMMON_RUNTIME_ALL_USER_LOG_PATTERN
)
logs = list(logs_iterable)
filtered_logs = [x for x in logs if re.match(output_logs_pattern, x)]
# fall back to legacy log format
if filtered_logs is None or len(filtered_logs) == 0:
job_type = job_type.lower()
if job_type in JobType.COMMAND:
output_logs_pattern = JobLogPattern.COMMAND_JOB_LOG_PATTERN
elif job_type in JobType.PIPELINE:
output_logs_pattern = JobLogPattern.PIPELINE_JOB_LOG_PATTERN
elif job_type in JobType.SWEEP:
output_logs_pattern = JobLogPattern.SWEEP_JOB_LOG_PATTERN
filtered_logs = [x for x in logs if re.match(output_logs_pattern, x)]
filtered_logs.sort()
previously_printed_index = 0
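    # Find the index of the last log that was already processed; the returned slice
    # re-includes that file (its tail may still be growing) plus everything after it.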
for i, v in enumerate(filtered_logs):
if processed_logs.get(v):
previously_printed_index = i
else:
break
# Slice inclusive from the last printed log (can be updated before printing new files)
return filtered_logs[previously_printed_index:]
def _incremental_print(log: str, processed_logs: Dict[str, int], current_log_name: str, fileout: TextIO) -> None:
"""Incremental print.
:param log:
:type log: str
:param processed_logs: The record of how many lines have been written for each log file
:type processed_logs: dict[str, int]
:param current_log_name: the file name being read out, used in header writing and accessing processed_logs
:type current_log_name: str
:param fileout:
:type fileout: TestIOWrapper
"""
lines = log.splitlines()
doc_length = len(lines)
if doc_length == 0:
# If a file is empty, skip writing out.
        # This addresses an issue where batch endpoint jobs can create log files before they are needed.
return
previous_printed_lines = processed_logs.get(current_log_name, 0)
# when a new log is first being written to console, print spacing and label
if previous_printed_lines == 0:
fileout.write("\n")
fileout.write("Streaming " + current_log_name + "\n")
fileout.write("=" * (len(current_log_name) + 10) + "\n")
fileout.write("\n")
# otherwise, continue to log the file where we left off
for line in lines[previous_printed_lines:]:
fileout.write(line + "\n")
# update state to record number of lines written for this log file
processed_logs[current_log_name] = doc_length
def _get_last_log_primary_instance(logs: List) -> Any:
"""Return last log for primary instance.
:param logs:
:type logs: builtin.list
:return: Returns the last log primary instance.
:rtype:
"""
primary_ranks = ["rank_0", "worker_0"]
rank_match_re = re.compile(r"(.*)_(.*?_.*?)\.txt")
last_log_name = logs[-1]
last_log_match = rank_match_re.match(last_log_name)
if not last_log_match:
return last_log_name
last_log_prefix = last_log_match.group(1)
matching_logs = sorted(filter(lambda x: x.startswith(last_log_prefix), logs))
# we have some specific ranks that denote the primary, use those if found
for log_name in matching_logs:
match = rank_match_re.match(log_name)
if not match:
continue
if match.group(2) in primary_ranks:
return log_name
    # no rank definitively marks the primary instance; return the first of the sorted matches
return matching_logs[0]
def _wait_before_polling(current_seconds: float) -> int:
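    """Return the number of seconds to wait before the next poll.
    The interval follows a sigmoid curve: it starts small and tapers off toward the
    configured maximum (~3 minutes) as the elapsed time grows, and is never less than
    the configured minimum.
    :param current_seconds: Seconds elapsed since polling started
    :type current_seconds: float
    :return: The polling interval in seconds
    :rtype: int
    """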
if current_seconds < 0:
msg = "current_seconds must be positive"
raise JobException(
message=msg,
target=ErrorTarget.JOB,
no_personal_data_message=msg,
error_category=ErrorCategory.USER_ERROR,
)
import math
    # Sigmoid that tapers off near the max at ~ 3 min
duration = int(
int(RunHistoryConstants._WAIT_COMPLETION_POLLING_INTERVAL_MAX) / (1.0 + 100 * math.exp(-current_seconds / 20.0))
)
return max(int(RunHistoryConstants._WAIT_COMPLETION_POLLING_INTERVAL_MIN), duration)
def list_logs(run_operations: RunOperations, job_resource: JobBase) -> Dict:
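    """Get the filtered, sorted log files available for the given job.
    :param run_operations: The run history operations class
    :type run_operations: RunOperations
    :param job_resource: The job whose logs to list
    :type job_resource: JobBase
    :return: Mapping of log file name to log file URL
    :rtype: Dict
    """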
details: RunDetails = run_operations.get_run_details(job_resource.name)
logs_dict = details.log_files
keys = _get_sorted_filtered_logs(logs_dict, job_resource.properties.job_type)
return {key: logs_dict[key] for key in keys}
# pylint: disable=too-many-statements,too-many-locals
def stream_logs_until_completion(
run_operations: RunOperations,
job_resource: JobBase,
datastore_operations: Optional[DatastoreOperations] = None,
raise_exception_on_failed_job: bool = True,
*,
requests_pipeline: HttpPipeline
) -> None:
"""Stream the experiment run output to the specified file handle. By default the the file handle points to stdout.
:param run_operations: The run history operations class.
:type run_operations: RunOperations
:param job_resource: The job to stream
:type job_resource: JobBase
:param datastore_operations: Optional, the datastore operations class, used to get logs from datastore
:type datastore_operations: Optional[DatastoreOperations]
    :param raise_exception_on_failed_job: Whether to raise an exception if the job fails
    :type raise_exception_on_failed_job: bool
:keyword requests_pipeline: The HTTP pipeline to use for requests.
:type requests_pipeline: ~azure.ai.ml._utils._http_utils.HttpPipeline
:return:
:rtype: None
"""
job_type = job_resource.properties.job_type
job_name = job_resource.name
studio_endpoint = job_resource.properties.services.get("Studio", None)
studio_endpoint = studio_endpoint.endpoint if studio_endpoint else None
# Feature store jobs should be linked to the Feature Store Workspace UI.
# Todo: Consolidate this logic to service side
if "azureml.FeatureStoreJobType" in job_resource.properties.properties:
url_format = (
"https://ml.azure.com/featureStore/{fs_name}/featureSets/{fset_name}/{fset_version}/matJobs/"
"jobs/{run_id}?wsid=/subscriptions/{fs_sub_id}/resourceGroups/{fs_rg_name}/providers/"
"Microsoft.MachineLearningServices/workspaces/{fs_name}"
)
studio_endpoint = url_format.format(
fs_name=job_resource.properties.properties["azureml.FeatureStoreName"],
fs_sub_id=run_operations._subscription_id,
fs_rg_name=run_operations._resource_group_name,
fset_name=job_resource.properties.properties["azureml.FeatureSetName"],
fset_version=job_resource.properties.properties["azureml.FeatureSetVersion"],
run_id=job_name,
)
elif "FeatureStoreJobType" in job_resource.properties.properties:
url_format = (
"https://ml.azure.com/featureStore/{fs_name}/featureSets/{fset_name}/{fset_version}/matJobs/"
"jobs/{run_id}?wsid=/subscriptions/{fs_sub_id}/resourceGroups/{fs_rg_name}/providers/"
"Microsoft.MachineLearningServices/workspaces/{fs_name}"
)
studio_endpoint = url_format.format(
fs_name=job_resource.properties.properties["FeatureStoreName"],
fs_sub_id=run_operations._subscription_id,
fs_rg_name=run_operations._resource_group_name,
fset_name=job_resource.properties.properties["FeatureSetName"],
fset_version=job_resource.properties.properties["FeatureSetVersion"],
run_id=job_name,
)
file_handle = sys.stdout
ds_properties = None
prefix = None
if (
hasattr(job_resource.properties, "outputs")
and job_resource.properties.job_type != RestJobType.AUTO_ML
and datastore_operations
):
# Get default output location
default_output = (
job_resource.properties.outputs.get("default", None) if job_resource.properties.outputs else None
)
is_uri_folder = default_output and default_output.job_output_type == DataType.URI_FOLDER
if is_uri_folder:
output_uri = default_output.uri # type: ignore
# Parse the uri format
output_uri = output_uri.split("datastores/")[1]
datastore_name, prefix = output_uri.split("/", 1)
ds_properties = get_datastore_info(datastore_operations, datastore_name)
try:
file_handle.write("RunId: {}\n".format(job_name))
file_handle.write("Web View: {}\n".format(studio_endpoint))
_current_details: RunDetails = run_operations.get_run_details(job_name)
processed_logs: Dict = {}
poll_start_time = time.time()
pipeline_with_retries = create_requests_pipeline_with_retry(requests_pipeline=requests_pipeline)
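        # Poll run details with a growing backoff while the run is in progress or
        # finalizing, streaming any newly available log content on each iteration.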
while (
_current_details.status in RunHistoryConstants.IN_PROGRESS_STATUSES
or _current_details.status == JobStatus.FINALIZING
):
file_handle.flush()
time.sleep(_wait_before_polling(time.time() - poll_start_time))
_current_details = run_operations.get_run_details(job_name) # TODO use FileWatcher
if job_type.lower() in JobType.PIPELINE:
legacy_folder_name = "/logs/azureml/"
else:
legacy_folder_name = "/azureml-logs/"
_current_logs_dict = (
list_logs_in_datastore(
ds_properties,
prefix=str(prefix),
legacy_log_folder_name=legacy_folder_name,
)
if ds_properties is not None
else _current_details.log_files
)
# Get the list of new logs available after filtering out the processed ones
available_logs = _get_sorted_filtered_logs(_current_logs_dict, job_type, processed_logs)
content = ""
for current_log in available_logs:
content = download_text_from_url(
_current_logs_dict[current_log],
pipeline_with_retries,
timeout=RunHistoryConstants._DEFAULT_GET_CONTENT_TIMEOUT,
)
_incremental_print(content, processed_logs, current_log, file_handle)
# TODO: Temporary solution to wait for all the logs to be printed in the finalizing state.
if (
_current_details.status not in RunHistoryConstants.IN_PROGRESS_STATUSES
and _current_details.status == JobStatus.FINALIZING
and "The activity completed successfully. Finalizing run..." in content
):
break
file_handle.write("\n")
file_handle.write("Execution Summary\n")
file_handle.write("=================\n")
file_handle.write("RunId: {}\n".format(job_name))
file_handle.write("Web View: {}\n".format(studio_endpoint))
warnings = _current_details.warnings
if warnings:
messages = [x.message for x in warnings if x.message]
if len(messages) > 0:
file_handle.write("\nWarnings:\n")
for message in messages:
file_handle.write(message + "\n")
file_handle.write("\n")
if _current_details.status == JobStatus.FAILED:
error = (
_current_details.error.as_dict()
if _current_details.error
else "Detailed error not set on the Run. Please check the logs for details."
)
            # If the error will be raised below, don't also print it here (avoids double reporting).
if not raise_exception_on_failed_job:
file_handle.write("\nError:\n")
file_handle.write(json.dumps(error, indent=4))
file_handle.write("\n")
else:
raise JobException(
message="Exception : \n {} ".format(json.dumps(error, indent=4)),
target=ErrorTarget.JOB,
no_personal_data_message="Exception raised on failed job.",
error_category=ErrorCategory.SYSTEM_ERROR,
)
file_handle.write("\n")
file_handle.flush()
except KeyboardInterrupt as e:
error_message = (
"The output streaming for the run interrupted.\n"
"But the run is still executing on the compute target. \n"
"Details for canceling the run can be found here: "
"https://aka.ms/aml-docs-cancel-run"
)
raise JobException(
message=error_message,
target=ErrorTarget.JOB,
no_personal_data_message=error_message,
error_category=ErrorCategory.USER_ERROR,
) from e
def get_git_properties() -> Dict[str, str]:
"""Gather Git tracking info from the local environment.
:return: Properties dictionary.
:rtype: dict
"""
def _clean_git_property_bool(value: Any) -> Optional[bool]:
if value is None:
return None
return str(value).strip().lower() in ["true", "1"]
def _clean_git_property_str(value: Any) -> Optional[str]:
if value is None:
return None
return str(value).strip() or None
def _run_git_cmd(args: Iterable[str]) -> Optional[str]:
"""Runs git with the provided arguments
:param args: A iterable of arguments for a git command. Should not include leading "git"
:type args: Iterable[str]
:return: The output of running git with arguments, or None if it fails.
:rtype: Optional[str]
"""
try:
with open(os.devnull, "wb") as devnull:
return subprocess.check_output(["git"] + list(args), stderr=devnull).decode()
except KeyboardInterrupt:
raise
except BaseException: # pylint: disable=W0718
return None
# Check for environment variable overrides.
repository_uri = os.environ.get(GitProperties.ENV_REPOSITORY_URI, None)
branch = os.environ.get(GitProperties.ENV_BRANCH, None)
commit = os.environ.get(GitProperties.ENV_COMMIT, None)
dirty: Optional[Union[str, bool]] = os.environ.get(GitProperties.ENV_DIRTY, None)
build_id = os.environ.get(GitProperties.ENV_BUILD_ID, None)
build_uri = os.environ.get(GitProperties.ENV_BUILD_URI, None)
is_git_repo = _run_git_cmd(["rev-parse", "--is-inside-work-tree"])
if _clean_git_property_bool(is_git_repo):
repository_uri = repository_uri or _run_git_cmd(["ls-remote", "--get-url"])
branch = branch or _run_git_cmd(["symbolic-ref", "--short", "HEAD"])
commit = commit or _run_git_cmd(["rev-parse", "HEAD"])
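        # "git status --porcelain ." prints nothing for a clean working tree; "and True"
        # coerces non-empty output to True while leaving empty output falsy.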
dirty = dirty or _run_git_cmd(["status", "--porcelain", "."]) and True
# Parsing logic.
repository_uri = _clean_git_property_str(repository_uri)
commit = _clean_git_property_str(commit)
branch = _clean_git_property_str(branch)
dirty = _clean_git_property_bool(dirty)
build_id = _clean_git_property_str(build_id)
build_uri = _clean_git_property_str(build_uri)
# Return with appropriate labels.
properties = {}
if repository_uri is not None:
properties[GitProperties.PROP_MLFLOW_GIT_REPO_URL] = repository_uri
if branch is not None:
properties[GitProperties.PROP_MLFLOW_GIT_BRANCH] = branch
if commit is not None:
properties[GitProperties.PROP_MLFLOW_GIT_COMMIT] = commit
if dirty is not None:
properties[GitProperties.PROP_DIRTY] = str(dirty)
if build_id is not None:
properties[GitProperties.PROP_BUILD_ID] = build_id
if build_uri is not None:
properties[GitProperties.PROP_BUILD_URI] = build_uri
return properties
def get_job_output_uris_from_dataplane(
job_name: Optional[str],
run_operations: RunOperations,
dataset_dataplane_operations: DatasetDataplaneOperations,
model_dataplane_operations: Optional[ModelDataplaneOperations],
output_names: Optional[Union[Iterable[str], str]] = None,
) -> Dict[str, str]:
"""Returns the output path for the given output in cloud storage of the given job.
If no output names are given, the output paths for all outputs will be returned.
URIs obtained from the service will be in the long-form azureml:// format.
For example:
azureml://subscriptions/<sub>/resource[gG]roups/<rg_name>/workspaces/<ws_name>/datastores/<ds_name>/paths/<ds_path>
:param job_name: The job name
:type job_name: str
:param run_operations: The RunOperations used to fetch run data for the job
:type run_operations: RunOperations
:param dataset_dataplane_operations: The DatasetDataplaneOperations used to fetch dataset uris
:type dataset_dataplane_operations: DatasetDataplaneOperations
    :param model_dataplane_operations: The ModelDataplaneOperations used to fetch model uris
    :type model_dataplane_operations: ModelDataplaneOperations
    :param output_names: The output name(s) to fetch. If not specified, retrieves all.
    :type output_names: Optional[Union[Iterable[str], str]]
:return: Dictionary mapping user-defined output name to output uri
:rtype: Dict[str, str]
"""
run_metadata: Run = run_operations.get_run_data(str(job_name)).run_metadata
run_outputs: Dict[str, TypedAssetReference] = run_metadata.outputs or {}
# Create a reverse mapping from internal asset id to user-defined output name
asset_id_to_output_name = {v.asset_id: k for k, v in run_outputs.items()}
if not output_names:
# Assume all outputs are needed if no output name is provided
output_names = run_outputs.keys()
else:
if isinstance(output_names, str):
output_names = [output_names]
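        # Drop any requested names that are not actual outputs of the run.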
output_names = [o for o in output_names if o in run_outputs]
# Collect all output ids that correspond to data assets
dataset_ids = [
run_outputs[output_name].asset_id
for output_name in output_names
if run_outputs[output_name].type in [o.value for o in DataType]
]
# Collect all output ids that correspond to models
model_ids = [
run_outputs[output_name].asset_id
for output_name in output_names
if run_outputs[output_name].type in [o.value for o in ModelType]
]
output_name_to_dataset_uri = {}
if dataset_ids:
# Get the data paths from the service
dataset_uris = dataset_dataplane_operations.get_batch_dataset_uris(dataset_ids)
# Map the user-defined output name to the output uri
# The service returns a mapping from internal asset id to output metadata, so we need the reverse map
# defined above to get the user-defined output name from the internal asset id.
output_name_to_dataset_uri = {asset_id_to_output_name[k]: v.uri for k, v in dataset_uris.values.items()}
# This is a repeat of the logic above for models.
output_name_to_model_uri = {}
if model_ids:
model_uris = (
model_dataplane_operations.get_batch_model_uris(model_ids)
if model_dataplane_operations is not None
else None
)
output_name_to_model_uri = {
asset_id_to_output_name[k]: v.path for k, v in model_uris.values.items() # type: ignore
}
return {**output_name_to_dataset_uri, **output_name_to_model_uri}