# --------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # --------------------------------------------------------- # pylint: disable=protected-access, redefined-builtin # disable redefined-builtin to use id/type as argument name import os from contextlib import contextmanager from os import PathLike from pathlib import Path from typing import Any, Dict, Iterable, List, Optional, Union from uuid import UUID import yaml # type: ignore[import] from marshmallow import Schema from ... import Input, Output from ..._restclient.v2022_10_01.models import ComponentVersion, ComponentVersionProperties from ..._schema import PathAwareSchema from ..._utils._arm_id_utils import parse_name_label from ..._utils._asset_utils import IgnoreFile from ...constants._common import DefaultOpenEncoding from ...entities import Component from ...entities._assets import Code from ...entities._component._additional_includes import AdditionalIncludes, AdditionalIncludesMixin from ...entities._component.code import ComponentIgnoreFile from ...entities._job.distribution import DistributionConfiguration from ...entities._system_data import SystemData from ...entities._util import convert_ordered_dict_to_dict from ...entities._validation import MutableValidationResult from .._schema.component import InternalComponentSchema from ._input_outputs import InternalInput, InternalOutput from ._merkle_tree import create_merkletree from .code import InternalCode from .environment import InternalEnvironment from .node import InternalBaseNode _ADDITIONAL_INCLUDES_CONFIG_KEY = "additional_includes" _ADDITIONAL_INCLUDES_SUFFIX = ".additional_includes" class InternalComponent(Component, AdditionalIncludesMixin): # pylint: disable=too-many-instance-attributes, too-many-locals """Base class for internal component version, used to define an internal component. Recommended to create instance with component_factory. :param name: Name of the resource. :type name: str :param version: Version of the resource. :type version: str :param id: Global id of the resource, Azure Resource Manager ID. :type id: str :param type: Type of the command, supported is 'command'. :type type: str :param description: Description of the resource. :type description: str :param tags: Tag dictionary. Tags can be added, removed, and updated. :type tags: dict :param properties: Internal use only. :type properties: dict :param display_name: Display name of the component. :type display_name: str :param is_deterministic: Whether the component is deterministic. :type is_deterministic: bool :param inputs: Inputs of the component. :type inputs: dict :param outputs: Outputs of the component. :type outputs: dict :param yaml_str: The yaml string of the component. :type yaml_str: str :param _schema: Schema of the component. :type _schema: str :param creation_context: Creation metadata of the component. :type creation_context: ~azure.ai.ml.entities.SystemData """ def __init__( self, *, _schema: Optional[str] = None, name: Optional[str] = None, version: Optional[str] = None, display_name: Optional[str] = None, type: Optional[str] = None, description: Optional[str] = None, tags: Optional[Dict] = None, is_deterministic: Optional[bool] = None, successful_return_code: Optional[str] = None, inputs: Optional[Dict] = None, outputs: Optional[Dict] = None, code: Optional[Union[str, os.PathLike]] = None, environment: Optional[Dict] = None, environment_variables: Optional[Dict] = None, command: Optional[str] = None, id: Optional[str] = None, properties: Optional[Dict] = None, yaml_str: Optional[str] = None, creation_context: Optional[SystemData] = None, scope: Optional[Dict] = None, hemera: Optional[Dict] = None, hdinsight: Optional[Dict] = None, parallel: Optional[Dict] = None, starlite: Optional[Dict] = None, ae365exepool: Optional[Dict] = None, launcher: Optional[Dict] = None, datatransfer: Optional[Dict] = None, aether: Optional[Dict] = None, **kwargs, ): _type, self._type_label = parse_name_label(type) super().__init__( name=name, version=version, id=id, type=_type, description=description, tags=tags, properties=properties, display_name=display_name, is_deterministic=is_deterministic, # type: ignore[arg-type] inputs=inputs, outputs=outputs, yaml_str=yaml_str, _schema=_schema, creation_context=creation_context, **kwargs, ) # Store original yaml self._yaml_str = yaml_str self._other_parameter = kwargs self.successful_return_code = successful_return_code self.code = code self.environment = InternalEnvironment(**environment) if isinstance(environment, dict) else environment self.environment_variables = environment_variables # TODO: remove these to keep it a general component class self.command = command self.scope = scope self.hemera = hemera self.hdinsight = hdinsight self.parallel = parallel self.starlite = starlite self.ae365exepool = ae365exepool self.launcher = launcher self.datatransfer = datatransfer self.aether = aether @classmethod def _build_io(cls, io_dict: Union[Dict, Input, Output], is_input: bool): component_io = {} for name, port in io_dict.items(): if is_input: component_io[name] = InternalInput._from_base(port) else: component_io[name] = InternalOutput._from_base(port) return component_io # region AdditionalIncludesMixin @classmethod def _read_additional_include_configs(cls, yaml_path: Path) -> List[str]: """Read additional include configs from the additional includes file. The name of the file is the same as the component spec file, with a suffix of ".additional_includes". It can be either a yaml file or a text file: 1. If it is a yaml file, yaml format of additional_includes looks like below: ``` additional_includes: - your/local/path - type: artifact organization: devops_organization project: devops_project feed: artifacts_feed_name name: universal_package_name version: package_version scope: scope_type ``` 2. If it is a text file, each line is a path to include. Note that artifact config is not supported in this format. :param yaml_path: The yaml path :type yaml_path: Path :return: The list of additional includes :rtype: List[str] """ additional_includes_config_path = yaml_path.with_suffix(_ADDITIONAL_INCLUDES_SUFFIX) if additional_includes_config_path.is_file(): with open(additional_includes_config_path, encoding=DefaultOpenEncoding.READ) as f: file_content = f.read() try: configs = yaml.safe_load(file_content) if isinstance(configs, dict): return configs.get(_ADDITIONAL_INCLUDES_CONFIG_KEY, []) except Exception: # pylint: disable=W0718 # TODO: check if we should catch yaml.YamlError instead here pass return [line.strip() for line in file_content.splitlines(keepends=False) if len(line.strip()) > 0] return [] @classmethod def _get_additional_includes_field_name(cls) -> str: # additional includes for internal components are configured by a file, which is not a field in the yaml # return '*' as diagnostics yaml paths and override _get_all_additional_includes_configs. return "*" def _get_all_additional_includes_configs(self) -> List: # internal components must have a source path return self._read_additional_include_configs(Path(self._source_path)) # type: ignore[arg-type] # TODO: Bug 2881943 def _get_base_path_for_code(self) -> Path: # internal components must have a source path return Path(self._source_path).parent # type: ignore[arg-type] # TODO: Bug 2881943 def _get_origin_code_value(self) -> Union[str, PathLike, None]: return super()._get_origin_code_value() or "." # endregion def _to_ordered_dict_for_yaml_dump(self) -> Dict: """Dump the component content into a sorted yaml string. :return: The ordered dict :rtype: Dict """ obj = super()._to_ordered_dict_for_yaml_dump() # dict dumped base on schema will transfer code to an absolute path, while we want to keep its original value if "code" in obj: if not self.code: del obj["code"] else: obj["code"] = self.code return obj @property def _additional_includes(self) -> AdditionalIncludes: """This property is kept for compatibility with old mldesigner sdk. :return: The additional includes :rtype: AdditionalIncludes """ obj = self._generate_additional_includes_obj() from azure.ai.ml._internal.entities._additional_includes import InternalAdditionalIncludes obj.__class__ = InternalAdditionalIncludes return obj # region SchemaValidatableMixin @classmethod def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]: return InternalComponentSchema(context=context) def _customized_validate(self) -> MutableValidationResult: validation_result = super(InternalComponent, self)._customized_validate() skip_path_validation = not self._append_diagnostics_and_check_if_origin_code_reliable_for_local_path_validation( validation_result ) # resolving additional includes & update self._base_path can be dangerous, # so we just skip path validation if additional includes is provided. # note that there will still be client-side error on job submission (after code is resolved) # if paths in environment are invalid if isinstance(self.environment, InternalEnvironment): validation_result.merge_with( self.environment.validate( self._base_path, skip_path_validation=skip_path_validation, ), field_name="environment", ) return validation_result # endregion @classmethod def _from_rest_object_to_init_params(cls, obj: ComponentVersion) -> Dict: # put it here as distribution is shared by some components, e.g. command distribution = obj.properties.component_spec.pop("distribution", None) init_kwargs = super()._from_rest_object_to_init_params(obj) if distribution: init_kwargs["distribution"] = DistributionConfiguration._from_rest_object(distribution) return init_kwargs def _to_rest_object(self) -> ComponentVersion: component: Union[Dict[Any, Any], List[Any]] = convert_ordered_dict_to_dict(self._to_dict()) component["_source"] = self._source # type: ignore[call-overload] # TODO: 2883063 properties = ComponentVersionProperties( component_spec=component, description=self.description, is_anonymous=self._is_anonymous, properties=self.properties, tags=self.tags, ) result = ComponentVersion(properties=properties) result.name = self.name return result @classmethod def _get_snapshot_id( cls, code_path: Union[str, PathLike], ignore_file: IgnoreFile, ) -> str: """Get the snapshot id of a component with specific working directory in ml-components. Use this as the name of code asset to reuse steps in a pipeline job from ml-components runs. :param code_path: The path of the working directory. :type code_path: str :param ignore_file: The ignore file of the snapshot. :type ignore_file: IgnoreFile :return: The snapshot id of a component in ml-components with code_path as its working directory. :rtype: str """ curr_root = create_merkletree(code_path, ignore_file.is_file_excluded) snapshot_id = str(UUID(curr_root.hexdigest_hash[::4])) return snapshot_id @contextmanager # type: ignore[arg-type] def _try_build_local_code(self) -> Iterable[Code]: """Build final code when origin code is a local code. Will merge code path with additional includes into a temp folder if additional includes is specified. For internal components, file dependencies in environment will be resolved based on the final code. :return: The code instance :rtype: Iterable[Code] """ tmp_code_dir: Path # origin code value of internal component will never be None. check _get_origin_code_value for details with self._generate_additional_includes_obj().merge_local_code_and_additional_includes() as tmp_code_dir: # use absolute path in case temp folder & work dir are in different drive tmp_code_dir = tmp_code_dir.absolute() # file dependency in code will be read during internal environment resolution # for example, docker file of the environment may be in additional includes; # and it will be read then insert to the environment object during resolution. # so we need to resolve environment based on the temporary code path if isinstance(self.environment, InternalEnvironment): self.environment.resolve(base_path=tmp_code_dir) # additional includes config file itself should be ignored rebased_ignore_file = ComponentIgnoreFile( tmp_code_dir, additional_includes_file_name=Path(self._source_path) .with_suffix(_ADDITIONAL_INCLUDES_SUFFIX) .name, # type: ignore[arg-type] # TODO: Bug 2881943 ) # Use the snapshot id in ml-components as code name to enable anonymous # component reuse from ml-component runs. # calculate snapshot id here instead of inside InternalCode to ensure that # snapshot id is calculated based on the built code path yield InternalCode( name=self._get_snapshot_id( # use absolute path in case temp folder & work dir are in different drive tmp_code_dir, # this ignore-file should be rebased to the built code path rebased_ignore_file, ), version="1", base_path=self._base_path, path=tmp_code_dir, is_anonymous=True, ignore_file=rebased_ignore_file, ) def __call__(self, *args, **kwargs) -> InternalBaseNode: return super(InternalComponent, self).__call__(*args, **kwargs)