# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import re
import uuid
from os import PathLike
from pathlib import Path
from typing import IO, TYPE_CHECKING, Any, AnyStr, Callable, Dict, Iterable, Optional, Tuple, Union

from marshmallow import INCLUDE

from ..._restclient.v2024_01_01_preview.models import (
    ComponentContainer,
    ComponentContainerProperties,
    ComponentVersion,
    ComponentVersionProperties,
)
from ..._schema import PathAwareSchema
from ..._schema.component import ComponentSchema
from ..._utils.utils import dump_yaml_to_file, hash_dict
from ...constants._common import (
    ANONYMOUS_COMPONENT_NAME,
    BASE_PATH_CONTEXT_KEY,
    PARAMS_OVERRIDE_KEY,
    REGISTRY_URI_FORMAT,
    SOURCE_PATH_CONTEXT_KEY,
    CommonYamlFields,
    SchemaUrl,
)
from ...constants._component import ComponentSource, IOConstants, NodeType
from ...entities._assets.asset import Asset
from ...entities._inputs_outputs import Input, Output
from ...entities._mixins import LocalizableMixin, TelemetryMixin, YamlTranslatableMixin
from ...entities._system_data import SystemData
from ...entities._util import find_type_in_override
from ...entities._validation import MutableValidationResult, PathAwareSchemaValidatableMixin, RemoteValidatableMixin
from ...exceptions import ErrorCategory, ErrorTarget, ValidationException
from .._inputs_outputs import GroupInput

if TYPE_CHECKING:
    from ...entities.builders import BaseNode

# pylint: disable=protected-access, redefined-builtin
# disable redefined-builtin to use id/type as argument name


COMPONENT_PLACEHOLDER = "COMPONENT_PLACEHOLDER"


