Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/_internal')
27 files changed, 3118 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/__init__.py new file mode 100644 index 00000000..14fbaf87 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/__init__.py @@ -0,0 +1,53 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +from ._setup import enable_internal_components_in_pipeline +from .entities import ( + Ae365exepool, + AISuperComputerConfiguration, + AISuperComputerScalePolicy, + AISuperComputerStorageReferenceConfiguration, + Command, + DataTransfer, + Distributed, + HDInsight, + Hemera, + InternalInput, + ITPConfiguration, + ITPInteractiveConfiguration, + ITPPriorityConfiguration, + ITPResourceConfiguration, + ITPRetrySettings, + Parallel, + Pipeline, + Scope, + Starlite, + TargetSelector, +) + +# enable internal components if users has imported this module directly +enable_internal_components_in_pipeline() + +__all__ = [ + "TargetSelector", + "ITPInteractiveConfiguration", + "ITPPriorityConfiguration", + "ITPResourceConfiguration", + "ITPRetrySettings", + "ITPConfiguration", + "AISuperComputerConfiguration", + "AISuperComputerScalePolicy", + "AISuperComputerStorageReferenceConfiguration", + "Command", + "Distributed", + "Parallel", + "Scope", + "DataTransfer", + "Ae365exepool", + "HDInsight", + "Starlite", + "Pipeline", + "Hemera", + "InternalInput", +] diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/__init__.py new file mode 100644 index 00000000..d540fd20 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/__init__.py @@ -0,0 +1,3 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/command.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/command.py new file mode 100644 index 00000000..2dddf02b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/command.py @@ -0,0 +1,37 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- +from marshmallow import fields + +from ..._schema import NestedField +from ..._schema.core.fields import DumpableEnumField, EnvironmentField +from ..._schema.job import ParameterizedCommandSchema, ParameterizedParallelSchema +from ..._schema.job.job_limits import CommandJobLimitsSchema +from .._schema.node import InternalBaseNodeSchema, NodeType + + +class CommandSchema(InternalBaseNodeSchema, ParameterizedCommandSchema): + class Meta: + exclude = ["code", "distribution"] # internal command doesn't have code & distribution + + environment = EnvironmentField() + type = DumpableEnumField(allowed_values=[NodeType.COMMAND]) + limits = NestedField(CommandJobLimitsSchema) + + +class DistributedSchema(CommandSchema): + class Meta: + exclude = ["code"] # need to enable distribution comparing to CommandSchema + + type = DumpableEnumField(allowed_values=[NodeType.DISTRIBUTED]) + + +class ParallelSchema(InternalBaseNodeSchema, ParameterizedParallelSchema): + class Meta: + # partition_keys can still be used with unknown warning, but need to do dump before setting + exclude = ["task", "input_data", "mini_batch_error_threshold", "partition_keys"] + + type = DumpableEnumField(allowed_values=[NodeType.PARALLEL]) + compute = fields.Str() + environment = fields.Str() + limits = NestedField(CommandJobLimitsSchema) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/component.py new file mode 100644 index 00000000..11d4bb56 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/component.py @@ -0,0 +1,232 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- +import os.path + +import pydash +from marshmallow import EXCLUDE, INCLUDE, fields, post_dump, pre_load + +from ..._schema import NestedField, StringTransformedEnum, UnionField +from ..._schema.component.component import ComponentSchema +from ..._schema.core.fields import ArmVersionedStr, CodeField, EnvironmentField, RegistryStr +from ..._schema.job.parameterized_spark import SparkEntryClassSchema, SparkEntryFileSchema +from ..._utils._arm_id_utils import parse_name_label +from ..._utils.utils import get_valid_dot_keys_with_wildcard +from ...constants._common import ( + LABELLED_RESOURCE_NAME, + SOURCE_PATH_CONTEXT_KEY, + AzureMLResourceType, + DefaultOpenEncoding, +) +from ...constants._component import NodeType as PublicNodeType +from .._utils import yaml_safe_load_with_base_resolver +from .environment import InternalEnvironmentSchema +from .input_output import ( + InternalEnumParameterSchema, + InternalInputPortSchema, + InternalOutputPortSchema, + InternalParameterSchema, + InternalPrimitiveOutputSchema, + InternalSparkParameterSchema, +) + + +class NodeType: + COMMAND = "CommandComponent" + DATA_TRANSFER = "DataTransferComponent" + DISTRIBUTED = "DistributedComponent" + HDI = "HDInsightComponent" + SCOPE_V2 = "scope" + HDI_V2 = "hdinsight" + HEMERA_V2 = "hemera" + STARLITE_V2 = "starlite" + AE365EXEPOOL_V2 = "ae365exepool" + AETHER_BRIDGE_V2 = "aetherbridge" + PARALLEL = "ParallelComponent" + SCOPE = "ScopeComponent" + STARLITE = "StarliteComponent" + SWEEP = "SweepComponent" + PIPELINE = "PipelineComponent" + HEMERA = "HemeraComponent" + AE365EXEPOOL = "AE365ExePoolComponent" + IPP = "IntellectualPropertyProtectedComponent" + # internal spake component got a type value conflict with spark component + # this enum is used to identify its create_function in factories + SPARK = "DummySpark" + AETHER_BRIDGE = "AetherBridgeComponent" + + @classmethod + def all_values(cls): + all_values = [] + for key, value in vars(cls).items(): + if not key.startswith("_") and isinstance(value, str): + all_values.append(value) + return all_values + + +class InternalComponentSchema(ComponentSchema): + class Meta: + unknown = INCLUDE + + # override name as 1p components allow . 
in name, which is not allowed in v2 components + name = fields.Str() + + # override to allow empty properties + tags = fields.Dict(keys=fields.Str()) + + # override inputs & outputs to support 1P inputs & outputs, may need to do strict validation later + # no need to check io type match since server will do that + inputs = fields.Dict( + keys=fields.Str(), + values=UnionField( + [ + NestedField(InternalParameterSchema), + NestedField(InternalEnumParameterSchema), + NestedField(InternalInputPortSchema), + ] + ), + ) + # support primitive output for all internal components for now + outputs = fields.Dict( + keys=fields.Str(), + values=UnionField( + [ + NestedField(InternalPrimitiveOutputSchema, unknown=EXCLUDE), + NestedField(InternalOutputPortSchema, unknown=EXCLUDE), + ] + ), + ) + + # type field is required for registration + type = StringTransformedEnum( + allowed_values=NodeType.all_values(), + casing_transform=lambda x: parse_name_label(x)[0], + pass_original=True, + ) + + # need to resolve as it can be a local field + code = CodeField() + + environment = UnionField( + [ + RegistryStr(azureml_type=AzureMLResourceType.ENVIRONMENT), + ArmVersionedStr(azureml_type=AzureMLResourceType.ENVIRONMENT), + NestedField(InternalEnvironmentSchema), + ] + ) + + def get_skip_fields(self): + return ["properties"] + + def _serialize(self, obj, *, many: bool = False): + if many and obj is not None: + return super(InternalComponentSchema, self)._serialize(obj, many=many) + ret = super(InternalComponentSchema, self)._serialize(obj) + for attr_name in obj.__dict__.keys(): + if ( + not attr_name.startswith("_") + and attr_name not in self.get_skip_fields() + and attr_name not in self.dump_fields + ): + ret[attr_name] = self.get_attribute(obj, attr_name, None) + return ret + + # override param_override to ensure that param override happens after reloading the yaml + @pre_load + def add_param_overrides(self, data, **kwargs): + source_path = self.context.pop(SOURCE_PATH_CONTEXT_KEY, None) + if isinstance(data, dict) and source_path and os.path.isfile(source_path): + + def should_node_overwritten(_root, _parts): + parts = _parts.copy() + parts.pop() + parts.append("type") + _input_type = pydash.get(_root, parts, None) + return isinstance(_input_type, str) and _input_type.lower() not in ["boolean"] + + # do override here + with open(source_path, "r", encoding=DefaultOpenEncoding.READ) as f: + origin_data = yaml_safe_load_with_base_resolver(f) + for dot_key_wildcard, condition_func in [ + ("version", None), + ("inputs.*.default", should_node_overwritten), + ("inputs.*.enum", should_node_overwritten), + ]: + for dot_key in get_valid_dot_keys_with_wildcard( + origin_data, dot_key_wildcard, validate_func=condition_func + ): + pydash.set_(data, dot_key, pydash.get(origin_data, dot_key)) + return super().add_param_overrides(data, **kwargs) + + @post_dump(pass_original=True) + def simplify_input_output_port(self, data, original, **kwargs): # pylint:disable=unused-argument + # remove None in input & output + for io_ports in [data["inputs"], data["outputs"]]: + for port_name, port_definition in io_ports.items(): + io_ports[port_name] = dict(filter(lambda item: item[1] is not None, port_definition.items())) + + # hack, to match current serialization match expectation + for port_name, port_definition in data["inputs"].items(): + if "mode" in port_definition: + del port_definition["mode"] + + return data + + @post_dump(pass_original=True) + def add_back_type_label(self, data, original, **kwargs): # 
pylint:disable=unused-argument + type_label = original._type_label # pylint:disable=protected-access + if type_label: + data["type"] = LABELLED_RESOURCE_NAME.format(data["type"], type_label) + return data + + +class InternalSparkComponentSchema(InternalComponentSchema): + # type field is required for registration + type = StringTransformedEnum( + allowed_values=PublicNodeType.SPARK, + casing_transform=lambda x: parse_name_label(x)[0].lower(), + pass_original=True, + ) + + # override inputs: + # https://componentsdk.azurewebsites.net/components/spark_component.html#differences-with-other-component-types + inputs = fields.Dict( + keys=fields.Str(), + values=UnionField( + [ + NestedField(InternalSparkParameterSchema), + NestedField(InternalInputPortSchema), + ] + ), + ) + + environment = EnvironmentField( + extra_fields=[NestedField(InternalEnvironmentSchema)], + allow_none=True, + ) + + jars = UnionField( + [ + fields.List(fields.Str()), + fields.Str(), + ], + ) + py_files = UnionField( + [ + fields.List(fields.Str()), + fields.Str(), + ], + data_key="pyFiles", + attribute="py_files", + ) + + entry = UnionField( + [NestedField(SparkEntryFileSchema), NestedField(SparkEntryClassSchema)], + required=True, + metadata={"description": "Entry."}, + ) + + files = fields.List(fields.Str(required=True)) + archives = fields.List(fields.Str(required=True)) + conf = fields.Dict(keys=fields.Str(), values=fields.Raw()) + args = fields.Str(metadata={"description": "Command Line arguments."}) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/environment.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/environment.py new file mode 100644 index 00000000..f7c20228 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/environment.py @@ -0,0 +1,21 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +from marshmallow import fields + +from ..._schema import PathAwareSchema +from ..._schema.core.fields import DumpableEnumField, VersionField + + +class InternalEnvironmentSchema(PathAwareSchema): + docker = fields.Dict() + conda = fields.Dict() + os = DumpableEnumField( + # add enum instead of use string transformer here to avoid changing the value + allowed_values=["Linux", "Windows", "linux", "windows"], + required=False, + ) + name = fields.Str() + version = VersionField() + python = fields.Dict() diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/input_output.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/input_output.py new file mode 100644 index 00000000..b1fe2188 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/input_output.py @@ -0,0 +1,113 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- + +from marshmallow import fields, post_dump, post_load + +from ..._schema import PatchedSchemaMeta, StringTransformedEnum, UnionField +from ..._schema.component.input_output import InputPortSchema, ParameterSchema +from ..._schema.core.fields import DumpableEnumField, PrimitiveValueField + +SUPPORTED_INTERNAL_PARAM_TYPES = [ + "integer", + "Integer", + "boolean", + "Boolean", + "string", + "String", + "float", + "Float", + "double", + "Double", +] + + +SUPPORTED_INTERNAL_SPARK_PARAM_TYPES = [ + "integer", + "Integer", + "boolean", + "Boolean", + "string", + "String", + "double", + "Double", + # remove float and add number + "number", +] + + +class InternalInputPortSchema(InputPortSchema): + # skip client-side validate for type enum & support list + type = UnionField( + [ + fields.Str(), + fields.List(fields.Str()), + ], + required=True, + data_key="type", + ) + is_resource = fields.Bool() + datastore_mode = fields.Str() + + @post_dump(pass_original=True) + def resolve_list_type(self, data, original_data, **kwargs): # pylint: disable=unused-argument + if isinstance(original_data.type, list): + data["type"] = original_data.type + return data + + +class InternalOutputPortSchema(metaclass=PatchedSchemaMeta): + # skip client-side validate for type enum + type = fields.Str( + required=True, + data_key="type", + ) + description = fields.Str() + is_link_mode = fields.Bool() + datastore_mode = fields.Str() + + +class InternalPrimitiveOutputSchema(metaclass=PatchedSchemaMeta): + type = DumpableEnumField( + allowed_values=SUPPORTED_INTERNAL_PARAM_TYPES, + required=True, + ) + description = fields.Str() + + +class InternalParameterSchema(ParameterSchema): + type = DumpableEnumField( + allowed_values=SUPPORTED_INTERNAL_PARAM_TYPES, + required=True, + data_key="type", + ) + + +class InternalSparkParameterSchema(ParameterSchema): + type = DumpableEnumField( + allowed_values=SUPPORTED_INTERNAL_SPARK_PARAM_TYPES, + required=True, + data_key="type", + ) + + +class InternalEnumParameterSchema(ParameterSchema): + type = StringTransformedEnum( + allowed_values=["enum"], + required=True, + data_key="type", + ) + default = PrimitiveValueField() + enum = fields.List( + PrimitiveValueField(), + required=True, + ) + + @post_dump + @post_load + def enum_value_to_string(self, data, **kwargs): # pylint: disable=unused-argument + if "enum" in data: + data["enum"] = list(map(str, data["enum"])) + if "default" in data and data["default"] is not None: + data["default"] = str(data["default"]) + return data diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/node.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/node.py new file mode 100644 index 00000000..6dbadcd3 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/node.py @@ -0,0 +1,75 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- + +from marshmallow import INCLUDE, fields, post_load, pre_dump + +from ..._schema import ArmVersionedStr, NestedField, RegistryStr, UnionField +from ..._schema.core.fields import DumpableEnumField +from ..._schema.pipeline.component_job import BaseNodeSchema, _resolve_inputs_outputs +from ...constants._common import AzureMLResourceType +from .component import InternalComponentSchema, NodeType + + +class InternalBaseNodeSchema(BaseNodeSchema): + class Meta: + unknown = INCLUDE + + component = UnionField( + [ + # for registry type assets + RegistryStr(azureml_type=AzureMLResourceType.ENVIRONMENT), + # existing component + ArmVersionedStr(azureml_type=AzureMLResourceType.COMPONENT, allow_default_version=True), + # inline component or component file reference starting with FILE prefix + NestedField(InternalComponentSchema, unknown=INCLUDE), + ], + required=True, + ) + type = DumpableEnumField( + allowed_values=NodeType.all_values(), + ) + + @post_load + def make(self, data, **kwargs): # pylint: disable=unused-argument + from ...entities._builders import parse_inputs_outputs + + # parse inputs/outputs + data = parse_inputs_outputs(data) + + # dict to node object + from ...entities._job.pipeline._load_component import pipeline_node_factory + + return pipeline_node_factory.load_from_dict(data=data) + + @pre_dump + def resolve_inputs_outputs(self, job, **kwargs): # pylint: disable=unused-argument + return _resolve_inputs_outputs(job) + + +class ScopeSchema(InternalBaseNodeSchema): + type = DumpableEnumField(allowed_values=[NodeType.SCOPE]) + adla_account_name = fields.Str(required=True) + scope_param = fields.Str() + custom_job_name_suffix = fields.Str() + priority = fields.Int() + auto_token = fields.Int() + tokens = fields.Int() + vcp = fields.Float() + + +class HDInsightSchema(InternalBaseNodeSchema): + type = DumpableEnumField(allowed_values=[NodeType.HDI]) + + compute_name = fields.Str() + queue = fields.Str() + driver_memory = fields.Str() + driver_cores = fields.Int() + executor_memory = fields.Str() + executor_cores = fields.Int() + number_executors = fields.Int() + conf = UnionField( + # dictionary or json string + union_fields=[fields.Dict(keys=fields.Str()), fields.Str()], + ) + hdinsight_spark_job_name = fields.Str() diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_setup.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_setup.py new file mode 100644 index 00000000..2baf3237 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_setup.py @@ -0,0 +1,100 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- +from typing import NoReturn + +# pylint: disable=protected-access +from marshmallow import INCLUDE + +from .._schema import NestedField +from ..entities._builders.control_flow_node import LoopNode +from ..entities._component.component_factory import component_factory +from ..entities._job.pipeline._load_component import pipeline_node_factory +from ._schema.command import CommandSchema, DistributedSchema, ParallelSchema +from ._schema.component import NodeType +from ._schema.node import HDInsightSchema, InternalBaseNodeSchema, ScopeSchema +from .entities import ( + Command, + DataTransfer, + Distributed, + HDInsight, + Hemera, + InternalBaseNode, + InternalComponent, + Parallel, + Pipeline, + Scope, + Starlite, +) +from .entities.spark import InternalSparkComponent + +_registered = False + + +def _set_registered(value: bool): + global _registered # pylint: disable=global-statement + _registered = value + + +def _enable_internal_components(): + create_schema_func = InternalComponent._create_schema_for_validation + for _type in NodeType.all_values(): + component_factory.register_type( + _type=_type, + create_instance_func=lambda: InternalComponent.__new__(InternalComponent), + create_schema_func=create_schema_func, + ) + component_factory.register_type( + _type=NodeType.SPARK, + create_instance_func=lambda: InternalSparkComponent.__new__(InternalSparkComponent), + create_schema_func=InternalSparkComponent._create_schema_for_validation, + ) + + +def _register_node(_type, node_cls, schema_cls): + pipeline_node_factory.register_type( + _type=_type, + create_instance_func=lambda: node_cls.__new__(node_cls), + load_from_rest_object_func=node_cls._from_rest_object, + nested_schema=NestedField(schema_cls, unknown=INCLUDE), + ) + + +def enable_internal_components_in_pipeline(*, force=False) -> NoReturn: + """Enable internal components in pipeline. + + :keyword force: Whether to force re-enable internal components. Defaults to False. + :type force: bool + :return: No return value. + :rtype: None + """ + if _registered and not force: + return # already registered + + _enable_internal_components() + for _type in NodeType.all_values(): + # if we do not register node class for all node types, the only difference will be the type of created node + # instance (Ae365exepool => InternalBaseNode). Not sure if this is acceptable. 
+ _register_node(_type, InternalBaseNode, InternalBaseNodeSchema) + + # redo the registration for those with specific runsettings + _register_node(NodeType.DATA_TRANSFER, DataTransfer, InternalBaseNodeSchema) + _register_node(NodeType.COMMAND, Command, CommandSchema) + _register_node(NodeType.DISTRIBUTED, Distributed, DistributedSchema) + _register_node(NodeType.PARALLEL, Parallel, ParallelSchema) + _register_node(NodeType.HEMERA, Hemera, InternalBaseNodeSchema) + _register_node(NodeType.STARLITE, Starlite, InternalBaseNodeSchema) + _register_node(NodeType.SCOPE, Scope, ScopeSchema) + _register_node(NodeType.HDI, HDInsight, HDInsightSchema) + + # register v2 style 1p only components + _register_node(NodeType.HEMERA_V2, Hemera, InternalBaseNodeSchema) + _register_node(NodeType.STARLITE_V2, Starlite, InternalBaseNodeSchema) + _register_node(NodeType.SCOPE_V2, Scope, ScopeSchema) + _register_node(NodeType.HDI_V2, HDInsight, HDInsightSchema) + # Ae365exepool and AetherBridge have been registered to InternalBaseNode + + # allow using internal nodes in do-while loop + LoopNode._extra_body_types = (Command, Pipeline) + + _set_registered(True) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_utils/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_utils/__init__.py new file mode 100644 index 00000000..13d8db80 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_utils/__init__.py @@ -0,0 +1,7 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +from ._yaml_utils import yaml_safe_load_with_base_resolver + +__all__ = ["yaml_safe_load_with_base_resolver"] diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_utils/_yaml_utils.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_utils/_yaml_utils.py new file mode 100644 index 00000000..fe3fa05e --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_utils/_yaml_utils.py @@ -0,0 +1,58 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +import typing + +import strictyaml + + +class _SafeLoaderWithBaseLoader(strictyaml.ruamel.SafeLoader): + """This is a SafeLoader with base resolver instead of version default resolver. + + Differences between BaseResolver and VersionedResolver: + 1) BaseResolver won't try to resolve node value. For example, "yes" and "no" will be resolved to "true"(bool) + and "false"(bool) by VersionedResolver, but won't be resolved by BaseResolver. + 2) VersionedResolver will delay loading the pattern matching rules to pass yaml versions on loading. + + Given SafeLoader inherits from VersionedResolver, we can't directly remove VersionedResolver + from the inheritance list. Instead, we overwrite add_version_implicit_resolver method to make + _SafeLoaderWithBaseLoader._version_implicit_resolver empty. Then the resolver will act like a BaseResolver. + """ + + def fetch_comment(self, comment): + pass + + def add_version_implicit_resolver(self, version, tag, regexp, first): + """Overwrite the method to make the resolver act like a base resolver instead of version default resolver. 
+ + :param version: version of yaml, like (1, 1)(yaml 1.1) and (1, 2)(yaml 1.2) + :type version: VersionType + :param tag: a tag indicating the type of the resolved node, e.g., tag:yaml.org,2002:bool. + :type tag: Any + :param regexp: the regular expression to match the node to be resolved + :type regexp: Any + :param first: a list of first characters to match + :type first: Any + """ + self._version_implicit_resolver.setdefault(version, {}) + + +def yaml_safe_load_with_base_resolver(stream: typing.IO): + """Load yaml string with base resolver instead of version default resolver. + + For example: + 1) "yes" and "no" will be loaded as "yes"(string) and "no"(string) instead of "true"(bool) and "false"(bool); + 2) "0.10" will be loaded as "0.10"(string) instead of "0.1"(float). + 3) "2019-01-01" will be loaded as "2019-01-01"(string) instead of "2019-01-01T00:00:00Z"(datetime). + 4) "1" will be loaded as "1"(string) instead of "1"(int). + 5) "1.0" will be loaded as "1.0"(string) instead of "1.0"(float). + 6) "~" will be loaded as "~"(string) instead of "None"(NoneType). + + Please refer to strictyaml.ruamel.resolver.implicit_resolvers for more details. + + :param stream: A readable stream + :type stream: typing.IO + :return: The return value of strictyaml.ruamel.load + :rtype: Any + """ + return strictyaml.ruamel.load(stream, Loader=_SafeLoaderWithBaseLoader) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/dsl/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/dsl/__init__.py new file mode 100644 index 00000000..ef7c42d9 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/dsl/__init__.py @@ -0,0 +1,7 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +from ...dsl._settings import set_pipeline_settings + +__all__ = ["set_pipeline_settings"] diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/__init__.py new file mode 100644 index 00000000..47296a80 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/__init__.py @@ -0,0 +1,46 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- +from ._input_outputs import InternalInput +from .command import Command, Distributed +from .component import InternalComponent +from .node import Ae365exepool, AetherBridge, DataTransfer, HDInsight, Hemera, InternalBaseNode, Pipeline, Starlite +from .parallel import Parallel +from .runsettings import ( + AISuperComputerConfiguration, + AISuperComputerScalePolicy, + AISuperComputerStorageReferenceConfiguration, + ITPConfiguration, + ITPInteractiveConfiguration, + ITPPriorityConfiguration, + ITPResourceConfiguration, + ITPRetrySettings, + TargetSelector, +) +from .scope import Scope + +__all__ = [ + "InternalBaseNode", + "DataTransfer", + "Ae365exepool", + "AetherBridge", + "HDInsight", + "Parallel", + "Starlite", + "Pipeline", + "Hemera", + "Command", + "Distributed", + "Scope", + "InternalComponent", + "TargetSelector", + "ITPInteractiveConfiguration", + "ITPPriorityConfiguration", + "ITPResourceConfiguration", + "ITPRetrySettings", + "ITPConfiguration", + "AISuperComputerConfiguration", + "AISuperComputerScalePolicy", + "AISuperComputerStorageReferenceConfiguration", + "InternalInput", +] diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_additional_includes.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_additional_includes.py new file mode 100644 index 00000000..71a8c0a4 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_additional_includes.py @@ -0,0 +1,31 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +from pathlib import Path +from typing import Optional + +from ...constants._common import AzureDevopsArtifactsType +from ...entities._component._additional_includes import AdditionalIncludes + + +class InternalAdditionalIncludes(AdditionalIncludes): + """This class is kept for compatibility with mldesigner.""" + + @property + def includes(self): + if self._is_artifact_includes: + return self._get_resolved_additional_include_configs() + return self.origin_configs + + @property + def code_path(self) -> Optional[Path]: + return self.resolved_code_path + + @property + def _is_artifact_includes(self): + return any( + map( + lambda x: isinstance(x, dict) and x.get("type", None) == AzureDevopsArtifactsType.ARTIFACT, + self.origin_configs, + ) + ) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_input_outputs.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_input_outputs.py new file mode 100644 index 00000000..f9d0a970 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_input_outputs.py @@ -0,0 +1,193 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +from typing import Dict, Optional, Union, overload + +from ... 
import Input, Output +from ..._utils.utils import get_all_enum_values_iter +from ...constants import AssetTypes +from ...constants._common import InputTypes +from ...constants._component import ComponentParameterTypes, IOConstants +from .._schema.input_output import SUPPORTED_INTERNAL_PARAM_TYPES + +_INPUT_TYPE_ENUM = "enum" +_INPUT_TYPE_ENUM_CAP = "Enum" +_INPUT_TYPE_FLOAT = "float" +_INPUT_TYPE_FLOAT_CAP = "Float" + + +class InternalInput(Input): + """Internal input class for internal components only. Comparing to the public Input class, this class has additional + primitive input types: + + - String + - Integer + - Float, float + - Boolean + - Enum, enum (new) + """ + + def __init__(self, *, datastore_mode=None, is_resource=None, **kwargs): + self.datastore_mode = datastore_mode + self.is_resource = is_resource + super().__init__(**kwargs) + + @property + def _allowed_types(self): + if self._lower_type == _INPUT_TYPE_ENUM: + return str + if self._lower_type == _INPUT_TYPE_FLOAT: + return float + return IOConstants.PRIMITIVE_STR_2_TYPE.get(self._lower_type, None) + + @property + def _lower_type(self) -> Optional[str]: + if isinstance(self.type, str): + return self.type.lower() + if self._multiple_types: + return None + return self.type.__name__.lower() + + @property + def _is_primitive_type(self): + if self._lower_type in [_INPUT_TYPE_ENUM, _INPUT_TYPE_FLOAT]: + return True + if self._lower_type in IOConstants.PRIMITIVE_STR_2_TYPE: + return True + return super()._is_primitive_type + + def _simple_parse(self, value, _type=None): + # simple parse is used to parse min, max & optional only, so we don't need to handle enum + if _type is None: + _type = self._lower_type + if _type == _INPUT_TYPE_FLOAT: + _type = ComponentParameterTypes.NUMBER + return super()._simple_parse(value, _type) + + def _to_rest_object(self) -> Dict: + rest_object = super()._to_rest_object() + if self._lower_type == _INPUT_TYPE_ENUM: + rest_object["type"] = _INPUT_TYPE_ENUM_CAP + if self._lower_type == _INPUT_TYPE_FLOAT: + rest_object["type"] = _INPUT_TYPE_FLOAT_CAP + return rest_object + + @classmethod + def _map_from_rest_type(cls, _type): + mapping_dict = { + _INPUT_TYPE_ENUM_CAP: _INPUT_TYPE_ENUM, + _INPUT_TYPE_FLOAT_CAP: _INPUT_TYPE_FLOAT, + } + if isinstance(_type, str) and _type in mapping_dict: + return mapping_dict[_type] + return super()._map_from_rest_type(_type) + + @classmethod + def _from_rest_object(cls, obj: Dict) -> "InternalInput": + obj["type"] = cls._map_from_rest_type(obj["type"]) + return InternalInput(**obj) + + def _get_python_builtin_type_str(self) -> str: + if self._lower_type in [_INPUT_TYPE_ENUM, _INPUT_TYPE_FLOAT]: + return self._lower_type + if self._is_primitive_type: + return IOConstants.PRIMITIVE_STR_2_TYPE[self._lower_type].__name__ # type: ignore[index] + # TODO: Bug 2881900 + return super()._get_python_builtin_type_str() + + @overload + @classmethod + def _from_base(cls, _input: None) -> None: # type: ignore[misc] + ... + + @overload + @classmethod + def _from_base(cls, _input: Union[Input, Dict]) -> "InternalInput": ... + + @classmethod + def _from_base(cls, _input: Optional[Union[Input, Dict]]) -> Optional["InternalInput"]: + """Cast from Input or Dict to InternalInput. + + Do not guarantee to create a new object. 
+ + :param _input: The base input + :type _input: Union[Input, Dict] + :return: + * None if _input is None + * InternalInput + :rtype: Optional["InternalInput"] + """ + if _input is None: + return None + if isinstance(_input, InternalInput): + return _input + if isinstance(_input, Input): + # do force cast directly as there is no new field added in InternalInput + # need to change the logic if new field is added + _input.__class__ = InternalInput + return _input # type: ignore[return-value] + return InternalInput(**_input) + + +def _map_v1_io_type(output_type: str) -> str: + """Map v1 IO type to v2. + + :param output_type: The v1 IO type + :type output_type: str + :return: The v2 IO type name + :rtype: str + """ + + # TODO: put it in a common place + def _map_primitive_type(_type: str) -> str: + """Convert double and float to number type. + + :param _type: A primitive v1 IO type + :type _type: str + :return: + * InputTypes.NUMBER if _type is "double" or "float" + * The provided type otherwise + :rtype: str + """ + _type = _type.lower() + if _type in ["double", "float"]: + return InputTypes.NUMBER + return _type + + if output_type in list(get_all_enum_values_iter(AssetTypes)): + return output_type + if output_type in SUPPORTED_INTERNAL_PARAM_TYPES: + return _map_primitive_type(output_type) + if output_type in ["AnyFile"]: + return AssetTypes.URI_FILE + # Handle AnyDirectory and the other types. + return AssetTypes.URI_FOLDER + + +class InternalOutput(Output): + def __init__(self, *, datastore_mode=None, is_link_mode=None, **kwargs): + self.datastore_mode = datastore_mode + self.is_link_mode = is_link_mode + super().__init__(**kwargs) + + @classmethod + def _from_base(cls, _output: Union[Output, Dict]) -> Optional["InternalOutput"]: + if _output is None: + return None + if isinstance(_output, InternalOutput): + return _output + if isinstance(_output, Output): + # do force cast directly as there is no new field added in InternalInput + # need to change the logic if new field is added + _output.__class__ = InternalOutput + return _output # type: ignore[return-value] + return InternalOutput(**_output) + + def map_pipeline_output_type(self) -> str: + """Map output type to pipeline output type. + + :return: The pipeline output type + :rtype: str + """ + # TODO: call this for node output + return _map_v1_io_type(self.type) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_merkle_tree.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_merkle_tree.py new file mode 100644 index 00000000..3a27b562 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_merkle_tree.py @@ -0,0 +1,195 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- + +# pylint: disable=pointless-string-statement + +import hashlib +import json +import os +from collections import deque +from datetime import datetime +from os import listdir +from os.path import isfile, join + +HASH_FILE_CHUNK_SIZE = 65536 +HASH_ALGORITHM = "sha512" + +""" Copied from ml-components +Create a merkle tree for the given directory path +The directory would typically represent a project directory""" + + +def create_merkletree(file_or_folder_path, exclude_function): + root = DirTreeNode("", "Directory", datetime.fromtimestamp(os.path.getmtime(file_or_folder_path)).isoformat()) + if os.path.isdir(file_or_folder_path): + folder_path = file_or_folder_path + _create_merkletree_helper(folder_path, root, exclude_function) + else: + file_path = file_or_folder_path + file_node = DirTreeNode(file_path, "File", datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat()) + hexdigest_hash, bytehash = _get_hash(os.path.normpath(file_path), file_path, "File") + if hexdigest_hash and bytehash: + file_node.add_hash(hexdigest_hash, bytehash) + root.add_child(file_node) + + _populate_hashes(root) + return root + + +""" Populate hashes for directory nodes +by hashing the hashes of child nodes under them""" + + +def _populate_hashes(rootNode): + if rootNode.is_file(): + return rootNode.bytehash + h = hashlib.new(HASH_ALGORITHM) + for child in rootNode.children: + if child.is_file(): + h.update(child.bytehash) + else: + h.update(_populate_hashes(child)) + rootNode.bytehash = h.digest() + rootNode.hexdigest_hash = h.hexdigest() + return h.digest() + + +""" Create a merkle tree for the given directory path + :param projectDir: Directory for which to create a tree. + :param rootNode: Root node . + Walks the directory and create a dirTree """ + + +def _create_merkletree_helper(projectDir, rootNode, exclude_function): + for f in sorted(listdir(projectDir)): + path = os.path.normpath(join(projectDir, f)) + if not exclude_function(path): + if isfile(join(projectDir, f)): + newNode = DirTreeNode(f, "File", datetime.fromtimestamp(os.path.getmtime(path)).isoformat()) + hexdigest_hash, bytehash = _get_hash(path, f, "File") + if hexdigest_hash and bytehash: + newNode.add_hash(hexdigest_hash, bytehash) + rootNode.add_child(newNode) + else: + newNode = DirTreeNode(f, "Directory", datetime.fromtimestamp(os.path.getmtime(path)).isoformat()) + rootNode.add_child(newNode) + _create_merkletree_helper(path, newNode, exclude_function) + + +def _get_hash(filePath, name, file_type): + h = hashlib.new(HASH_ALGORITHM) + if not os.access(filePath, os.R_OK): + print(filePath, os.R_OK) + print("Cannot access file, so excluded from snapshot: {}".format(filePath)) + return (None, None) + with open(filePath, "rb") as f: + while True: + data = f.read(HASH_FILE_CHUNK_SIZE) + if not data: + break + h.update(data) + h.update(name.encode("utf-8")) + h.update(file_type.encode("utf-8")) + return (h.hexdigest(), h.digest()) + + +""" We compute both hexdigest and digest for hashes. 
+digest (bytes) is used so that we can compute the bytehash of a parent directory based on bytehash of its children +hexdigest is used so that we can serialize the tree using json""" + + +class DirTreeNode(object): + def __init__(self, name=None, file_type=None, timestamp=None, hexdigest_hash=None, bytehash=None): + self.file_type = file_type + self.name = name + self.timestamp = timestamp + self.children = [] + self.hexdigest_hash = hexdigest_hash + self.bytehash = bytehash + + def load_children_from_dict(self, node_dict): + if len(node_dict.items()) == 0: + return None + self.name = node_dict["name"] + self.file_type = node_dict["type"] + self.hexdigest_hash = node_dict["hash"] + self.timestamp = node_dict["timestamp"] + for _, child in node_dict["children"].items(): + node = DirTreeNode() + node.load_children_from_dict(child) + self.add_child(node) + return self + + def load_children_from_json(self, node_dict): + self.name = node_dict["name"] + self.file_type = node_dict["type"] + self.hexdigest_hash = node_dict["hash"] + self.timestamp = node_dict["timestamp"] + for child in node_dict["children"]: + node = DirTreeNode() + node.load_children_from_json(child) + self.add_child(node) + return self + + def load_object_from_dict(self, node_dict): + self.load_children_from_dict(node_dict) + + def load_root_object_from_json_string(self, jsondata): + node_dict = json.loads(jsondata) + self.load_children_from_json(node_dict) + + def add_hash(self, hexdigest_hash, bytehash): + self.hexdigest_hash = hexdigest_hash + self.bytehash = bytehash + + def add_child(self, node): + self.children.append(node) + + def is_file(self): + return self.file_type == "File" + + """ Only for debugging purposes""" + + def print_tree(self): + queue = deque() + print("Name: " + self.name) + print("Type: " + self.file_type) + for child in self.children: + print(" " + child.name) + queue.append(child) + for i in queue: + i.print_tree() + + +""" Serialize merkle tree. +Serialize all fields except digest (bytes) +""" + + +class DirTreeJsonEncoder(json.JSONEncoder): + def default(self, o): + if not isinstance(o, DirTreeNode): + return super(DirTreeJsonEncoder, self).default(o) + _dict = o.__dict__ + _dict.pop("bytehash", None) + _dict["type"] = _dict.pop("file_type") + _dict["hash"] = _dict.pop("hexdigest_hash") + + return _dict + + +class DirTreeJsonEncoderV2(json.JSONEncoder): + def default(self, o): + if not isinstance(o, DirTreeNode): + return super(DirTreeJsonEncoderV2, self).default(o) + _dict = o.__dict__ + _dict.pop("bytehash", None) + if "file_type" in _dict: + _dict["type"] = _dict.pop("file_type") + if "hexdigest_hash" in _dict: + _dict["hash"] = _dict.pop("hexdigest_hash") + if isinstance(_dict["children"], list): + _dict["children"] = {x.name: x for x in _dict["children"]} + + return _dict diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/code.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/code.py new file mode 100644 index 00000000..d4437663 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/code.py @@ -0,0 +1,35 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- +from typing import Optional + +from ...entities._assets import Code + + +class InternalCode(Code): + @property + def _upload_hash(self) -> Optional[str]: + # This property will be used to identify the uploaded content when trying to + # upload to datastore. The tracebacks will be as below: + # Traceback (most recent call last): + # _artifact_utilities._check_and_upload_path + # _artifact_utilities._upload_to_datastore + # _artifact_utilities.upload_artifact + # _blob_storage_helper.upload + # where asset id will be calculated based on the upload hash. + + if self._is_anonymous is True: + # Name of an anonymous internal code is the same as its snapshot id + # in ml-component, use it as the upload hash to avoid duplicate hash + # calculation with _asset_utils.get_object_hash. + return self.name + + return getattr(super(InternalCode, self), "_upload_hash") + + def __setattr__(self, key, value): + if key == "name" and hasattr(self, key) and self._is_anonymous is True and value != self.name: + raise AttributeError( + "InternalCode name are calculated based on its content and cannot " + "be changed: current name is {} and new value is {}".format(self.name, value) + ) + super().__setattr__(key, value) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/command.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/command.py new file mode 100644 index 00000000..9ed732ec --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/command.py @@ -0,0 +1,203 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +# pylint: disable=protected-access +from typing import Dict, List, Optional, Union + +from marshmallow import INCLUDE, Schema + +from ... import MpiDistribution, PyTorchDistribution, RayDistribution, TensorFlowDistribution +from ..._schema import PathAwareSchema +from ..._schema.core.fields import DistributionField +from ...entities import CommandJobLimits, JobResourceConfiguration +from ...entities._util import get_rest_dict_for_node_attrs +from .._schema.component import NodeType +from ..entities.component import InternalComponent +from ..entities.node import InternalBaseNode + + +class Command(InternalBaseNode): + """Node of internal command components in pipeline with specific run settings. + + Different from azure.ai.ml.entities.Command, type of this class is CommandComponent. + """ + + def __init__(self, **kwargs): + node_type = kwargs.pop("type", None) or NodeType.COMMAND + super(Command, self).__init__(type=node_type, **kwargs) + self._init = True + self._resources = kwargs.pop("resources", JobResourceConfiguration()) + self._compute = kwargs.pop("compute", None) + self._environment = kwargs.pop("environment", None) + self._environment_variables = kwargs.pop("environment_variables", None) + self._limits = kwargs.pop("limits", CommandJobLimits()) + self._init = False + + @property + def compute(self) -> Optional[str]: + """Get the compute definition for the command. + + :return: The compute definition + :rtype: Optional[str] + """ + return self._compute + + @compute.setter + def compute(self, value: str) -> None: + """Set the compute definition for the command. 
+ + :param value: The new compute definition + :type value: str + """ + self._compute = value + + @property + def environment(self) -> Optional[str]: + """Get the environment definition for the command. + + :return: The environment definition + :rtype: Optional[str] + """ + return self._environment + + @environment.setter + def environment(self, value: str) -> None: + """Set the environment definition for the command. + + :param value: The new environment definition + :type value: str + """ + self._environment = value + + @property + def environment_variables(self) -> Optional[Dict[str, str]]: + """Get the environment variables for the command. + + :return: The environment variables + :rtype: Optional[Dict[str, str]] + """ + return self._environment_variables + + @environment_variables.setter + def environment_variables(self, value: Dict[str, str]) -> None: + """Set the environment variables for the command. + + :param value: The new environment variables + :type value: Dict[str, str] + """ + self._environment_variables = value + + @property + def limits(self) -> CommandJobLimits: + return self._limits + + @limits.setter + def limits(self, value: CommandJobLimits): + self._limits = value + + @property + def resources(self) -> JobResourceConfiguration: + """Compute Resource configuration for the component. + + :return: The resource configuration + :rtype: JobResourceConfiguration + """ + return self._resources + + @resources.setter + def resources(self, value: JobResourceConfiguration): + self._resources = value + + @classmethod + def _picked_fields_from_dict_to_rest_object(cls) -> List[str]: + return ["environment", "limits", "resources", "environment_variables"] + + @classmethod + def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]: + from .._schema.command import CommandSchema + + return CommandSchema(context=context) + + def _to_rest_object(self, **kwargs) -> dict: + rest_obj = super()._to_rest_object(**kwargs) + rest_obj.update( + { + "limits": get_rest_dict_for_node_attrs(self.limits, clear_empty_value=True), + "resources": get_rest_dict_for_node_attrs(self.resources, clear_empty_value=True), + } + ) + return rest_obj + + @classmethod + def _from_rest_object_to_init_params(cls, obj): + obj = InternalBaseNode._from_rest_object_to_init_params(obj) + + if "resources" in obj and obj["resources"]: + obj["resources"] = JobResourceConfiguration._from_rest_object(obj["resources"]) + + # handle limits + if "limits" in obj and obj["limits"]: + obj["limits"] = CommandJobLimits._from_rest_object(obj["limits"]) + return obj + + +class Distributed(Command): + def __init__(self, **kwargs): + super(Distributed, self).__init__(**kwargs) + self._distribution = kwargs.pop("distribution", None) + self._type = NodeType.DISTRIBUTED + if self._distribution is None: + # hack: distribution.type is required to set distribution, which is defined in launcher.type + if ( + isinstance(self.component, InternalComponent) + and self.component.launcher + and "type" in self.component.launcher + ): + self.distribution = {"type": self.component.launcher["type"]} + else: + raise ValueError( + "launcher.type must be specified in definition of DistributedComponent but got {}".format( + self.component + ) + ) + + @property + def distribution( + self, + ) -> Union[PyTorchDistribution, MpiDistribution, TensorFlowDistribution, RayDistribution]: + """The distribution config of component, e.g. distribution={'type': 'mpi'}. 
+ + :return: The distribution config + :rtype: Union[PyTorchDistribution, MpiDistribution, TensorFlowDistribution, RayDistribution] + """ + return self._distribution + + @distribution.setter + def distribution( + self, + value: Union[Dict, PyTorchDistribution, TensorFlowDistribution, MpiDistribution, RayDistribution], + ): + if isinstance(value, dict): + dist_schema = DistributionField(unknown=INCLUDE) + value = dist_schema._deserialize(value=value, attr=None, data=None) + self._distribution = value + + @classmethod + def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]: + from .._schema.command import DistributedSchema + + return DistributedSchema(context=context) + + @classmethod + def _picked_fields_from_dict_to_rest_object(cls) -> List[str]: + return Command._picked_fields_from_dict_to_rest_object() + ["distribution"] + + def _to_rest_object(self, **kwargs) -> dict: + rest_obj = super()._to_rest_object(**kwargs) + distribution = self.distribution._to_rest_object() if self.distribution else None # pylint: disable=no-member + rest_obj.update( + { + "distribution": get_rest_dict_for_node_attrs(distribution), + } + ) + return rest_obj diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/component.py new file mode 100644 index 00000000..e54c1906 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/component.py @@ -0,0 +1,370 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +# pylint: disable=protected-access, redefined-builtin +# disable redefined-builtin to use id/type as argument name +import os +from contextlib import contextmanager +from os import PathLike +from pathlib import Path +from typing import Any, Dict, Iterable, List, Optional, Union +from uuid import UUID + +import yaml # type: ignore[import] +from marshmallow import Schema + +from ... import Input, Output +from ..._restclient.v2022_10_01.models import ComponentVersion, ComponentVersionProperties +from ..._schema import PathAwareSchema +from ..._utils._arm_id_utils import parse_name_label +from ..._utils._asset_utils import IgnoreFile +from ...constants._common import DefaultOpenEncoding +from ...entities import Component +from ...entities._assets import Code +from ...entities._component._additional_includes import AdditionalIncludes, AdditionalIncludesMixin +from ...entities._component.code import ComponentIgnoreFile +from ...entities._job.distribution import DistributionConfiguration +from ...entities._system_data import SystemData +from ...entities._util import convert_ordered_dict_to_dict +from ...entities._validation import MutableValidationResult +from .._schema.component import InternalComponentSchema +from ._input_outputs import InternalInput, InternalOutput +from ._merkle_tree import create_merkletree +from .code import InternalCode +from .environment import InternalEnvironment +from .node import InternalBaseNode + +_ADDITIONAL_INCLUDES_CONFIG_KEY = "additional_includes" +_ADDITIONAL_INCLUDES_SUFFIX = ".additional_includes" + + +class InternalComponent(Component, AdditionalIncludesMixin): + # pylint: disable=too-many-instance-attributes, too-many-locals + """Base class for internal component version, used to define an internal component. Recommended to create instance + with component_factory. 
+ + :param name: Name of the resource. + :type name: str + :param version: Version of the resource. + :type version: str + :param id: Global id of the resource, Azure Resource Manager ID. + :type id: str + :param type: Type of the command, supported is 'command'. + :type type: str + :param description: Description of the resource. + :type description: str + :param tags: Tag dictionary. Tags can be added, removed, and updated. + :type tags: dict + :param properties: Internal use only. + :type properties: dict + :param display_name: Display name of the component. + :type display_name: str + :param is_deterministic: Whether the component is deterministic. + :type is_deterministic: bool + :param inputs: Inputs of the component. + :type inputs: dict + :param outputs: Outputs of the component. + :type outputs: dict + :param yaml_str: The yaml string of the component. + :type yaml_str: str + :param _schema: Schema of the component. + :type _schema: str + :param creation_context: Creation metadata of the component. + :type creation_context: ~azure.ai.ml.entities.SystemData + """ + + def __init__( + self, + *, + _schema: Optional[str] = None, + name: Optional[str] = None, + version: Optional[str] = None, + display_name: Optional[str] = None, + type: Optional[str] = None, + description: Optional[str] = None, + tags: Optional[Dict] = None, + is_deterministic: Optional[bool] = None, + successful_return_code: Optional[str] = None, + inputs: Optional[Dict] = None, + outputs: Optional[Dict] = None, + code: Optional[Union[str, os.PathLike]] = None, + environment: Optional[Dict] = None, + environment_variables: Optional[Dict] = None, + command: Optional[str] = None, + id: Optional[str] = None, + properties: Optional[Dict] = None, + yaml_str: Optional[str] = None, + creation_context: Optional[SystemData] = None, + scope: Optional[Dict] = None, + hemera: Optional[Dict] = None, + hdinsight: Optional[Dict] = None, + parallel: Optional[Dict] = None, + starlite: Optional[Dict] = None, + ae365exepool: Optional[Dict] = None, + launcher: Optional[Dict] = None, + datatransfer: Optional[Dict] = None, + aether: Optional[Dict] = None, + **kwargs, + ): + _type, self._type_label = parse_name_label(type) + super().__init__( + name=name, + version=version, + id=id, + type=_type, + description=description, + tags=tags, + properties=properties, + display_name=display_name, + is_deterministic=is_deterministic, # type: ignore[arg-type] + inputs=inputs, + outputs=outputs, + yaml_str=yaml_str, + _schema=_schema, + creation_context=creation_context, + **kwargs, + ) + # Store original yaml + self._yaml_str = yaml_str + self._other_parameter = kwargs + + self.successful_return_code = successful_return_code + self.code = code + self.environment = InternalEnvironment(**environment) if isinstance(environment, dict) else environment + self.environment_variables = environment_variables + # TODO: remove these to keep it a general component class + self.command = command + self.scope = scope + self.hemera = hemera + self.hdinsight = hdinsight + self.parallel = parallel + self.starlite = starlite + self.ae365exepool = ae365exepool + self.launcher = launcher + self.datatransfer = datatransfer + self.aether = aether + + @classmethod + def _build_io(cls, io_dict: Union[Dict, Input, Output], is_input: bool): + component_io = {} + for name, port in io_dict.items(): + if is_input: + component_io[name] = InternalInput._from_base(port) + else: + component_io[name] = InternalOutput._from_base(port) + return component_io + + # region 
AdditionalIncludesMixin + + @classmethod + def _read_additional_include_configs(cls, yaml_path: Path) -> List[str]: + """Read additional include configs from the additional includes file. + The name of the file is the same as the component spec file, with a suffix of ".additional_includes". + It can be either a yaml file or a text file: + 1. If it is a yaml file, yaml format of additional_includes looks like below: + ``` + additional_includes: + - your/local/path + - type: artifact + organization: devops_organization + project: devops_project + feed: artifacts_feed_name + name: universal_package_name + version: package_version + scope: scope_type + ``` + 2. If it is a text file, each line is a path to include. Note that artifact config is not supported + in this format. + + :param yaml_path: The yaml path + :type yaml_path: Path + :return: The list of additional includes + :rtype: List[str] + """ + additional_includes_config_path = yaml_path.with_suffix(_ADDITIONAL_INCLUDES_SUFFIX) + if additional_includes_config_path.is_file(): + with open(additional_includes_config_path, encoding=DefaultOpenEncoding.READ) as f: + file_content = f.read() + try: + configs = yaml.safe_load(file_content) + if isinstance(configs, dict): + return configs.get(_ADDITIONAL_INCLUDES_CONFIG_KEY, []) + except Exception: # pylint: disable=W0718 + # TODO: check if we should catch yaml.YamlError instead here + pass + return [line.strip() for line in file_content.splitlines(keepends=False) if len(line.strip()) > 0] + return [] + + @classmethod + def _get_additional_includes_field_name(cls) -> str: + # additional includes for internal components are configured by a file, which is not a field in the yaml + # return '*' as diagnostics yaml paths and override _get_all_additional_includes_configs. + return "*" + + def _get_all_additional_includes_configs(self) -> List: + # internal components must have a source path + return self._read_additional_include_configs(Path(self._source_path)) # type: ignore[arg-type] + # TODO: Bug 2881943 + + def _get_base_path_for_code(self) -> Path: + # internal components must have a source path + return Path(self._source_path).parent # type: ignore[arg-type] + # TODO: Bug 2881943 + + def _get_origin_code_value(self) -> Union[str, PathLike, None]: + return super()._get_origin_code_value() or "." + + # endregion + + def _to_ordered_dict_for_yaml_dump(self) -> Dict: + """Dump the component content into a sorted yaml string. + + :return: The ordered dict + :rtype: Dict + """ + + obj = super()._to_ordered_dict_for_yaml_dump() + # dict dumped base on schema will transfer code to an absolute path, while we want to keep its original value + if "code" in obj: + if not self.code: + del obj["code"] + else: + obj["code"] = self.code + return obj + + @property + def _additional_includes(self) -> AdditionalIncludes: + """This property is kept for compatibility with old mldesigner sdk. 
+ + :return: The additional includes + :rtype: AdditionalIncludes + """ + obj = self._generate_additional_includes_obj() + from azure.ai.ml._internal.entities._additional_includes import InternalAdditionalIncludes + + obj.__class__ = InternalAdditionalIncludes + return obj + + # region SchemaValidatableMixin + @classmethod + def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]: + return InternalComponentSchema(context=context) + + def _customized_validate(self) -> MutableValidationResult: + validation_result = super(InternalComponent, self)._customized_validate() + skip_path_validation = not self._append_diagnostics_and_check_if_origin_code_reliable_for_local_path_validation( + validation_result + ) + # resolving additional includes & update self._base_path can be dangerous, + # so we just skip path validation if additional includes is provided. + # note that there will still be client-side error on job submission (after code is resolved) + # if paths in environment are invalid + if isinstance(self.environment, InternalEnvironment): + validation_result.merge_with( + self.environment.validate( + self._base_path, + skip_path_validation=skip_path_validation, + ), + field_name="environment", + ) + return validation_result + + # endregion + + @classmethod + def _from_rest_object_to_init_params(cls, obj: ComponentVersion) -> Dict: + # put it here as distribution is shared by some components, e.g. command + distribution = obj.properties.component_spec.pop("distribution", None) + init_kwargs = super()._from_rest_object_to_init_params(obj) + if distribution: + init_kwargs["distribution"] = DistributionConfiguration._from_rest_object(distribution) + return init_kwargs + + def _to_rest_object(self) -> ComponentVersion: + component: Union[Dict[Any, Any], List[Any]] = convert_ordered_dict_to_dict(self._to_dict()) + component["_source"] = self._source # type: ignore[call-overload] + # TODO: 2883063 + + properties = ComponentVersionProperties( + component_spec=component, + description=self.description, + is_anonymous=self._is_anonymous, + properties=self.properties, + tags=self.tags, + ) + result = ComponentVersion(properties=properties) + result.name = self.name + return result + + @classmethod + def _get_snapshot_id( + cls, + code_path: Union[str, PathLike], + ignore_file: IgnoreFile, + ) -> str: + """Get the snapshot id of a component with specific working directory in ml-components. Use this as the name of + code asset to reuse steps in a pipeline job from ml-components runs. + + :param code_path: The path of the working directory. + :type code_path: str + :param ignore_file: The ignore file of the snapshot. + :type ignore_file: IgnoreFile + :return: The snapshot id of a component in ml-components with code_path as its working directory. + :rtype: str + """ + curr_root = create_merkletree(code_path, ignore_file.is_file_excluded) + snapshot_id = str(UUID(curr_root.hexdigest_hash[::4])) + return snapshot_id + + @contextmanager # type: ignore[arg-type] + def _try_build_local_code(self) -> Iterable[Code]: + """Build final code when origin code is a local code. + Will merge code path with additional includes into a temp folder if additional includes is specified. + For internal components, file dependencies in environment will be resolved based on the final code. + + :return: The code instance + :rtype: Iterable[Code] + """ + + tmp_code_dir: Path + # origin code value of internal component will never be None. 
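`_get_snapshot_id` above derives a deterministic UUID from a Merkle-tree hash of the code folder (via `create_merkletree`) so that identical snapshots map to the same anonymous code asset and can be reused across runs. A rough standalone approximation of the idea, which is deliberately not byte-compatible with the real algorithm:

import hashlib
import uuid
from pathlib import Path


def naive_snapshot_id(code_dir: str) -> str:
    """Illustrative only: deterministic id from directory contents.

    The SDK builds a Merkle tree (create_merkletree) and slices its hex digest;
    this flat hash is NOT compatible with that scheme.
    """
    digest = hashlib.sha256()
    for path in sorted(Path(code_dir).rglob("*")):
        if path.is_file():
            digest.update(str(path.relative_to(code_dir)).encode("utf-8"))
            digest.update(path.read_bytes())
    return str(uuid.UUID(digest.hexdigest()[:32]))


# naive_snapshot_id(".")  # same tree content => same id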
check _get_origin_code_value for details + with self._generate_additional_includes_obj().merge_local_code_and_additional_includes() as tmp_code_dir: + # use absolute path in case temp folder & work dir are in different drive + tmp_code_dir = tmp_code_dir.absolute() + + # file dependency in code will be read during internal environment resolution + # for example, docker file of the environment may be in additional includes; + # and it will be read then insert to the environment object during resolution. + # so we need to resolve environment based on the temporary code path + if isinstance(self.environment, InternalEnvironment): + self.environment.resolve(base_path=tmp_code_dir) + + # additional includes config file itself should be ignored + rebased_ignore_file = ComponentIgnoreFile( + tmp_code_dir, + additional_includes_file_name=Path(self._source_path) + .with_suffix(_ADDITIONAL_INCLUDES_SUFFIX) + .name, # type: ignore[arg-type] + # TODO: Bug 2881943 + ) + + # Use the snapshot id in ml-components as code name to enable anonymous + # component reuse from ml-component runs. + # calculate snapshot id here instead of inside InternalCode to ensure that + # snapshot id is calculated based on the built code path + yield InternalCode( + name=self._get_snapshot_id( + # use absolute path in case temp folder & work dir are in different drive + tmp_code_dir, + # this ignore-file should be rebased to the built code path + rebased_ignore_file, + ), + version="1", + base_path=self._base_path, + path=tmp_code_dir, + is_anonymous=True, + ignore_file=rebased_ignore_file, + ) + + def __call__(self, *args, **kwargs) -> InternalBaseNode: + return super(InternalComponent, self).__call__(*args, **kwargs) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/environment.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/environment.py new file mode 100644 index 00000000..673afeac --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/environment.py @@ -0,0 +1,157 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
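`_try_build_local_code` above relies on `merge_local_code_and_additional_includes` to stage the component code plus its additional includes in a temporary folder before hashing and upload. A simplified, hypothetical version of that merge step, ignoring artifact includes, zip handling and ignore files:

import shutil
import tempfile
from contextlib import contextmanager
from pathlib import Path
from typing import Iterable, Iterator


@contextmanager
def merged_code_dir(code_path: str, additional_includes: Iterable[str]) -> Iterator[Path]:
    """Copy the code folder plus each additional include into a temp folder and yield it."""
    with tempfile.TemporaryDirectory() as tmp:
        tmp_dir = Path(tmp).absolute()
        shutil.copytree(code_path, tmp_dir, dirs_exist_ok=True)
        for include in additional_includes:
            src = Path(include)
            dst = tmp_dir / src.name
            if src.is_dir():
                shutil.copytree(src, dst, dirs_exist_ok=True)
            else:
                shutil.copy2(src, dst)
        yield tmp_dir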
+# --------------------------------------------------------- + +from os import PathLike +from pathlib import Path +from typing import Dict, Optional, Union + +from ..._utils.utils import load_yaml +from ...constants._common import FILE_PREFIX, DefaultOpenEncoding +from ...entities._validation import MutableValidationResult, ValidationResultBuilder + + +class InternalEnvironment: + # conda section + CONDA_DEPENDENCIES = "conda_dependencies" + CONDA_DEPENDENCIES_FILE = "conda_dependencies_file" + PIP_REQUIREMENTS_FILE = "pip_requirements_file" + DEFAULT_PYTHON_VERSION = "3.8.5" + # docker section + BUILD = "build" + DOCKERFILE = "dockerfile" + + def __init__( + self, + docker: Optional[Dict] = None, + conda: Optional[Dict] = None, + os: Optional[str] = None, + name: Optional[str] = None, + version: Optional[str] = None, + python: Optional[Dict] = None, + ): + self.docker = docker + self.conda = conda + self.os = os if os else "Linux" + self.name = name + self.version = version + self.python = python + self._docker_file_resolved = False + + @staticmethod + def _parse_file_path(value: str) -> str: + return value[len(FILE_PREFIX) :] if value.startswith(FILE_PREFIX) else value + + def _validate_conda_section( + self, base_path: Union[str, PathLike], skip_path_validation: bool + ) -> MutableValidationResult: + validation_result = ValidationResultBuilder.success() + if not self.conda: + return validation_result + dependencies_field_names = {self.CONDA_DEPENDENCIES, self.CONDA_DEPENDENCIES_FILE, self.PIP_REQUIREMENTS_FILE} + if len(set(self.conda) & dependencies_field_names) > 1: + validation_result.append_warning( + yaml_path="conda", + message="Duplicated declaration of dependencies, will honor in the order " + "conda_dependencies, conda_dependencies_file and pip_requirements_file.", + ) + if self.conda.get(self.CONDA_DEPENDENCIES_FILE): + conda_dependencies_file = self.conda[self.CONDA_DEPENDENCIES_FILE] + if not skip_path_validation and not (Path(base_path) / conda_dependencies_file).is_file(): + validation_result.append_error( + yaml_path=f"conda.{self.CONDA_DEPENDENCIES_FILE}", + message=f"Cannot find conda dependencies file: {conda_dependencies_file!r}", + ) + if self.conda.get(self.PIP_REQUIREMENTS_FILE): + pip_requirements_file = self.conda[self.PIP_REQUIREMENTS_FILE] + if not skip_path_validation and not (Path(base_path) / pip_requirements_file).is_file(): + validation_result.append_error( + yaml_path=f"conda.{self.PIP_REQUIREMENTS_FILE}", + message=f"Cannot find pip requirements file: {pip_requirements_file!r}", + ) + return validation_result + + def _validate_docker_section( + self, base_path: Union[str, PathLike], skip_path_validation: bool + ) -> MutableValidationResult: + validation_result = ValidationResultBuilder.success() + if not self.docker: + return validation_result + if not self.docker.get(self.BUILD) or not self.docker[self.BUILD].get(self.DOCKERFILE): + return validation_result + dockerfile_file = self.docker[self.BUILD][self.DOCKERFILE] + dockerfile_file = self._parse_file_path(dockerfile_file) + if ( + not self._docker_file_resolved + and not skip_path_validation + and not (Path(base_path) / dockerfile_file).is_file() + ): + validation_result.append_error( + yaml_path=f"docker.{self.BUILD}.{self.DOCKERFILE}", + message=f"Dockerfile not exists: {dockerfile_file}", + ) + return validation_result + + def validate(self, base_path: Union[str, PathLike], skip_path_validation: bool = False) -> MutableValidationResult: + """Validate the environment section. 
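Assuming the module path shown in this file (this is an internal class, not part of the public surface), the validation entry point above can be exercised directly; `skip_path_validation=True` keeps the example self-contained because the referenced file names are placeholders:

from azure.ai.ml._internal.entities.environment import InternalEnvironment

env = InternalEnvironment(
    os="Linux",
    docker={"build": {"dockerfile": "file:Dockerfile"}},          # placeholder dockerfile reference
    conda={"conda_dependencies_file": "environment.yaml"},        # placeholder conda file
)
result = env.validate(base_path=".", skip_path_validation=True)
print(result.passed)  # True when there are no validation errors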
+ + This is a public method but won't be exposed to user given InternalEnvironment is an internal class. + + :param base_path: The base path + :type base_path: Union[str, PathLike] + :param skip_path_validation: Whether to skip path validation. Defaults to False + :type skip_path_validation: bool + :return: The validation result + :rtype: MutableValidationResult + """ + validation_result = ValidationResultBuilder.success() + if self.os is not None and self.os not in {"Linux", "Windows", "linux", "windows"}: + validation_result.append_error( + yaml_path="os", + message=f"Only support 'Linux' and 'Windows', but got {self.os!r}", + ) + validation_result.merge_with(self._validate_conda_section(base_path, skip_path_validation)) + validation_result.merge_with(self._validate_docker_section(base_path, skip_path_validation)) + return validation_result + + def _resolve_conda_section(self, base_path: Union[str, PathLike]) -> None: + if not self.conda: + return + if self.conda.get(self.CONDA_DEPENDENCIES_FILE): + conda_dependencies_file = self.conda.pop(self.CONDA_DEPENDENCIES_FILE) + self.conda[self.CONDA_DEPENDENCIES] = load_yaml(Path(base_path) / conda_dependencies_file) + return + if self.conda.get(self.PIP_REQUIREMENTS_FILE): + pip_requirements_file = self.conda.pop(self.PIP_REQUIREMENTS_FILE) + with open(Path(base_path) / pip_requirements_file, encoding=DefaultOpenEncoding.READ) as f: + pip_requirements = f.read().splitlines() + self.conda = { + self.CONDA_DEPENDENCIES: { + "name": "project_environment", + "dependencies": [ + f"python={self.DEFAULT_PYTHON_VERSION}", + { + "pip": pip_requirements, + }, + ], + } + } + return + + def _resolve_docker_section(self, base_path: Union[str, PathLike]) -> None: + if not self.docker: + return + if not self.docker.get(self.BUILD) or not self.docker[self.BUILD].get(self.DOCKERFILE): + return + dockerfile_file = self.docker[self.BUILD][self.DOCKERFILE] + if not dockerfile_file.startswith(FILE_PREFIX): + return + dockerfile_file = self._parse_file_path(dockerfile_file) + with open(Path(base_path) / dockerfile_file, "r", encoding=DefaultOpenEncoding.READ) as f: + self.docker[self.BUILD][self.DOCKERFILE] = f.read() + self._docker_file_resolved = True + return + + def resolve(self, base_path: Union[str, PathLike]) -> None: + self._resolve_conda_section(base_path) + self._resolve_docker_section(base_path) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/node.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/node.py new file mode 100644 index 00000000..89fc032c --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/node.py @@ -0,0 +1,338 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +# pylint: disable=protected-access + +from enum import Enum +from typing import Dict, List, Optional, Union + +from marshmallow import Schema + +from ... import Input, Output +from ..._schema import PathAwareSchema +from ...constants import JobType +from ...entities import Component, Job +from ...entities._builders import BaseNode +from ...entities._job.pipeline._io import NodeInput, NodeOutput, PipelineInput +from ...entities._util import convert_ordered_dict_to_dict +from .._schema.component import NodeType + + +class InternalBaseNode(BaseNode): + """Base class for node of internal components in pipeline. Can be instantiated directly. 
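`_resolve_conda_section` above inlines a `pip_requirements_file` into a `conda_dependencies` block pinned to the default Python version. A standalone illustration of that transformation:

from pathlib import Path
from typing import Dict


def pip_requirements_to_conda(base_path: str, requirements_file: str,
                              python_version: str = "3.8.5") -> Dict:
    """Mirror of the conversion above: requirements.txt -> conda_dependencies dict."""
    pip_requirements = Path(base_path, requirements_file).read_text(encoding="utf-8").splitlines()
    return {
        "conda_dependencies": {
            "name": "project_environment",
            "dependencies": [
                f"python={python_version}",
                {"pip": pip_requirements},
            ],
        }
    }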
+ + :param type: Type of pipeline node + :type type: str + :param component: Id or instance of the component version to be run for the step + :type component: Union[Component, str] + :param inputs: Inputs to the node. + :type inputs: Dict[str, Union[Input, str, bool, int, float, Enum, dict]] + :param outputs: Mapping of output data bindings used in the job. + :type outputs: Dict[str, Union[str, Output, dict]] + :param properties: The job property dictionary. + :type properties: dict[str, str] + :param compute: Compute definition containing the compute information for the step + :type compute: str + """ + + def __init__( + self, + *, + type: str = JobType.COMPONENT, # pylint: disable=redefined-builtin + component: Union[Component, str], + inputs: Optional[ + Dict[ + str, + Union[ + PipelineInput, + NodeOutput, + Input, + str, + bool, + int, + float, + Enum, + "Input", + ], + ] + ] = None, + outputs: Optional[Dict[str, Union[str, Output, "Output"]]] = None, + properties: Optional[Dict] = None, + compute: Optional[str] = None, + **kwargs, + ): + kwargs.pop("type", None) + BaseNode.__init__( + self, + type=type, + component=component, # type: ignore[arg-type] + # TODO: Bug 2881892 + inputs=inputs, + outputs=outputs, + compute=compute, + properties=properties, + **kwargs, + ) + + @property + def _skip_required_compute_missing_validation(self) -> bool: + return True + + def _to_node(self, context: Optional[Dict] = None, **kwargs) -> BaseNode: + return self + + def _to_component(self, context: Optional[Dict] = None, **kwargs) -> Component: + return self.component + + def _to_job(self) -> Job: + raise RuntimeError("Internal components doesn't support to job") + + @classmethod + def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs) -> "Job": + raise RuntimeError("Internal components doesn't support load from dict") + + @classmethod + def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]: + from .._schema.node import InternalBaseNodeSchema + + return InternalBaseNodeSchema(context=context) + + @property + def component(self) -> Component: + return self._component + + def _to_rest_inputs(self) -> Dict[str, Dict]: + rest_dataset_literal_inputs = super(InternalBaseNode, self)._to_rest_inputs() + for input_name, input_value in self.inputs.items(): + # hack: remove unfilled input from rest object instead a default input of {"job_input_type": "literal"} + # note that this hack is not always effective as _data will be set to Input() when visiting input_value.type + if ( + isinstance(input_value, NodeInput) + and input_value._data is None + and input_name in rest_dataset_literal_inputs + ): + del rest_dataset_literal_inputs[input_name] + return rest_dataset_literal_inputs + + def _to_rest_object(self, **kwargs) -> dict: + base_dict = super(InternalBaseNode, self)._to_rest_object(**kwargs) + for key in ["name", "display_name", "tags"]: + if key in base_dict: + del base_dict[key] + for key in ["computeId"]: + if key in base_dict and base_dict[key] is None: + del base_dict[key] + + base_dict.update( + convert_ordered_dict_to_dict( + { + "componentId": self._get_component_id(), + "type": self.type, + } + ) + ) + return base_dict + + +class DataTransfer(InternalBaseNode): + def __init__(self, **kwargs): + kwargs.pop("type", None) + super(DataTransfer, self).__init__(type=NodeType.DATA_TRANSFER, **kwargs) + + +class HDInsight(InternalBaseNode): + def __init__(self, **kwargs): + kwargs.pop("type", None) + super(HDInsight, 
self).__init__(type=NodeType.HDI, **kwargs) + self._init = True + self._compute_name: str = kwargs.pop("compute_name", None) + self._queue: str = kwargs.pop("queue", None) + self._driver_memory: str = kwargs.pop("driver_memory", None) + self._driver_cores: int = kwargs.pop("driver_cores", None) + self._executor_memory: str = kwargs.pop("executor_memory", None) + self._executor_cores: int = kwargs.pop("executor_cores", None) + self._number_executors: int = kwargs.pop("number_executors", None) + self._conf: Union[dict, str] = kwargs.pop("conf", None) + self._hdinsight_spark_job_name: str = kwargs.pop("hdinsight_spark_job_name", None) + self._init = False + + @property + def compute_name(self) -> str: + """Name of the compute to be used. + + :return: Compute name + :rtype: str + """ + return self._compute_name + + @compute_name.setter + def compute_name(self, value: str): + self._compute_name = value + + @property + def queue(self) -> str: + """The name of the YARN queue to which submitted. + + :return: YARN queue name + :rtype: str + """ + return self._queue + + @queue.setter + def queue(self, value: str): + self._queue = value + + @property + def driver_memory(self) -> str: + """Amount of memory to use for the driver process. + + It's the same format as JVM memory strings. Use lower-case suffixes, e.g. k, m, g, t, and p, for kilobyte, + megabyte, gigabyte and terabyte respectively. Example values are 10k, 10m and 10g. + + :return: Amount of memory to use for the driver process + :rtype: str + """ + return self._driver_memory + + @driver_memory.setter + def driver_memory(self, value: str): + self._driver_memory = value + + @property + def driver_cores(self) -> int: + """Number of cores to use for the driver process. + + :return: Number of cores to use for the driver process. + :rtype: int + """ + return self._driver_cores + + @driver_cores.setter + def driver_cores(self, value: int): + self._driver_cores = value + + @property + def executor_memory(self) -> str: + """Amount of memory to use per executor process. + + It's the same format as JVM memory strings. Use lower-case suffixes, e.g. k, m, g, t, and p, for kilobyte, + megabyte, gigabyte and terabyte respectively. Example values are 10k, 10m and 10g. + + :return: The executor memory + :rtype: str + """ + return self._executor_memory + + @executor_memory.setter + def executor_memory(self, value: str): + self._executor_memory = value + + @property + def executor_cores(self) -> int: + """Number of cores to use for each executor. + + :return: The number of cores to use for each executor + :rtype: int + """ + return self._executor_cores + + @executor_cores.setter + def executor_cores(self, value: int): + self._executor_cores = value + + @property + def number_executors(self) -> int: + """Number of executors to launch for this session. + + :return: The number of executors to launch + :rtype: int + """ + return self._number_executors + + @number_executors.setter + def number_executors(self, value: int): + self._number_executors = value + + @property + def conf(self) -> Union[dict, str]: + """Spark configuration properties. + + :return: The spark configuration properties. 
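A hedged usage sketch for the HDInsight run settings above; the YAML path, compute name and all values are placeholders, and the component spec itself is not shown here:

from azure.ai.ml import load_component
import azure.ai.ml._internal  # noqa: F401  # importing this package enables internal component types

hdi_func = load_component("hdi_component.yaml")  # placeholder path to an internal HDInsight component spec
node = hdi_func()  # pass the component's inputs here

# the HDInsight-specific run settings are plain properties on the node
node.compute_name = "my-hdi-compute"
node.queue = "default"
node.driver_memory = "4g"
node.driver_cores = 2
node.executor_memory = "4g"
node.executor_cores = 2
node.number_executors = 4
node.hdinsight_spark_job_name = "sample-spark-job"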
+ :rtype: Union[dict, str] + """ + return self._conf + + @conf.setter + def conf(self, value: Union[dict, str]): + self._conf = value + + @property + def hdinsight_spark_job_name(self) -> str: + """ + + :return: The name of this session + :rtype: str + """ + return self._hdinsight_spark_job_name + + @hdinsight_spark_job_name.setter + def hdinsight_spark_job_name(self, value: str): + self._hdinsight_spark_job_name = value + + @classmethod + def _picked_fields_from_dict_to_rest_object(cls) -> List[str]: + return [ + "compute_name", + "queue", + "driver_cores", + "executor_memory", + "conf", + "hdinsight_spark_job_name", + "driver_memory", + "executor_cores", + "number_executors", + ] + + @classmethod + def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]: + from .._schema.node import HDInsightSchema + + return HDInsightSchema(context=context) + + +class Starlite(InternalBaseNode): + def __init__(self, **kwargs): + kwargs.pop("type", None) + super(Starlite, self).__init__(type=NodeType.STARLITE, **kwargs) + + +class Pipeline(InternalBaseNode): + # this is only for using registered pipeline component + def __init__(self, **kwargs): + kwargs.pop("type", None) + super(Pipeline, self).__init__(type=NodeType.PIPELINE, **kwargs) + + +class Hemera(InternalBaseNode): + def __init__(self, **kwargs): + kwargs.pop("type", None) + super(Hemera, self).__init__(type=NodeType.HEMERA, **kwargs) + + +class Ae365exepool(InternalBaseNode): + def __init__(self, **kwargs): + kwargs.pop("type", None) + super(Ae365exepool, self).__init__(type=NodeType.AE365EXEPOOL, **kwargs) + + +class Sweep(InternalBaseNode): + # this is not in our scope + def __init__(self, **kwargs): + kwargs.pop("type", None) + super(Sweep, self).__init__(type=NodeType.SWEEP, **kwargs) + + +class AetherBridge(InternalBaseNode): + def __init__(self, **kwargs): + kwargs.pop("type", None) + super(AetherBridge, self).__init__(type=NodeType.AETHER_BRIDGE, **kwargs) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/parallel.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/parallel.py new file mode 100644 index 00000000..86fa8939 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/parallel.py @@ -0,0 +1,114 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +from typing import List, Union + +from marshmallow import Schema + +from ..._schema import PathAwareSchema +from ...entities import BatchRetrySettings +from .._schema.component import NodeType +from ..entities import Command + + +class Parallel(Command): + """Node of scope components in pipeline with specific run settings.""" + + def __init__(self, **kwargs): + kwargs.pop("type", None) + super(Parallel, self).__init__(type=NodeType.PARALLEL, **kwargs) + self._init = True + self._max_concurrency_per_instance = kwargs.pop("max_concurrency_per_instance", None) + self._error_threshold = kwargs.pop("error_threshold", None) + self._mini_batch_size = kwargs.pop("mini_batch_size", None) + self._partition_keys = kwargs.pop("partition_keys", None) + self._logging_level = kwargs.pop("logging_level", None) + self._retry_settings = kwargs.pop("retry_settings", BatchRetrySettings()) + self._init = False + + @property + def max_concurrency_per_instance(self) -> int: + """The max parallellism that each compute instance has. 
+ + :return: The max concurrence per compute instance + :rtype: int + """ + return self._max_concurrency_per_instance + + @max_concurrency_per_instance.setter + def max_concurrency_per_instance(self, value: int): + self._max_concurrency_per_instance = value + + @property + def error_threshold(self) -> int: + """The number of record failures for Tabular Dataset and file failures for File Dataset that should be ignored + during processing. + + If the error count goes above this value, then the job will be aborted. Error threshold is for the entire input + rather than the individual mini-batch sent to run() method. The range is [-1, int.max]. -1 indicates ignore all + failures during processing. + + :return: The error threshold + :rtype: int + """ + return self._error_threshold + + @error_threshold.setter + def error_threshold(self, value: int): + self._error_threshold = value + + @property + def mini_batch_size(self) -> int: + """The number of records to be sent to run() method for each mini-batch. + + :return: The batch size + :rtype: int + """ + return self._mini_batch_size + + @mini_batch_size.setter + def mini_batch_size(self, value: int): + self._mini_batch_size = value + + @property + def logging_level(self) -> str: + """A string of the logging level name. + + :return: The loggin level + :rtype: str + """ + return self._logging_level + + @logging_level.setter + def logging_level(self, value: str): + self._logging_level = value + + @property + def retry_settings(self) -> BatchRetrySettings: + """Parallel job run failed retry. + + :return: The retry settings + :rtype: BatchRetrySettings + """ + return self._retry_settings + + @retry_settings.setter + def retry_settings(self, value: BatchRetrySettings): + self._retry_settings = value + + @classmethod + def _picked_fields_from_dict_to_rest_object(cls) -> List[str]: + return Command._picked_fields_from_dict_to_rest_object() + [ + "max_concurrency_per_instance", + "error_threshold", + "logging_level", + "retry_settings", + "mini_batch_size", + ] + + @classmethod + def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]: + from .._schema.command import ParallelSchema + + return ParallelSchema(context=context) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/__init__.py new file mode 100644 index 00000000..76c94c17 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/__init__.py @@ -0,0 +1,29 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
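A hedged usage sketch for the Parallel run settings above; the YAML path and values are placeholders:

from azure.ai.ml import load_component
from azure.ai.ml.entities import BatchRetrySettings
import azure.ai.ml._internal  # noqa: F401  # importing this package enables internal component types

parallel_func = load_component("parallel_component.yaml")  # placeholder path to an internal parallel component spec
node = parallel_func()  # pass the component's inputs here

node.max_concurrency_per_instance = 2
node.mini_batch_size = 10
node.error_threshold = -1          # -1: ignore all record/file failures
node.logging_level = "INFO"
node.retry_settings = BatchRetrySettings(max_retries=3, timeout=60)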
+# --------------------------------------------------------- + +from .ai_super_computer_configuration import ( + AISuperComputerConfiguration, + AISuperComputerScalePolicy, + AISuperComputerStorageReferenceConfiguration, +) +from .itp_configuration import ( + ITPConfiguration, + ITPInteractiveConfiguration, + ITPPriorityConfiguration, + ITPResourceConfiguration, + ITPRetrySettings, +) +from .target_selector import TargetSelector + +__all__ = [ + "ITPInteractiveConfiguration", + "ITPPriorityConfiguration", + "ITPResourceConfiguration", + "ITPRetrySettings", + "ITPConfiguration", + "TargetSelector", + "AISuperComputerConfiguration", + "AISuperComputerScalePolicy", + "AISuperComputerStorageReferenceConfiguration", +] diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/ai_super_computer_configuration.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/ai_super_computer_configuration.py new file mode 100644 index 00000000..89f338ca --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/ai_super_computer_configuration.py @@ -0,0 +1,194 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +from typing import Any, Dict, List, Optional + +from ....entities._job.job_resource_configuration import BaseProperty + + +class PascalCaseProperty(BaseProperty): + _KEY_MAPPING: Dict[str, Any] = {} + + def items(self): + result = [] + for key, value in super().items(): + if key.lower() in self._KEY_MAPPING: + key = self._KEY_MAPPING[key.lower()] + result.append((key, value)) + return result + + +class AISuperComputerStorageReferenceConfiguration(PascalCaseProperty): # pylint: disable=name-too-long + _KEY_MAPPING = { + "container_name": "ContainerName", + "relative_path": "RelativePath", + } + + def __init__( + self, + container_name: str, + relative_path: str, + **kwargs, + ): + """ + :param container_name: The name of the ai-super-computer storage container. + :type container_name: str + :param relative_path: The path on the ai-super-computer storage container. + :type relative_path: str + """ + super().__init__(**kwargs) + self.container_name = container_name + self.relative_path = relative_path + + +class AISuperComputerScalePolicy(PascalCaseProperty): + _KEY_MAPPING = { + "auto_scale_instance_type_count_set": "AutoScaleInstanceTypeCountSet", + "auto_scale_interval_in_sec": "AutoScaleIntervalInSec", + "max_instance_type_count": "MaxInstanceTypeCount", + "min_instance_type_count": "MinInstanceTypeCount", + } + + def __init__( + self, + auto_scale_instance_type_count_set: Optional[List[int]] = None, + auto_scale_interval_in_sec: Optional[int] = None, + max_instance_type_count: Optional[int] = None, + min_instance_type_count: Optional[int] = None, + **kwargs, + ): + """ + :param auto_scale_instance_type_count_set: The list of instance type counts available + for elastically scaling the job. Assume currentInstanceTypeCount = 4 and + autoScaleInstanceTypeCountSet = [2,4,8], the job will automatically scale down as 8->4->2 + when less capacity is available, and scale up as 2->4->8 when more capacity is available. + The value should be a list of integers in ascending order. + :type auto_scale_instance_type_count_set: List[int] + :param auto_scale_interval_in_sec: The minimum interval in seconds between job autoscaling. 
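`PascalCaseProperty` above remaps snake_case attribute names to the PascalCase keys the backend expects when the object is iterated for serialization. A toy, self-contained version of the same idea:

from typing import Any, Dict, List, Tuple


class PascalCaseDict(dict):
    """Toy stand-in for PascalCaseProperty: remap known keys to PascalCase on items()."""

    _KEY_MAPPING: Dict[str, str] = {
        "container_name": "ContainerName",
        "relative_path": "RelativePath",
    }

    def items(self) -> List[Tuple[str, Any]]:  # type: ignore[override]
        return [(self._KEY_MAPPING.get(key.lower(), key), value) for key, value in super().items()]


print(dict(PascalCaseDict(container_name="data", relative_path="models/v1").items()))
# -> {'ContainerName': 'data', 'RelativePath': 'models/v1'}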
+ You are recommended to set the autoScaleIntervalInSec longer than the checkpoint interval, + to make sure at least one checkpoint is saved before auto-scaling of the job. + :type auto_scale_interval_in_sec: int + :param max_instance_type_count: The maximum instance type count. + :type max_instance_type_count: int + :param min_instance_type_count: The minimum instance type count. + :type min_instance_type_count: int + """ + super().__init__(**kwargs) + self.auto_scale_instance_type_count_set = auto_scale_instance_type_count_set + self.auto_scale_interval_in_sec = auto_scale_interval_in_sec + self.max_instance_type_count = max_instance_type_count + self.min_instance_type_count = min_instance_type_count + + +class AISuperComputerConfiguration(PascalCaseProperty): # pylint: disable=too-many-instance-attributes + """A class to manage AI Super Computer Configuration.""" + + _KEY_MAPPING = { + "instance_type": "InstanceType", + "instance_types": "InstanceTypes", + "image_version": "ImageVersion", + "location": "Location", + "locations": "Locations", + "ai_super_computer_storage_data": "AISuperComputerStorageData", + "interactive": "Interactive", + "scale_policy": "ScalePolicy", + "virtual_cluster_arm_id": "VirtualClusterArmId", + "tensorboard_log_directory": "TensorboardLogDirectory", + "ssh_public_key": "SSHPublicKey", + "ssh_public_keys": "SSHPublicKeys", + "enable_azml_int": "EnableAzmlInt", + "priority": "Priority", + "sla_tier": "SLATier", + "suspend_on_idle_time_hours": "SuspendOnIdleTimeHours", + "user_alias": "UserAlias", + } + + def __init__( + self, + instance_type: Optional[str] = None, + instance_types: Optional[List[str]] = None, + image_version: Optional[str] = None, + location: Optional[str] = None, + locations: Optional[List[str]] = None, + ai_super_computer_storage_data: Optional[Dict[str, AISuperComputerStorageReferenceConfiguration]] = None, + interactive: Optional[bool] = None, + scale_policy: Optional[AISuperComputerScalePolicy] = None, + virtual_cluster_arm_id: Optional[str] = None, + tensorboard_log_directory: Optional[str] = None, + ssh_public_key: Optional[str] = None, + ssh_public_keys: Optional[List[str]] = None, + enable_azml_int: Optional[bool] = None, + priority: Optional[str] = None, + sla_tier: Optional[str] = None, + suspend_on_idle_time_hours: Optional[int] = None, + user_alias: Optional[str] = None, + **kwargs, + ): + """ + :param instance_type: The class of compute to be used. The list of instance types is + available in https://singularitydocs.azurewebsites.net/docs/overview/instance_types/ + :type instance_type: str + :param instance_types: The class of compute to be used. The list of instance types is + available in https://singularitydocs.azurewebsites.net/docs/overview/instance_types/ + :type instance_types: List[str] + :param image_version: The image to use in ai-super-computer. Currently only a limited set of predefined + images are supported. + :type image_version: str + :param location: The location (region) where the job will run. The workspace region is used + if neither location nor locations is specified. + :type location: str + :param locations: The location (region) where the job will run. The workspace region is used + if neither location nor locations is specified. + :type locations: List[str] + :param ai_super_computer_storage_data: All of the AI SuperComputer storage data sources to + be made available to the run based on the configurations. 
+ :type ai_super_computer_storage_data: Dict[str, AISuperComputerStorageReferenceConfiguration] + :param interactive: Specifies whether the job should be interactive. Interactive jobs will + start the requested nodes, but not run a command. + :type interactive: bool + :param scale_policy: The elasticity options for a job. By leveraging elastic training, + the job will automatically scale up when there is extra capacity available, + and automatically scale down when resources are gradually called back. + :type scale_policy: AISuperComputerScalePolicy + :param virtual_cluster_arm_id: The ARM Resource Id for the Virtual Cluster to submit the + job to. + :type virtual_cluster_arm_id: str + :param tensorboard_log_directory: The directory where the Tensorboard logs will be written. + :type tensorboard_log_directory: str + :param ssh_public_key: The SSH Public Key to use when enabling SSH access to the job. + If not specified, username/password auth will be enabled. + :type ssh_public_key: str + :param ssh_public_keys: The SSH Public Key to use when enabling SSH access to the job. + If not specified, username/password auth will be enabled. + :type ssh_public_keys: List[str] + :param enable_azml_int: Specifies whether the job should include the azml_int utility + :type enable_azml_int: bool + :param priority: The priority of the job. The default value is Medium. + :type priority: str + :param sla_tier: The SLA tier of the job. The default value is Standard. + :type sla_tier: str + :param suspend_on_idle_time_hours: Minimum idle time before run gets automatically suspended + (in hours). + :type suspend_on_idle_time_hours: int + :param user_alias: User alias, used for naming mount paths. + :type user_alias: str + """ + super().__init__(**kwargs) + self.instance_type = instance_type + self.instance_types = instance_types + self.image_version = image_version + self.location = location + self.locations = locations + self.ai_super_computer_storage_data = ai_super_computer_storage_data + self.interactive = interactive + self.scale_policy = scale_policy + self.virtual_cluster_arm_id = virtual_cluster_arm_id + self.tensorboard_log_directory = tensorboard_log_directory + self.ssh_public_key = ssh_public_key + self.ssh_public_keys = ssh_public_keys + self.enable_azml_int = enable_azml_int + self.priority = priority + self.sla_tier = sla_tier + self.suspend_on_idle_time_hours = suspend_on_idle_time_hours + self.user_alias = user_alias diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/itp_configuration.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/itp_configuration.py new file mode 100644 index 00000000..8868b33b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/itp_configuration.py @@ -0,0 +1,137 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +from typing import List, Optional + +from ....entities._job.job_resource_configuration import BaseProperty + + +class ITPResourceConfiguration(BaseProperty): + """ITP resource configuration.""" + + def __init__( + self, + gpu_count: Optional[int] = None, + cpu_count: Optional[int] = None, + memory_request_in_gb: Optional[int] = None, + **kwargs + ): + """ + :param gpu_count: Gpu count Defines how many gpu cores a single node gpu job will use. + Default value is 1. 
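These run-setting classes are plain data holders; a hedged construction example using only parameters documented above (the SKU, image version, container name and paths are placeholders, and how the object is attached to a node is out of scope here):

from azure.ai.ml._internal.entities.runsettings import (
    AISuperComputerConfiguration,
    AISuperComputerScalePolicy,
    AISuperComputerStorageReferenceConfiguration,
)

config = AISuperComputerConfiguration(
    instance_type="ND40rs_v2",      # placeholder instance type
    image_version="pytorch-1.12",   # placeholder image version
    location="eastus",
    ai_super_computer_storage_data={
        "data": AISuperComputerStorageReferenceConfiguration(
            container_name="training-data",
            relative_path="datasets/v1",
        )
    },
    scale_policy=AISuperComputerScalePolicy(
        auto_scale_instance_type_count_set=[2, 4, 8],
        auto_scale_interval_in_sec=1800,
        min_instance_type_count=2,
        max_instance_type_count=8,
    ),
    priority="Medium",
    sla_tier="Standard",
)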
+ :type gpu_count: int + :param cpu_count: Cpu count defines how many cpu cores that a single node cpu job will use. + Default value is 1. + :type cpu_count: int + :param memory_request_in_gb: Memory request defines how much GB memory a single node job + will request. Default value is 0 which means we will automatically calculate it for user. + :type memory_request_in_gb: int + """ + super().__init__(**kwargs) + self.gpu_count = gpu_count + self.cpu_count = cpu_count + self.memory_request_in_gb = memory_request_in_gb + + +class ITPPriorityConfiguration(BaseProperty): + """ITP priority configuration.""" + + def __init__( + self, + job_priority: Optional[int] = None, + is_preemptible: Optional[bool] = None, + node_count_set: Optional[List[int]] = None, + scale_interval: Optional[int] = None, + **kwargs + ): + """ + :param job_priority: The priority of a job. Default value is 200. User can set it to + 100~200. Any value larger than 200 or less than 100 will be treated as 200. + in azureml.components + :type job_priority: int + :param is_preemptible: Whether to preempt extra compute resources beyond the VC quota. + Default value is false. + in azureml.components + :type is_preemptible: bool + :param node_count_set: Node count set determines how compute auto-scale nodes. The value + should be a list of integers in ascending order. And Only available when IsPreemptible is + true. + :type node_count_set: List[int] + :param scale_interval: Scale interval in min. + :type scale_interval: int + """ + super().__init__(**kwargs) + self.job_priority = job_priority + self.is_preemptible = is_preemptible + self.node_count_set = node_count_set + self.scale_interval = scale_interval + + +class ITPInteractiveConfiguration(BaseProperty): + """ITP interactive configuration.""" + + def __init__( + self, + is_ssh_enabled: Optional[bool] = None, + ssh_public_key: Optional[str] = None, + is_i_python_enabled: Optional[bool] = None, + is_tensor_board_enabled: Optional[bool] = None, + interactive_port: Optional[int] = None, + **kwargs + ): + """ + :param is_ssh_enabled: Whether to enable SSH for interactive development. + Default value is false. + :type is_ssh_enabled: bool + :param ssh_public_key: SSH public key. + :type ssh_public_key: str + :param is_i_python_enabled: Is iPython enabled. + :type is_i_python_enabled: bool + :param is_tensor_board_enabled: Whether to enable TensorBoard. Default value is false. + + :type is_tensor_board_enabled: bool + :param interactive_port: Allows user to specify a different interactive port. Available + value from 40000 to 49999. + :type interactive_port: int + """ + super().__init__(**kwargs) + self.is_ssh_enabled = is_ssh_enabled + self.ssh_public_key = ssh_public_key + self.is_i_python_enabled = is_i_python_enabled + self.is_tensor_board_enabled = is_tensor_board_enabled + self.interactive_port = interactive_port + + +class ITPRetrySettings(BaseProperty): + def __init__(self, max_retry_count=None, **kwargs): + super().__init__(**kwargs) + self.max_retry_count = max_retry_count + + +class ITPConfiguration(BaseProperty): + """ITP configuration.""" + + def __init__( + self, + resource_configuration: Optional[ITPResourceConfiguration] = None, + priority_configuration: Optional[ITPPriorityConfiguration] = None, + interactive_configuration: Optional[ITPInteractiveConfiguration] = None, + retry: Optional[ITPRetrySettings] = None, + **kwargs + ): + """ + :param resource_configuration: Resource requirement for the compute. 
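A hedged construction example for the ITP configuration classes above, using the module path shown in this section; the values are placeholders within the documented ranges:

from azure.ai.ml._internal.entities.runsettings import (
    ITPConfiguration,
    ITPInteractiveConfiguration,
    ITPPriorityConfiguration,
    ITPResourceConfiguration,
    ITPRetrySettings,
)

itp_config = ITPConfiguration(
    resource_configuration=ITPResourceConfiguration(gpu_count=4, cpu_count=8, memory_request_in_gb=0),
    priority_configuration=ITPPriorityConfiguration(job_priority=200, is_preemptible=True, node_count_set=[1, 2, 4]),
    interactive_configuration=ITPInteractiveConfiguration(is_ssh_enabled=False),
    retry=ITPRetrySettings(max_retry_count=3),
)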
+ + :type resource_configuration: ITPResourceConfiguration + :param priority_configuration: Priority requirement for the compute. + + :type priority_configuration: ITPPriorityConfiguration + :param interactive_configuration: Interactive configuration when trying to access the + compute. + :type interactive_configuration: ITPInteractiveConfiguration + """ + self.resource_configuration = resource_configuration or ITPResourceConfiguration() + self.priority_configuration = priority_configuration or ITPPriorityConfiguration() + self.interactive_configuration = interactive_configuration or ITPInteractiveConfiguration() + self.retry = retry or ITPRetrySettings() + super().__init__(**kwargs) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/target_selector.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/target_selector.py new file mode 100644 index 00000000..92f72db7 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/target_selector.py @@ -0,0 +1,47 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +from typing import List, Optional + +from ....entities._job.job_resource_configuration import BaseProperty + + +class TargetSelector(BaseProperty): + """Compute target selector.""" + + def __init__( + self, + compute_type: str, + instance_types: Optional[List[str]] = None, + regions: Optional[List[str]] = None, + my_resource_only: Optional[bool] = None, + allow_spot_vm: Optional[bool] = None, + **kwargs, + ): + """ + :param compute_type: Compute type that target selector could route job to. + Example value: AmlCompute, AmlK8s. + :type compute_type: str + :param instance_types: List of instance_type that job could use. If no instance_types sre + specified, all sizes are allowed. Note instance_types here only contains VM SKU. + Example value: ["STANDARD_D2_V2", "ND24rs_v3"]. Note, this field is case sensitive. + :type instance_types: List[str] + :param regions: List of regions that would like to submit job to. + If no regions are specified, all regions are allowed. Example value: ["eastus"]. + Currently it only works for ITP. + :type regions: List[str] + :param my_resource_only: Flag to control whether the job should be sent to the cluster + owned by user. If False, target selector may send the job to shared cluster. Currently it + only works for ITP. + :type my_resource_only: bool + :param allow_spot_vm: Flag to enable target selector service to send job to low priority VM. + Currently it only works for ITP. + :type allow_spot_vm: bool + """ + super().__init__(**kwargs) + self.compute_type = compute_type + self.instance_types = instance_types + self.regions = regions + self.my_resource_only = my_resource_only + self.allow_spot_vm = allow_spot_vm diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/scope.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/scope.py new file mode 100644 index 00000000..9965d69f --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/scope.py @@ -0,0 +1,131 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
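A hedged construction example for `TargetSelector`, reusing the example values from its docstring above:

from azure.ai.ml._internal.entities.runsettings import TargetSelector

target = TargetSelector(
    compute_type="AmlCompute",
    instance_types=["STANDARD_D2_V2"],  # case sensitive, VM SKU only
    regions=["eastus"],
    my_resource_only=False,
    allow_spot_vm=True,
)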
+# --------------------------------------------------------- + +from typing import List, Union + +from marshmallow import Schema + +from ..._schema import PathAwareSchema +from .._schema.component import NodeType +from ..entities.node import InternalBaseNode + + +class Scope(InternalBaseNode): + """Node of scope components in pipeline with specific run settings.""" + + def __init__(self, **kwargs): + kwargs.pop("type", None) + super(Scope, self).__init__(type=NodeType.SCOPE, **kwargs) + self._init = True + self._adla_account_name = kwargs.pop("adla_account_name", None) + self._scope_param = kwargs.pop("scope_param", None) + self._custom_job_name_suffix = kwargs.pop("custom_job_name_suffix", None) + self._priority = kwargs.pop("priority", None) + self._auto_token = kwargs.pop("auto_token", None) + self._tokens = kwargs.pop("tokens", None) + self._vcp = kwargs.pop("vcp", None) + self._init = False + + @property + def adla_account_name(self) -> str: + """The ADLA account name to use for the scope job. + + :return: ADLA account name + :rtype: str + """ + return self._adla_account_name + + @adla_account_name.setter + def adla_account_name(self, value: str): + self._adla_account_name = value + + @property + def scope_param(self) -> str: + """nebula command used when submit the scope job. + + :return: The nebula command + :rtype: str + """ + return self._scope_param + + @scope_param.setter + def scope_param(self, value: str): + self._scope_param = value + + @property + def custom_job_name_suffix(self) -> str: + """Optional string to append to scope job name. + + :return: The custom suffix + :rtype: str + """ + return self._custom_job_name_suffix + + @custom_job_name_suffix.setter + def custom_job_name_suffix(self, value: str): + self._custom_job_name_suffix = value + + @property + def priority(self) -> int: + """scope job priority. + + If set priority in scope_param, will override this setting. + + :return: The job priority + :rtype: int + """ + return self._priority + + @priority.setter + def priority(self, value: int): + self._priority = value + + @property + def auto_token(self) -> int: + """A predictor for estimating the peak resource usage of scope job. + + :return: auto token + :rtype: int + """ + return self._auto_token + + @auto_token.setter + def auto_token(self, value: int): + self._auto_token = value + + @property + def tokens(self) -> int: + """Standard token allocation. + + :return: The token allocation + :rtype: int + """ + return self._tokens + + @tokens.setter + def tokens(self, value: int): + self._tokens = value + + @property + def vcp(self) -> float: + """Standard VC percent allocation; should be a float between 0 and 1. 
+ + :return: The VC allocation + :rtype: float + """ + return self._vcp + + @vcp.setter + def vcp(self, value: float): + self._vcp = value + + @classmethod + def _picked_fields_from_dict_to_rest_object(cls) -> List[str]: + return ["custom_job_name_suffix", "scope_param", "adla_account_name", "priority", "auto_token", "tokens", "vcp"] + + @classmethod + def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]: + from .._schema.node import ScopeSchema + + return ScopeSchema(context=context) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/spark.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/spark.py new file mode 100644 index 00000000..345fa5f2 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/spark.py @@ -0,0 +1,192 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +from typing import Dict, List, Optional, Union + +from marshmallow import Schema + +from ..._schema import PathAwareSchema +from ...constants._job.job import RestSparkConfKey +from ...entities import Environment, SparkJobEntry +from ...entities._job.parameterized_spark import DUMMY_IMAGE, ParameterizedSpark +from ...entities._job.spark_job_entry_mixin import SparkJobEntryMixin +from .._schema.component import InternalSparkComponentSchema +from ..entities import InternalComponent +from .environment import InternalEnvironment + + +class InternalSparkComponent( + InternalComponent, ParameterizedSpark, SparkJobEntryMixin +): # pylint: disable=too-many-instance-attributes, too-many-ancestors + """Internal Spark Component + This class is used to handle internal spark component. + It can be loaded from internal spark component yaml or from rest object of an internal spark component. + But after loaded, its structure will be the same as spark component. + """ + + def __init__( + self, + entry: Union[Dict[str, str], SparkJobEntry, None] = None, + py_files: Optional[List[str]] = None, + jars: Optional[List[str]] = None, + files: Optional[List[str]] = None, + archives: Optional[List[str]] = None, + driver_cores: Optional[int] = None, + driver_memory: Optional[str] = None, + executor_cores: Optional[int] = None, + executor_memory: Optional[str] = None, + executor_instances: Optional[int] = None, + dynamic_allocation_enabled: Optional[bool] = None, + dynamic_allocation_min_executors: Optional[int] = None, + dynamic_allocation_max_executors: Optional[int] = None, + conf: Optional[Dict[str, str]] = None, + args: Optional[str] = None, + **kwargs, + ): + SparkJobEntryMixin.__init__(self, entry=entry, **kwargs) + # environment.setter has been overridden in ParameterizedSpark, so we need to pop it out here + environment = kwargs.pop("environment", None) + InternalComponent.__init__(self, **kwargs) + # Pop it to avoid passing multiple values for code in ParameterizedSpark.__init__ + code = kwargs.pop("code", None) + ParameterizedSpark.__init__( + self, + code=self.base_path, + entry=entry, + py_files=py_files, + jars=jars, + files=files, + archives=archives, + conf=conf, + environment=environment, + args=args, + **kwargs, + ) + self.code = code + # For pipeline spark job, we also allow user to set driver_cores, driver_memory and so on by setting conf. + # If root level fields are not set by user, we promote conf setting to root level to facilitate subsequent + # verification. 
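The constructor above promotes Spark `conf` entries to root-level fields when those fields are not set explicitly, as the assignments just below show. A standalone sketch of that rule; the conf key strings here are assumed to be the standard Spark keys that the SDK reads via `RestSparkConfKey`:

from typing import Any, Dict, Optional

# assumed standard Spark conf keys (the SDK uses RestSparkConfKey constants)
_SPARK_CONF_KEYS = {
    "driver_cores": "spark.driver.cores",
    "driver_memory": "spark.driver.memory",
    "executor_cores": "spark.executor.cores",
    "executor_memory": "spark.executor.memory",
    "executor_instances": "spark.executor.instances",
}


def promote_conf(conf: Optional[Dict[str, Any]], **explicit: Any) -> Dict[str, Any]:
    """Explicit root-level value wins; otherwise fall back to the matching conf entry."""
    conf = conf or {}
    return {
        field: explicit.get(field) if explicit.get(field) is not None else conf.get(key)
        for field, key in _SPARK_CONF_KEYS.items()
    }


print(promote_conf({"spark.driver.cores": 2, "spark.executor.memory": "4g"}, executor_cores=4))
# -> {'driver_cores': 2, 'driver_memory': None, 'executor_cores': 4, 'executor_memory': '4g', 'executor_instances': None}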
This usually happens when we use to_component(SparkJob) or builder function spark() as a node + # in pipeline sdk + conf = conf or {} + self.driver_cores = driver_cores or conf.get(RestSparkConfKey.DRIVER_CORES, None) + self.driver_memory = driver_memory or conf.get(RestSparkConfKey.DRIVER_MEMORY, None) + self.executor_cores = executor_cores or conf.get(RestSparkConfKey.EXECUTOR_CORES, None) + self.executor_memory = executor_memory or conf.get(RestSparkConfKey.EXECUTOR_MEMORY, None) + self.executor_instances = executor_instances or conf.get(RestSparkConfKey.EXECUTOR_INSTANCES, None) + self.dynamic_allocation_enabled = dynamic_allocation_enabled or conf.get( + RestSparkConfKey.DYNAMIC_ALLOCATION_ENABLED, None + ) + self.dynamic_allocation_min_executors = dynamic_allocation_min_executors or conf.get( + RestSparkConfKey.DYNAMIC_ALLOCATION_MIN_EXECUTORS, None + ) + self.dynamic_allocation_max_executors = dynamic_allocation_max_executors or conf.get( + RestSparkConfKey.DYNAMIC_ALLOCATION_MAX_EXECUTORS, None + ) + + self.conf = conf + self.args = args + + @classmethod + def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]: + return InternalSparkComponentSchema(context=context) + + @property # type: ignore[override] + def environment(self) -> Optional[Union[Environment, str]]: + """Get the environment of the component. + + :return: The environment of the component. + :rtype: Optional[Union[Environment, str]]] + """ + if isinstance(self._environment, Environment) and self._environment.image is None: + return Environment(conda_file=self._environment.conda_file, image=DUMMY_IMAGE) + return self._environment + + @environment.setter + def environment(self, value): + """Set the environment of the component. + + :param value: The environment of the component. + :type value: Union[str, Environment, dict] + :return: No return + :rtype: None + """ + if value is None or isinstance(value, (str, Environment)): + self._environment = value + elif isinstance(value, dict): + internal_environment = InternalEnvironment(**value) + internal_environment.resolve(self.base_path) + self._environment = Environment( + name=internal_environment.name, + version=internal_environment.version, + ) + if internal_environment.conda: + self._environment.conda_file = { + "dependencies": internal_environment.conda[InternalEnvironment.CONDA_DEPENDENCIES] + } + if internal_environment.docker: + self._environment.image = internal_environment.docker["image"] + # we suppose that loaded internal spark component won't be used to create another internal spark component + # so the environment construction here can be simplified + else: + raise ValueError(f"Unsupported environment type: {type(value)}") + + @property + def jars(self) -> Optional[List[str]]: + """Get the jars of the component. + + :return: The jars of the component. + :rtype: Optional[List[str]] + """ + return self._jars + + @jars.setter + def jars(self, value: Union[str, List[str]]): + """Set the jars of the component. + + :param value: The jars of the component. + :type value: Union[str, List[str]] + :return: No return + :rtype: None + """ + if isinstance(value, str): + value = [value] + self._jars = value + + @property + def py_files(self) -> Optional[List[str]]: + """Get the py_files of the component. + + :return: The py_files of the component. + :rtype: Optional[List[str]] + """ + return self._py_files + + @py_files.setter + def py_files(self, value): + """Set the py_files of the component. + + :param value: The py_files of the component. 
+ :type value: Union[str, List[str]] + :return: No return + :rtype: None + """ + if isinstance(value, str): + value = [value] + self._py_files = value + + def _to_dict(self) -> Dict: + result = super()._to_dict() + return result + + def _to_rest_object(self): + result = super()._to_rest_object() + if "pyFiles" in result.properties.component_spec: + result.properties.component_spec["py_files"] = result.properties.component_spec.pop("pyFiles") + return result + + @classmethod + def _from_rest_object_to_init_params(cls, obj) -> Dict: + if "py_files" in obj.properties.component_spec: + obj.properties.component_spec["pyFiles"] = obj.properties.component_spec.pop("py_files") + result = super()._from_rest_object_to_init_params(obj) + return result |
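The two REST conversion hooks above simply rename `pyFiles`/`py_files` between the YAML-style component spec and the REST payload. A standalone mirror of that round trip:

from typing import Any, Dict


def to_rest_spec(component_spec: Dict[str, Any]) -> Dict[str, Any]:
    """Same idea as _to_rest_object above: the REST payload uses 'py_files'."""
    spec = dict(component_spec)
    if "pyFiles" in spec:
        spec["py_files"] = spec.pop("pyFiles")
    return spec


def from_rest_spec(component_spec: Dict[str, Any]) -> Dict[str, Any]:
    """Same idea as _from_rest_object_to_init_params above: the spec side uses 'pyFiles'."""
    spec = dict(component_spec)
    if "py_files" in spec:
        spec["pyFiles"] = spec.pop("py_files")
    return spec


assert from_rest_spec(to_rest_spec({"pyFiles": ["utils.zip"]})) == {"pyFiles": ["utils.zip"]}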