about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/component.py
diff options
context:
space:
mode:
author S. Solomon Darnell 2025-03-28 21:52:21 -0500
committer S. Solomon Darnell 2025-03-28 21:52:21 -0500
commit 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/component.py
parent cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
download gn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/component.py')
-rw-r--r-- .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/component.py 370
1 files changed, 370 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/component.py
new file mode 100644
index 00000000..e54c1906
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/component.py
@@ -0,0 +1,370 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+# pylint: disable=protected-access, redefined-builtin
+# disable redefined-builtin to use id/type as argument name
+import os
+from contextlib import contextmanager
+from os import PathLike
+from pathlib import Path
+from typing import Any, Dict, Iterable, List, Optional, Union
+from uuid import UUID
+
+import yaml  # type: ignore[import]
+from marshmallow import Schema
+
+from ... import Input, Output
+from ..._restclient.v2022_10_01.models import ComponentVersion, ComponentVersionProperties
+from ..._schema import PathAwareSchema
+from ..._utils._arm_id_utils import parse_name_label
+from ..._utils._asset_utils import IgnoreFile
+from ...constants._common import DefaultOpenEncoding
+from ...entities import Component
+from ...entities._assets import Code
+from ...entities._component._additional_includes import AdditionalIncludes, AdditionalIncludesMixin
+from ...entities._component.code import ComponentIgnoreFile
+from ...entities._job.distribution import DistributionConfiguration
+from ...entities._system_data import SystemData
+from ...entities._util import convert_ordered_dict_to_dict
+from ...entities._validation import MutableValidationResult
+from .._schema.component import InternalComponentSchema
+from ._input_outputs import InternalInput, InternalOutput
+from ._merkle_tree import create_merkletree
+from .code import InternalCode
+from .environment import InternalEnvironment
+from .node import InternalBaseNode
+
+# Key looked up in a yaml-formatted additional includes file to get the list of entries.
+_ADDITIONAL_INCLUDES_CONFIG_KEY = "additional_includes"
+# Suffix of the additional includes file; it replaces the suffix of the component spec file path.
+_ADDITIONAL_INCLUDES_SUFFIX = ".additional_includes"
+
+
+class InternalComponent(Component, AdditionalIncludesMixin):
+    # pylint: disable=too-many-instance-attributes, too-many-locals
+    """Base class for internal component version, used to define an internal component. Recommended to create instance
+    with component_factory.
+
+    :param name: Name of the resource.
+    :type name: str
+    :param version: Version of the resource.
+    :type version: str
+    :param id:  Global id of the resource, Azure Resource Manager ID.
+    :type id: str
+    :param type:  Type of the command, supported is 'command'.
+    :type type: str
+    :param description: Description of the resource.
+    :type description: str
+    :param tags: Tag dictionary. Tags can be added, removed, and updated.
+    :type tags: dict
+    :param properties: Internal use only.
+    :type properties: dict
+    :param display_name: Display name of the component.
+    :type display_name: str
+    :param is_deterministic: Whether the component is deterministic.
+    :type is_deterministic: bool
+    :param inputs: Inputs of the component.
+    :type inputs: dict
+    :param outputs: Outputs of the component.
+    :type outputs: dict
+    :param yaml_str: The yaml string of the component.
+    :type yaml_str: str
+    :param _schema: Schema of the component.
+    :type _schema: str
+    :param creation_context: Creation metadata of the component.
+    :type creation_context: ~azure.ai.ml.entities.SystemData
+    """
+
+    def __init__(
+        self,
+        *,
+        _schema: Optional[str] = None,
+        name: Optional[str] = None,
+        version: Optional[str] = None,
+        display_name: Optional[str] = None,
+        type: Optional[str] = None,
+        description: Optional[str] = None,
+        tags: Optional[Dict] = None,
+        is_deterministic: Optional[bool] = None,
+        successful_return_code: Optional[str] = None,
+        inputs: Optional[Dict] = None,
+        outputs: Optional[Dict] = None,
+        code: Optional[Union[str, os.PathLike]] = None,
+        environment: Optional[Dict] = None,
+        environment_variables: Optional[Dict] = None,
+        command: Optional[str] = None,
+        id: Optional[str] = None,
+        properties: Optional[Dict] = None,
+        yaml_str: Optional[str] = None,
+        creation_context: Optional[SystemData] = None,
+        scope: Optional[Dict] = None,
+        hemera: Optional[Dict] = None,
+        hdinsight: Optional[Dict] = None,
+        parallel: Optional[Dict] = None,
+        starlite: Optional[Dict] = None,
+        ae365exepool: Optional[Dict] = None,
+        launcher: Optional[Dict] = None,
+        datatransfer: Optional[Dict] = None,
+        aether: Optional[Dict] = None,
+        **kwargs,
+    ):
+        _type, self._type_label = parse_name_label(type)
+        super().__init__(
+            name=name,
+            version=version,
+            id=id,
+            type=_type,
+            description=description,
+            tags=tags,
+            properties=properties,
+            display_name=display_name,
+            is_deterministic=is_deterministic,  # type: ignore[arg-type]
+            inputs=inputs,
+            outputs=outputs,
+            yaml_str=yaml_str,
+            _schema=_schema,
+            creation_context=creation_context,
+            **kwargs,
+        )
+        # Store original yaml
+        self._yaml_str = yaml_str
+        self._other_parameter = kwargs
+
+        self.successful_return_code = successful_return_code
+        self.code = code
+        self.environment = InternalEnvironment(**environment) if isinstance(environment, dict) else environment
+        self.environment_variables = environment_variables
+        # TODO: remove these to keep it a general component class
+        self.command = command
+        self.scope = scope
+        self.hemera = hemera
+        self.hdinsight = hdinsight
+        self.parallel = parallel
+        self.starlite = starlite
+        self.ae365exepool = ae365exepool
+        self.launcher = launcher
+        self.datatransfer = datatransfer
+        self.aether = aether
+
+    @classmethod
+    def _build_io(cls, io_dict: Union[Dict, Input, Output], is_input: bool):
+        component_io = {}
+        for name, port in io_dict.items():
+            if is_input:
+                component_io[name] = InternalInput._from_base(port)
+            else:
+                component_io[name] = InternalOutput._from_base(port)
+        return component_io
+
+    # region AdditionalIncludesMixin
+
+    @classmethod
+    def _read_additional_include_configs(cls, yaml_path: Path) -> List[str]:
+        """Read additional include configs from the additional includes file.
+        The name of the file is the same as the component spec file, with a suffix of ".additional_includes".
+        It can be either a yaml file or a text file:
+        1. If it is a yaml file, yaml format of additional_includes looks like below:
+        ```
+        additional_includes:
+         - your/local/path
+         - type: artifact
+           organization: devops_organization
+           project: devops_project
+           feed: artifacts_feed_name
+           name: universal_package_name
+           version: package_version
+           scope: scope_type
+        ```
+        2. If it is a text file, each line is a path to include. Note that artifact config is not supported
+        in this format.
+
+        :param yaml_path: The yaml path
+        :type yaml_path: Path
+        :return: The list of additional includes
+        :rtype: List[str]
+        """
+        additional_includes_config_path = yaml_path.with_suffix(_ADDITIONAL_INCLUDES_SUFFIX)
+        if additional_includes_config_path.is_file():
+            with open(additional_includes_config_path, encoding=DefaultOpenEncoding.READ) as f:
+                file_content = f.read()
+                try:
+                    configs = yaml.safe_load(file_content)
+                    if isinstance(configs, dict):
+                        return configs.get(_ADDITIONAL_INCLUDES_CONFIG_KEY, [])
+                except Exception:  # pylint: disable=W0718
+                    # TODO: check if we should catch yaml.YamlError instead here
+                    pass
+                return [line.strip() for line in file_content.splitlines(keepends=False) if len(line.strip()) > 0]
+        return []
+
+    @classmethod
+    def _get_additional_includes_field_name(cls) -> str:
+        """Return the field name reported for additional includes, which is always "*" for this class.
+
+        :return: The literal string "*"
+        :rtype: str
+        """
+        # additional includes for internal components are configured by a file, which is not a field in the yaml
+        # return '*' as diagnostics yaml paths and override _get_all_additional_includes_configs.
+        return "*"
+
+    def _get_all_additional_includes_configs(self) -> List:
+        """Read the additional includes configs that belong to ``self._source_path``.
+
+        :return: The result of ``_read_additional_include_configs`` for the source path
+        :rtype: List
+        """
+        # internal components must have a source path
+        return self._read_additional_include_configs(Path(self._source_path))  # type: ignore[arg-type]
+        # TODO: Bug 2881943
+
+    def _get_base_path_for_code(self) -> Path:
+        """Return the parent directory of ``self._source_path``.
+
+        :return: The directory that contains the source path
+        :rtype: Path
+        """
+        # internal components must have a source path
+        return Path(self._source_path).parent  # type: ignore[arg-type]
+        # TODO: Bug 2881943
+
+    def _get_origin_code_value(self) -> Union[str, PathLike, None]:
+        """Return the origin code value from the base class, falling back to "." when that value is falsy.
+
+        :return: The origin code value, or "." as the default
+        :rtype: Union[str, PathLike, None]
+        """
+        return super()._get_origin_code_value() or "."
+
+    # endregion
+
+    def _to_ordered_dict_for_yaml_dump(self) -> Dict:
+        """Dump the component content into a sorted yaml string.
+
+        :return: The ordered dict
+        :rtype: Dict
+        """
+
+        obj = super()._to_ordered_dict_for_yaml_dump()
+        # dict dumped base on schema will transfer code to an absolute path, while we want to keep its original value
+        if "code" in obj:
+            if not self.code:
+                del obj["code"]
+            else:
+                obj["code"] = self.code
+        return obj
+
+    @property
+    def _additional_includes(self) -> AdditionalIncludes:
+        """This property is kept for compatibility with old mldesigner sdk.
+
+        A new object is generated on every access and its class is switched to InternalAdditionalIncludes.
+
+        :return: The additional includes
+        :rtype: AdditionalIncludes
+        """
+        obj = self._generate_additional_includes_obj()
+        # NOTE(review): imported at call time, presumably to avoid a circular import - confirm
+        from azure.ai.ml._internal.entities._additional_includes import InternalAdditionalIncludes
+
+        # re-tag the generated instance in place instead of constructing a new object
+        obj.__class__ = InternalAdditionalIncludes
+        return obj
+
+    # region SchemaValidatableMixin
+    @classmethod
+    def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
+        """Create the schema used to validate this component.
+
+        :param context: The context passed through to the schema
+        :return: An InternalComponentSchema built with the given context
+        :rtype: Union[PathAwareSchema, Schema]
+        """
+        return InternalComponentSchema(context=context)
+
+    def _customized_validate(self) -> MutableValidationResult:
+        validation_result = super(InternalComponent, self)._customized_validate()
+        skip_path_validation = not self._append_diagnostics_and_check_if_origin_code_reliable_for_local_path_validation(
+            validation_result
+        )
+        # resolving additional includes & update self._base_path can be dangerous,
+        # so we just skip path validation if additional includes is provided.
+        # note that there will still be client-side error on job submission (after code is resolved)
+        # if paths in environment are invalid
+        if isinstance(self.environment, InternalEnvironment):
+            validation_result.merge_with(
+                self.environment.validate(
+                    self._base_path,
+                    skip_path_validation=skip_path_validation,
+                ),
+                field_name="environment",
+            )
+        return validation_result
+
+    # endregion
+
+    @classmethod
+    def _from_rest_object_to_init_params(cls, obj: ComponentVersion) -> Dict:
+        """Build the init keyword arguments from a REST component version.
+
+        Note that "distribution" is popped from ``obj.properties.component_spec``, so ``obj`` is mutated.
+
+        :param obj: The REST object
+        :type obj: ComponentVersion
+        :return: The init kwargs; "distribution" is set only when the spec carried a truthy value for it
+        :rtype: Dict
+        """
+        # put it here as distribution is shared by some components, e.g. command
+        # removed from the spec before delegating to the base implementation
+        distribution = obj.properties.component_spec.pop("distribution", None)
+        init_kwargs = super()._from_rest_object_to_init_params(obj)
+        if distribution:
+            init_kwargs["distribution"] = DistributionConfiguration._from_rest_object(distribution)
+        return init_kwargs
+
+    def _to_rest_object(self) -> ComponentVersion:
+        """Convert this component into a REST ComponentVersion.
+
+        :return: The REST object, with the dumped component (plus "_source") as its component spec
+        :rtype: ComponentVersion
+        """
+        component: Union[Dict[Any, Any], List[Any]] = convert_ordered_dict_to_dict(self._to_dict())
+        # record the source of the component inside the spec itself
+        component["_source"] = self._source  # type: ignore[call-overload]
+        # TODO: 2883063
+
+        properties = ComponentVersionProperties(
+            component_spec=component,
+            description=self.description,
+            is_anonymous=self._is_anonymous,
+            properties=self.properties,
+            tags=self.tags,
+        )
+        result = ComponentVersion(properties=properties)
+        # the name is assigned after construction rather than passed to the constructor
+        result.name = self.name
+        return result
+
+    @classmethod
+    def _get_snapshot_id(
+        cls,
+        code_path: Union[str, PathLike],
+        ignore_file: IgnoreFile,
+    ) -> str:
+        """Get the snapshot id of a component with specific working directory in ml-components. Use this as the name of
+        code asset to reuse steps in a pipeline job from ml-components runs.
+
+        :param code_path: The path of the working directory.
+        :type code_path: str
+        :param ignore_file: The ignore file of the snapshot.
+        :type ignore_file: IgnoreFile
+        :return: The snapshot id of a component in ml-components with code_path as its working directory.
+        :rtype: str
+        """
+        curr_root = create_merkletree(code_path, ignore_file.is_file_excluded)
+        snapshot_id = str(UUID(curr_root.hexdigest_hash[::4]))
+        return snapshot_id
+
+    @contextmanager  # type: ignore[arg-type]
+    def _try_build_local_code(self) -> Iterable[Code]:
+        """Build final code when origin code is a local code.
+        Will merge code path with additional includes into a temp folder if additional includes is specified.
+        For internal components, file dependencies in environment will be resolved based on the final code.
+
+        This is a context manager: the code instance is yielded while the merged code folder context is
+        still open.
+
+        :return: The code instance
+        :rtype: Iterable[Code]
+        """
+
+        tmp_code_dir: Path
+        # origin code value of internal component will never be None. check _get_origin_code_value for details
+        with self._generate_additional_includes_obj().merge_local_code_and_additional_includes() as tmp_code_dir:
+            # use absolute path in case temp folder & work dir are in different drive
+            tmp_code_dir = tmp_code_dir.absolute()
+
+            # file dependency in code will be read during internal environment resolution
+            # for example, docker file of the environment may be in additional includes;
+            # and it will be read then insert to the environment object during resolution.
+            # so we need to resolve environment based on the temporary code path
+            if isinstance(self.environment, InternalEnvironment):
+                self.environment.resolve(base_path=tmp_code_dir)
+
+            # additional includes config file itself should be ignored
+            # its file name is the source file name with the suffix replaced by the additional includes suffix
+            rebased_ignore_file = ComponentIgnoreFile(
+                tmp_code_dir,
+                additional_includes_file_name=Path(self._source_path)
+                .with_suffix(_ADDITIONAL_INCLUDES_SUFFIX)
+                .name,  # type: ignore[arg-type]
+                # TODO: Bug 2881943
+            )
+
+            # Use the snapshot id in ml-components as code name to enable anonymous
+            # component reuse from ml-component runs.
+            # calculate snapshot id here instead of inside InternalCode to ensure that
+            # snapshot id is calculated based on the built code path
+            yield InternalCode(
+                name=self._get_snapshot_id(
+                    # use absolute path in case temp folder & work dir are in different drive
+                    tmp_code_dir,
+                    # this ignore-file should be rebased to the built code path
+                    rebased_ignore_file,
+                ),
+                version="1",
+                # base_path stays the component's base path, while path points at the built code folder
+                base_path=self._base_path,
+                path=tmp_code_dir,
+                is_anonymous=True,
+                ignore_file=rebased_ignore_file,
+            )
+
+    def __call__(self, *args, **kwargs) -> InternalBaseNode:
+        """Delegate to the base class ``__call__`` with the same arguments.
+
+        The override only changes the annotated return type to InternalBaseNode.
+
+        :return: Whatever the base class ``__call__`` returns
+        :rtype: InternalBaseNode
+        """
+        return super(InternalComponent, self).__call__(*args, **kwargs)