class Component(
    Asset,
    RemoteValidatableMixin,
    TelemetryMixin,
    YamlTranslatableMixin,
    PathAwareSchemaValidatableMixin,
    LocalizableMixin,
):
    """Base class for component version, used to define a component. Can't be instantiated directly.

    :param name: Name of the resource.
    :type name: str
    :param version: Version of the resource.
    :type version: str
    :param id: Global ID of the resource, Azure Resource Manager ID.
    :type id: str
    :param type: Type of the command, supported is 'command'.
    :type type: str
    :param description: Description of the resource.
    :type description: str
    :param tags: Tag dictionary. Tags can be added, removed, and updated.
    :type tags: dict
    :param properties: Internal use only.
    :type properties: dict
    :param display_name: Display name of the component.
    :type display_name: str
    :param is_deterministic: Whether the component is deterministic. Defaults to True.
    :type is_deterministic: bool
    :param inputs: Inputs of the component.
    :type inputs: dict
    :param outputs: Outputs of the component.
    :type outputs: dict
    :param yaml_str: The YAML string of the component.
    :type yaml_str: str
    :param _schema: Schema of the component.
    :type _schema: str
    :param creation_context: Creation metadata of the component.
    :type creation_context: ~azure.ai.ml.entities.SystemData
    :param kwargs: Additional parameters for the component.
    :raises ~azure.ai.ml.exceptions.ValidationException: Raised if Component cannot be successfully validated.
        Details will be provided in the error message.
    """

    # pylint: disable=too-many-instance-attributes
    def __init__(
        self,
        *,
        name: Optional[str] = None,
        version: Optional[str] = None,
        id: Optional[str] = None,
        type: Optional[str] = None,
        description: Optional[str] = None,
        tags: Optional[Dict] = None,
        properties: Optional[Dict] = None,
        display_name: Optional[str] = None,
        is_deterministic: bool = True,
        inputs: Optional[Dict] = None,
        outputs: Optional[Dict] = None,
        yaml_str: Optional[str] = None,
        _schema: Optional[str] = None,
        creation_context: Optional[SystemData] = None,
        **kwargs: Any,
    ) -> None:
        self.latest_version = None
        self._intellectual_property = kwargs.pop("intellectual_property", None)
        # Setting this before super init because when asset init version, _auto_increment_version's value may change
        self._auto_increment_version = kwargs.pop("auto_increment", False)
        # Get source from id first, then kwargs.
        self._source = (
            self._resolve_component_source_from_id(id) if id else kwargs.pop("_source", ComponentSource.CLASS)
        )
        # use ANONYMOUS_COMPONENT_NAME instead of guid
        is_anonymous = kwargs.pop("is_anonymous", False)
        if not name and version is None:
            name = ANONYMOUS_COMPONENT_NAME
            version = "1"
            is_anonymous = True

        super().__init__(
            name=name,
            version=version,
            id=id,
            description=description,
            tags=tags,
            properties=properties,
            creation_context=creation_context,
            is_anonymous=is_anonymous,
            base_path=kwargs.pop(BASE_PATH_CONTEXT_KEY, None),
            source_path=kwargs.pop(SOURCE_PATH_CONTEXT_KEY, None),
        )
        # store kwargs to self._other_parameter instead of pop to super class to allow component have extra
        # fields not defined in current schema.

        inputs = inputs if inputs else {}
        outputs = outputs if outputs else {}

        self.name = name
        self._schema = _schema
        self._type = type
        self._display_name = display_name
        self._is_deterministic = is_deterministic
        self._inputs = self._build_io(inputs, is_input=True)
        self._outputs = self._build_io(outputs, is_input=False)
        # Store original yaml
        self._yaml_str = yaml_str
        self._other_parameter = kwargs

    @property
    def _func(self) -> Callable[..., "BaseNode"]:
        """Build the callable used to create a node from this component.

        Input and output names are validated first; the callable itself is produced by
        ``_generate_component_function``. A new callable is generated on every access.

        :return: The generated component function.
        :rtype: Callable[..., BaseNode]
        """
        from azure.ai.ml.entities._job.pipeline._load_component import _generate_component_function

        # validate input/output names before creating component function
        validation_result = self._validate_io_names(self.inputs)
        validation_result.merge_with(self._validate_io_names(self.outputs))
        self._try_raise(validation_result)

        res: Callable = _generate_component_function(self)
        return res

    @property
    def type(self) -> Optional[str]:
        """Type of the component, default is 'command'.

        :return: Type of the component.
        :rtype: str
        """
        return self._type

    @property
    def display_name(self) -> Optional[str]:
        """Display name of the component.

        :return: Display name of the component.
        :rtype: str
        """
        return self._display_name

    @display_name.setter
    def display_name(self, custom_display_name: str) -> None:
        """Set display_name of the component.

        :param custom_display_name: The new display name
        :type custom_display_name: str
        """
        self._display_name = custom_display_name

    @property
    def is_deterministic(self) -> Optional[bool]:
        """Whether the component is deterministic.

        :return: Whether the component is deterministic
        :rtype: bool
        """
        return self._is_deterministic

    @property
    def inputs(self) -> Dict:
        """Inputs of the component.

        :return: Inputs of the component.
        :rtype: dict
        """
        res: dict = self._inputs
        return res

    @property
    def outputs(self) -> Dict:
        """Outputs of the component.

        :return: Outputs of the component.
        :rtype: dict
        """
        return self._outputs

    @property
    def version(self) -> Optional[str]:
        """Version of the component.

        :return: Version of the component.
        :rtype: str
        """
        return self._version

    @version.setter
    def version(self, value: str) -> None:
        """Set the version of the component.

        :param value: The version of the component.
        :type value: str
        :raises ~azure.ai.ml.exceptions.ValidationException: Raised if a truthy, non-string value is given.
        """
        if value:
            if not isinstance(value, str):
                msg = f"Component version must be a string, not type {type(value)}."
                raise ValidationException(
                    message=msg,
                    target=ErrorTarget.COMPONENT,
                    no_personal_data_message=msg,
                    error_category=ErrorCategory.USER_ERROR,
                )
        self._version = value
        # Coerce to bool: a bare ``and`` would store the (falsy) name itself, e.g. None or "".
        self._auto_increment_version = bool(self.name and not self._version)

    def dump(self, dest: Union[str, PathLike, IO[AnyStr]], **kwargs: Any) -> None:
        """Dump the component content into a file in yaml format.

        :param dest: The destination to receive this component's content.
            Must be either a path to a local file, or an already-open file stream.
            If dest is a file path, a new file will be created,
            and an exception is raised if the file exists.
            If dest is an open file, the file will be written to directly,
            and an exception will be raised if the file is not writable.
        :type dest: Union[PathLike, str, IO[AnyStr]]
        """
        path = kwargs.pop("path", None)
        yaml_serialized = self._to_dict()
        dump_yaml_to_file(dest, yaml_serialized, default_flow_style=False, path=path, **kwargs)

    @staticmethod
    def _resolve_component_source_from_id(  # pylint: disable=docstring-type-do-not-use-class
        id: Optional[Union["Component", str]],
    ) -> Any:
        """Resolve the component source from id.

        :param id: The component ID
        :type id: Optional[str]
        :return: The component source
        :rtype: Literal[
            ComponentSource.CLASS,
            ComponentSource.REMOTE_REGISTRY,
            ComponentSource.REMOTE_WORKSPACE_COMPONENT
            ]
        """
        if id is None:
            return ComponentSource.CLASS
        # Consider default is workspace source, as
        # azureml: prefix will be removed for arm versioned id.
        return (
            ComponentSource.REMOTE_REGISTRY
            if not isinstance(id, Component) and id.startswith(REGISTRY_URI_FORMAT)
            else ComponentSource.REMOTE_WORKSPACE_COMPONENT
        )

    @classmethod
    def _validate_io_names(cls, io_names: Iterable[str], raise_error: bool = False) -> MutableValidationResult:
        """Validate input/output names, raise exception if invalid.

        Each name must match ``IOConstants.VALID_KEY_PATTERN`` and must be unique ignoring case.

        :param io_names: The names to validate
        :type io_names: Iterable[str]
        :param raise_error: Whether to raise if validation fails. Defaults to False
        :type raise_error: bool
        :return: The validation result
        :rtype: MutableValidationResult
        """
        validation_result = cls._create_empty_validation_result()
        # Maps lower-cased name -> first original spelling seen, to detect case-insensitive clashes.
        lower2original_kwargs: dict = {}

        # NOTE(review): yaml_path is always reported under "inputs.", even though callers in this
        # class also pass output names. Left unchanged here to keep emitted error paths stable.
        for name in io_names:
            if re.match(IOConstants.VALID_KEY_PATTERN, name) is None:
                msg = "{!r} is not a valid parameter name, must be composed letters, numbers, and underscores."
                validation_result.append_error(message=msg.format(name), yaml_path=f"inputs.{name}")
            # validate name conflict
            lower_key = name.lower()
            if lower_key in lower2original_kwargs:
                msg = "Invalid component input names {!r} and {!r}, which are equal ignore case."
                validation_result.append_error(
                    message=msg.format(name, lower2original_kwargs[lower_key]), yaml_path=f"inputs.{name}"
                )
            else:
                lower2original_kwargs[lower_key] = name
        return cls._try_raise(validation_result, raise_error=raise_error)

    @classmethod
    def _build_io(cls, io_dict: Union[Dict, Input, Output], is_input: bool) -> Dict:
        """Convert a mapping of port definitions into Input/Output objects.

        Values that are already ``Input`` (or ``Output``) instances are kept as-is; any other value is
        unpacked as keyword arguments into the corresponding constructor. For inputs, the result is
        additionally passed through ``GroupInput.restore_flattened_inputs``.

        :param io_dict: Mapping of port name to port definition.
        :type io_dict: Union[Dict, Input, Output]
        :param is_input: True to build inputs, False to build outputs.
        :type is_input: bool
        :return: Mapping of port name to built port object.
        :rtype: Dict
        """
        component_io: dict = {}
        for name, port in io_dict.items():
            if is_input:
                component_io[name] = port if isinstance(port, Input) else Input(**port)
            else:
                component_io[name] = port if isinstance(port, Output) else Output(**port)

        if is_input:
            # Restore flattened parameters to group
            res: dict = GroupInput.restore_flattened_inputs(component_io)
            return res
        return component_io

    @classmethod
    def _create_schema_for_validation(cls, context: Any) -> PathAwareSchema:
        """Create the schema used to validate and dump this entity.

        :param context: The context passed to the schema.
        :type context: Any
        :return: A ``ComponentSchema`` bound to the given context.
        :rtype: PathAwareSchema
        """
        return ComponentSchema(context=context)

    @classmethod
    def _create_validation_error(cls, message: str, no_personal_data_message: str) -> ValidationException:
        """Create a ValidationException targeting ``ErrorTarget.COMPONENT``.

        :param message: The error message.
        :type message: str
        :param no_personal_data_message: The error message without personal data.
        :type no_personal_data_message: str
        :return: The exception instance (not raised here).
        :rtype: ValidationException
        """
        return ValidationException(
            message=message,
            no_personal_data_message=no_personal_data_message,
            target=ErrorTarget.COMPONENT,
        )

    @classmethod
    def _is_flow(cls, data: Any) -> bool:
        """Check whether the given data declares a prompt flow schema.

        :param data: The loaded data; its ``CommonYamlFields.SCHEMA`` entry is inspected.
        :type data: Any
        :return: True if the schema is ``SchemaUrl.PROMPTFLOW_FLOW`` or ``SchemaUrl.PROMPTFLOW_RUN``.
        :rtype: bool
        """
        _schema = data.get(CommonYamlFields.SCHEMA, None)

        if _schema and _schema in [SchemaUrl.PROMPTFLOW_FLOW, SchemaUrl.PROMPTFLOW_RUN]:
            return True
        return False

    @classmethod
    def _load(
        cls,
        data: Optional[Dict] = None,
        yaml_path: Optional[Union[PathLike, str]] = None,
        params_override: Optional[list] = None,
        **kwargs: Any,
    ) -> "Component":
        """Load a component from a data dictionary.

        The component type is resolved with the priority: type in ``params_override`` > type in
        ``data`` > flow detection via ``_is_flow`` > ``NodeType.COMMAND``. The resolved type is written
        back into ``data`` and used to pick the create function from ``component_factory``.

        :param data: The component data. Defaults to an empty dict.
        :type data: Optional[Dict]
        :param yaml_path: Path of the source YAML file; its parent is used as base path.
            When not given, ``Path("./")`` is used as base path.
        :type yaml_path: Optional[Union[PathLike, str]]
        :param params_override: Parameter overrides passed to the schema through the context.
        :type params_override: Optional[list]
        :return: The loaded component.
        :rtype: Component
        """
        data = data or {}
        params_override = params_override or []
        base_path = Path(yaml_path).parent if yaml_path else Path("./")

        type_in_override = find_type_in_override(params_override)

        # type_in_override > type_in_yaml > default (command)
        if type_in_override is None:
            type_in_override = data.get(CommonYamlFields.TYPE, None)
        if type_in_override is None and cls._is_flow(data):
            type_in_override = NodeType.FLOW_PARALLEL
        if type_in_override is None:
            type_in_override = NodeType.COMMAND
        data[CommonYamlFields.TYPE] = type_in_override

        from azure.ai.ml.entities._component.component_factory import component_factory

        create_instance_func, _ = component_factory.get_create_funcs(
            data,
            for_load=True,
        )
        new_instance: Component = create_instance_func()
        # specific keys must be popped before loading with schema using kwargs
        init_kwargs = {
            "yaml_str": kwargs.pop("yaml_str", None),
            "_source": kwargs.pop("_source", ComponentSource.YAML_COMPONENT),
        }
        init_kwargs.update(
            new_instance._load_with_schema(  # pylint: disable=protected-access
                data,
                context={
                    BASE_PATH_CONTEXT_KEY: base_path,
                    SOURCE_PATH_CONTEXT_KEY: yaml_path,
                    PARAMS_OVERRIDE_KEY: params_override,
                },
                unknown=INCLUDE,
                raise_original_exception=True,
                **kwargs,
            )
        )

        # Set base path separately to avoid doing this in post load, as return types of post load are not unified,
        # could be object or dict.
        # base_path in context can be changed in loading, so we use original base_path here.
        init_kwargs[BASE_PATH_CONTEXT_KEY] = base_path.absolute()
        if yaml_path:
            init_kwargs[SOURCE_PATH_CONTEXT_KEY] = Path(yaml_path).absolute().as_posix()
        # TODO: Bug Item number: 2883415
        new_instance.__init__(  # type: ignore
            **init_kwargs,
        )
        return new_instance

    @classmethod
    def _from_container_rest_object(cls, component_container_rest_object: ComponentContainer) -> "Component":
        """Create a Component from a component container REST object.

        The resulting component has type ``NodeType._CONTAINER`` and carries the container's
        ``latest_version``.

        :param component_container_rest_object: The container REST object.
        :type component_container_rest_object: ComponentContainer
        :return: The component representing the container.
        :rtype: Component
        """
        component_container_details: ComponentContainerProperties = component_container_rest_object.properties
        component = Component(
            id=component_container_rest_object.id,
            name=component_container_rest_object.name,
            description=component_container_details.description,
            creation_context=SystemData._from_rest_object(component_container_rest_object.system_data),
            tags=component_container_details.tags,
            properties=component_container_details.properties,
            type=NodeType._CONTAINER,
            # Set this field to None as it hold a default True in init.
            is_deterministic=None,  # type: ignore[arg-type]
        )
        component.latest_version = component_container_details.latest_version
        return component

    @classmethod
    def _from_rest_object(cls, obj: ComponentVersion) -> "Component":
        """Create a Component from a component version REST object.

        Note that ``obj.properties.component_spec`` is modified in place.

        :param obj: The component version REST object.
        :type obj: ComponentVersion
        :return: The component instance created through ``component_factory``.
        :rtype: Component
        """
        # TODO: Remove in PuP with native import job/component type support in MFE/Designer
        # Convert command component back to import component private preview
        component_spec = obj.properties.component_spec
        if component_spec[CommonYamlFields.TYPE] == NodeType.COMMAND and component_spec["command"] == NodeType.IMPORT:
            component_spec[CommonYamlFields.TYPE] = NodeType.IMPORT
            component_spec["source"] = component_spec.pop("inputs")
            component_spec["output"] = component_spec.pop("outputs")["output"]

        # shouldn't block serialization when name is not valid
        # maybe override serialization method for name field?
        from azure.ai.ml.entities._component.component_factory import component_factory

        create_instance_func, _ = component_factory.get_create_funcs(obj.properties.component_spec, for_load=True)

        instance: Component = create_instance_func()
        # TODO: Bug Item number: 2883415
        instance.__init__(**instance._from_rest_object_to_init_params(obj))  # type: ignore
        return instance

    @classmethod
    def _from_rest_object_to_init_params(cls, obj: ComponentVersion) -> Dict:
        """Convert a component version REST object into keyword arguments for ``__init__``.

        Note that ``obj.properties.component_spec`` is modified in place: ``_source``, ``inputs`` and
        ``outputs`` are removed and the name is replaced by ``ANONYMOUS_COMPONENT_NAME`` before the
        spec is loaded with the schema. The original name is put back into the returned dict.

        :param obj: The component version REST object.
        :type obj: ComponentVersion
        :return: Init parameters, with ``None`` values and empty dicts removed.
        :rtype: Dict
        """
        # Object got from rest data contain _source, we delete it.
        if "_source" in obj.properties.component_spec:
            del obj.properties.component_spec["_source"]

        rest_component_version = obj.properties
        _type = rest_component_version.component_spec[CommonYamlFields.TYPE]

        # inputs/outputs will be parsed by instance._build_io in instance's __init__
        inputs = rest_component_version.component_spec.pop("inputs", {})
        # parse String -> string, Integer -> integer, etc
        for _input in inputs.values():
            _input["type"] = Input._map_from_rest_type(_input["type"])
        outputs = rest_component_version.component_spec.pop("outputs", {})

        origin_name = rest_component_version.component_spec[CommonYamlFields.NAME]
        rest_component_version.component_spec[CommonYamlFields.NAME] = ANONYMOUS_COMPONENT_NAME
        init_kwargs = cls._load_with_schema(
            rest_component_version.component_spec, context={BASE_PATH_CONTEXT_KEY: Path.cwd()}, unknown=INCLUDE
        )
        init_kwargs.update(
            {
                "id": obj.id,
                "is_anonymous": rest_component_version.is_anonymous,
                "creation_context": obj.system_data,
                "inputs": inputs,
                "outputs": outputs,
                "name": origin_name,
            }
        )

        # remove empty values, because some property only works for specific component, eg: distribution for command
        # note that there is an issue that environment == {} will always be true, so use isinstance here
        return {k: v for k, v in init_kwargs.items() if v is not None and not (isinstance(v, dict) and not v)}

    def _get_anonymous_hash(self) -> str:
        """Return the hash of anonymous component.

        Anonymous Components (same code and interface) will have same hash.

        :return: The component hash
        :rtype: str
        """
        # omit version since anonymous component's version is random guid
        # omit name since name doesn't impact component's uniqueness
        return self._get_component_hash(keys_to_omit=["name", "id", "version"])

    def _get_component_hash(self, keys_to_omit: Optional[Iterable[str]] = None) -> str:
        """Return the hash of component.

        :param keys_to_omit: An iterable of keys to omit when computing the component hash
        :type keys_to_omit: Optional[Iterable[str]]
        :return: The component hash
        :rtype: str
        """
        component_interface_dict = self._to_dict()
        res: str = hash_dict(component_interface_dict, keys_to_omit=keys_to_omit)
        return res

    @classmethod
    def _get_resource_type(cls) -> str:
        """Return the resource type string of a component version.

        :return: The resource type.
        :rtype: str
        """
        return "Microsoft.MachineLearningServices/workspaces/components/versions"

    def _get_resource_name_version(self) -> Tuple:
        """Return the name and version to use for this resource.

        When no version is set and auto-increment is off, a random UUID string is used as version.
        When no name is set, ``ANONYMOUS_COMPONENT_NAME`` is used as name.

        :return: A ``(name, version)`` tuple.
        :rtype: Tuple
        """
        version: Optional[str] = None
        if not self.version and not self._auto_increment_version:
            version = str(uuid.uuid4())
        else:
            version = self.version
        return self.name or ANONYMOUS_COMPONENT_NAME, version

    def _validate(self, raise_error: Optional[bool] = False) -> MutableValidationResult:
        """Validate the component.

        For anonymous components the name is temporarily replaced by ``ANONYMOUS_COMPONENT_NAME``
        during validation and restored afterwards, whether or not validation raises.

        :param raise_error: Forwarded to the parent ``_validate``.
        :type raise_error: Optional[bool]
        :return: The validation result.
        :rtype: MutableValidationResult
        """
        origin_name = self.name
        # skip name validation for anonymous component as ANONYMOUS_COMPONENT_NAME will be used in component creation
        if self._is_anonymous:
            self.name = ANONYMOUS_COMPONENT_NAME
        try:
            return super()._validate(raise_error)
        finally:
            self.name = origin_name

    def _customized_validate(self) -> MutableValidationResult:
        """Run the parent's customized validation plus input/output name validation.

        :return: The merged validation result.
        :rtype: MutableValidationResult
        """
        validation_result = super()._customized_validate()

        # validate inputs names
        validation_result.merge_with(self._validate_io_names(self.inputs, raise_error=False))
        validation_result.merge_with(self._validate_io_names(self.outputs, raise_error=False))

        return validation_result

    def _get_anonymous_component_name_version(self) -> Tuple:
        """Return the name and version used for an anonymous component.

        :return: ``(ANONYMOUS_COMPONENT_NAME, <anonymous hash>)``.
        :rtype: Tuple
        """
        return ANONYMOUS_COMPONENT_NAME, self._get_anonymous_hash()

    def _get_rest_name_version(self) -> Tuple:
        """Return the name and version to use in REST calls.

        :return: The anonymous name/hash pair for anonymous components, else ``(name, version)``.
        :rtype: Tuple
        """
        if self._is_anonymous:
            return self._get_anonymous_component_name_version()
        return self.name, self.version

    def _to_rest_object(self) -> ComponentVersion:
        """Convert this component into a component version REST object.

        :return: The REST object. For non-anonymous components, ``client_component_hash`` is added
            to its properties.
        :rtype: ComponentVersion
        """
        component = self._to_dict()

        # TODO: Remove in PuP with native import job/component type support in MFE/Designer
        # Convert import component to command component private preview
        if component.get(CommonYamlFields.TYPE, None) == NodeType.IMPORT:
            component[CommonYamlFields.TYPE] = NodeType.COMMAND
            component["inputs"] = component.pop("source")
            component["outputs"] = {"output": component.pop("output")}
            # method _to_dict() will remove empty keys
            if "tags" not in component:
                component["tags"] = {}
            component["tags"]["component_type_overwrite"] = NodeType.IMPORT
            component["command"] = NodeType.IMPORT

        # add source type to component rest object
        component["_source"] = self._source
        if self._intellectual_property:
            # hack while full pass through supported is worked on for IPP fields
            # Pop with a default so a dumped dict lacking the key does not raise KeyError.
            component.pop("intellectual_property", None)
            component["intellectualProperty"] = self._intellectual_property._to_rest_object().serialize()
        properties = ComponentVersionProperties(
            component_spec=component,
            description=self.description,
            is_anonymous=self._is_anonymous,
            properties=dict(self.properties) if self.properties else {},
            tags=self.tags,
        )
        result = ComponentVersion(properties=properties)
        if self._is_anonymous:
            result.name = ANONYMOUS_COMPONENT_NAME
        else:
            result.name = self.name
            result.properties.properties["client_component_hash"] = self._get_component_hash(keys_to_omit=["version"])
        return result

    def _to_dict(self) -> Dict:
        """Dump the component into a dictionary, without the base path entry.

        :return: The dumped component.
        :rtype: Dict
        """
        # Replace the name of $schema to schema.
        component_schema_dict: dict = self._dump_for_validation()
        component_schema_dict.pop(BASE_PATH_CONTEXT_KEY, None)

        # TODO: handle other_parameters and remove override from subclass
        return component_schema_dict

    def _localize(self, base_path: str) -> None:
        """Called on an asset got from service to clean up remote attributes like id, creation_context, etc.
        and update base_path.

        :param base_path: The base_path
        :type base_path: str
        :raises ValueError: Raised if this component has no id.
        """
        if not getattr(self, "id", None):
            raise ValueError("Only remote asset can be localize but got a {} without id.".format(type(self)))
        self._id = None
        self._creation_context = None
        self._base_path = base_path

    def _get_telemetry_values(self, *args: Any, **kwargs: Any) -> Dict:
        """Return the values logged to telemetry for this component.

        :return: A dict with the keys ``type``, ``source`` and ``is_anonymous``.
        :rtype: Dict
        """
        # Note: the is_anonymous is not reliable here, create_or_update will log is_anonymous from parameter.
        is_anonymous = self.name is None or ANONYMOUS_COMPONENT_NAME in self.name
        return {"type": self.type, "source": self._source, "is_anonymous": is_anonymous}

    # pylint: disable-next=docstring-missing-param
    def __call__(self, *args: Any, **kwargs: Any) -> "BaseNode":
        """Call ComponentVersion as a function and get a Component object.

        Only keyword arguments are supported; any positional argument results in an error.

        :return: The component object
        :rtype: BaseNode
        :raises ~azure.ai.ml.exceptions.ValidationException: Raised if positional arguments are given.
        """
        if args:
            # ``_func`` regenerates the component function on each access, so bind it once here.
            component_func = self._func
            # raise clear error message for unsupported positional args
            if component_func._has_parameters:  # type: ignore
                _error = f"got {args} for {self.name}"
                msg = (
                    f"Component function doesn't support positional arguments, {_error}. "  # type: ignore
                    f"Please use keyword arguments like: {component_func._func_calling_example}."
                )
            else:
                msg = (
                    "Component function doesn't has any parameters, "
                    f"please make sure component {self.name} has inputs. "
                )
            raise ValidationException(
                message=msg,
                target=ErrorTarget.COMPONENT,
                no_personal_data_message=msg,
                error_category=ErrorCategory.USER_ERROR,
            )
        return self._func(*args, **kwargs)  # pylint: disable=not-callable