aboutsummaryrefslogtreecommitdiff
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

# pylint: disable=protected-access

import json
import os
from copy import deepcopy
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

from marshmallow.exceptions import ValidationError as SchemaValidationError

from azure.ai.ml._artifacts._artifact_utilities import _check_and_upload_path
from azure.ai.ml._exception_helper import log_and_raise_error
from azure.ai.ml._restclient.v2023_08_01_preview import AzureMachineLearningWorkspaces as ServiceClient082023Preview
from azure.ai.ml._restclient.v2023_10_01 import AzureMachineLearningServices as ServiceClient102023
from azure.ai.ml._restclient.v2023_10_01.models import (
    FeaturesetVersion,
    FeaturesetVersionBackfillRequest,
    FeatureWindow,
)
from azure.ai.ml._scope_dependent_operations import OperationConfig, OperationScope, _ScopeDependentOperations
from azure.ai.ml._telemetry import ActivityType, monitor_with_activity
from azure.ai.ml._utils._feature_store_utils import (
    _archive_or_restore,
    _datetime_to_str,
    read_feature_set_metadata,
    read_remote_feature_set_spec_metadata,
)
from azure.ai.ml._utils._logger_utils import OpsLogger
from azure.ai.ml._utils.utils import is_url
from azure.ai.ml.constants import ListViewType
from azure.ai.ml.entities._assets._artifacts.feature_set import FeatureSet
from azure.ai.ml.entities._feature_set.data_availability_status import DataAvailabilityStatus
from azure.ai.ml.entities._feature_set.feature import Feature
from azure.ai.ml.entities._feature_set.feature_set_backfill_metadata import FeatureSetBackfillMetadata
from azure.ai.ml.entities._feature_set.feature_set_materialization_metadata import FeatureSetMaterializationMetadata
from azure.ai.ml.entities._feature_set.featureset_spec_metadata import FeaturesetSpecMetadata
from azure.ai.ml.entities._feature_set.materialization_compute_resource import MaterializationComputeResource
from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationErrorType, ValidationException
from azure.ai.ml.operations._datastore_operations import DatastoreOperations
from azure.core.paging import ItemPaged
from azure.core.polling import LROPoller
from azure.core.tracing.decorator import distributed_trace

ops_logger = OpsLogger(__name__)
module_logger = ops_logger.module_logger


class FeatureSetOperations(_ScopeDependentOperations):
    """FeatureSetOperations.

    You should not instantiate this class directly. Instead, you should
    create an MLClient instance that instantiates it for you and
    attaches it as an attribute.
    """

    def __init__(
        self,
        operation_scope: OperationScope,
        operation_config: OperationConfig,
        service_client: ServiceClient102023,
        service_client_for_jobs: ServiceClient082023Preview,
        datastore_operations: DatastoreOperations,
        **kwargs: Dict,
    ):
        super(FeatureSetOperations, self).__init__(operation_scope, operation_config)
        ops_logger.update_filter()
        self._operation = service_client.featureset_versions
        self._container_operation = service_client.featureset_containers
        self._jobs_operation = service_client_for_jobs.jobs
        self._feature_operation = service_client.features
        self._service_client = service_client
        self._datastore_operation = datastore_operations
        self._init_kwargs = kwargs

    @distributed_trace
    @monitor_with_activity(ops_logger, "FeatureSet.List", ActivityType.PUBLICAPI)
    def list(
        self,
        name: Optional[str] = None,
        *,
        list_view_type: ListViewType = ListViewType.ACTIVE_ONLY,
        **kwargs: Dict,
    ) -> ItemPaged[FeatureSet]:
        """List the FeatureSet assets of the workspace.

        :param name: Name of a specific FeatureSet asset, optional.
        :type name: Optional[str]
        :keyword list_view_type: View type for including/excluding (for example) archived FeatureSet assets.
            Defaults to ACTIVE_ONLY.
        :type list_view_type: Optional[ListViewType]
        :return: An iterator like instance of FeatureSet objects
        :rtype: ~azure.core.paging.ItemPaged[FeatureSet]
        """
        if name:
            return self._operation.list(
                workspace_name=self._workspace_name,
                name=name,
                cls=lambda objs: [FeatureSet._from_rest_object(obj) for obj in objs],
                list_view_type=list_view_type,
                **self._scope_kwargs,
                **kwargs,
            )
        return self._container_operation.list(
            workspace_name=self._workspace_name,
            cls=lambda objs: [FeatureSet._from_container_rest_object(obj) for obj in objs],
            list_view_type=list_view_type,
            **self._scope_kwargs,
            **kwargs,
        )

    def _get(self, name: str, version: Optional[str] = None, **kwargs: Dict) -> FeaturesetVersion:
        return self._operation.get(
            resource_group_name=self._resource_group_name,
            workspace_name=self._workspace_name,
            name=name,
            version=version,
            **self._init_kwargs,
            **kwargs,
        )

    @distributed_trace
    @monitor_with_activity(ops_logger, "FeatureSet.Get", ActivityType.PUBLICAPI)
    def get(self, name: str, version: str, **kwargs: Dict) -> FeatureSet:  # type: ignore
        """Get the specified FeatureSet asset.

        :param name: Name of FeatureSet asset.
        :type name: str
        :param version: Version of FeatureSet asset.
        :type version: str
        :raises ~azure.ai.ml.exceptions.ValidationException: Raised if FeatureSet cannot be successfully
            identified and retrieved. Details will be provided in the error message.
        :raises ~azure.core.exceptions.HttpResponseError: Raised if the corresponding name and version cannot be
            retrieved from the service.
        :return: FeatureSet asset object.
        :rtype: ~azure.ai.ml.entities.FeatureSet
        """
        try:
            featureset_version_resource = self._get(name, version, **kwargs)
            return FeatureSet._from_rest_object(featureset_version_resource)  # type: ignore[return-value]
        except (ValidationException, SchemaValidationError) as ex:
            log_and_raise_error(ex)

    @distributed_trace
    @monitor_with_activity(ops_logger, "FeatureSet.BeginCreateOrUpdate", ActivityType.PUBLICAPI)
    def begin_create_or_update(self, featureset: FeatureSet, **kwargs: Dict) -> LROPoller[FeatureSet]:
        """Create or update FeatureSet

        :param featureset: FeatureSet definition.
        :type featureset: FeatureSet
        :return: An instance of LROPoller that returns a FeatureSet.
        :rtype: ~azure.core.polling.LROPoller[~azure.ai.ml.entities.FeatureSet]
        """
        featureset_copy = deepcopy(featureset)

        featureset_spec = self._validate_and_get_feature_set_spec(featureset_copy)
        featureset_copy.properties["featuresetPropertiesVersion"] = "1"
        featureset_copy.properties["featuresetProperties"] = json.dumps(featureset_spec._to_dict())

        sas_uri = None

        if not is_url(featureset_copy.path):
            with open(os.path.join(str(featureset_copy.path), ".amlignore"), mode="w", encoding="utf-8") as f:
                f.write(".*\n*.amltmp\n*.amltemp")

        featureset_copy, _ = _check_and_upload_path(
            artifact=featureset_copy, asset_operations=self, sas_uri=sas_uri, artifact_type=ErrorTarget.FEATURE_SET
        )

        featureset_resource = FeatureSet._to_rest_object(featureset_copy)

        return self._operation.begin_create_or_update(
            resource_group_name=self._resource_group_name,
            workspace_name=self._workspace_name,
            name=featureset_copy.name,
            version=featureset_copy.version,
            body=featureset_resource,
            **kwargs,
            cls=lambda response, deserialized, headers: FeatureSet._from_rest_object(deserialized),
        )

    @distributed_trace
    @monitor_with_activity(ops_logger, "FeatureSet.BeginBackFill", ActivityType.PUBLICAPI)
    def begin_backfill(
        self,
        *,
        name: str,
        version: str,
        feature_window_start_time: Optional[datetime] = None,
        feature_window_end_time: Optional[datetime] = None,
        display_name: Optional[str] = None,
        description: Optional[str] = None,
        tags: Optional[Dict[str, str]] = None,
        compute_resource: Optional[MaterializationComputeResource] = None,
        spark_configuration: Optional[Dict[str, str]] = None,
        data_status: Optional[List[Union[str, DataAvailabilityStatus]]] = None,
        job_id: Optional[str] = None,
        **kwargs: Dict,
    ) -> LROPoller[FeatureSetBackfillMetadata]:
        """Backfill.

        :keyword name: Feature set name. This is case-sensitive.
        :paramtype name: str
        :keyword version: Version identifier. This is case-sensitive.
        :paramtype version: str
        :keyword feature_window_start_time: Start time of the feature window to be materialized.
        :paramtype feature_window_start_time: datetime
        :keyword feature_window_end_time: End time of the feature window to be materialized.
        :paramtype feature_window_end_time: datetime
        :keyword display_name: Specifies description.
        :paramtype display_name: str
        :keyword description: Specifies description.
        :paramtype description: str
        :keyword tags: A set of tags. Specifies the tags.
        :paramtype tags: dict[str, str]
        :keyword compute_resource: Specifies the compute resource settings.
        :paramtype compute_resource: ~azure.ai.ml.entities.MaterializationComputeResource
        :keyword spark_configuration: Specifies the spark compute settings.
        :paramtype spark_configuration: dict[str, str]
        :keyword data_status: Specifies the data status that you want to backfill.
        :paramtype data_status: list[str or ~azure.ai.ml.entities.DataAvailabilityStatus]
        :keyword job_id: The job id.
        :paramtype job_id: str
        :return: An instance of LROPoller that returns ~azure.ai.ml.entities.FeatureSetBackfillMetadata
        :rtype: ~azure.core.polling.LROPoller[~azure.ai.ml.entities.FeatureSetBackfillMetadata]
        """
        request_body: FeaturesetVersionBackfillRequest = FeaturesetVersionBackfillRequest(
            description=description,
            display_name=display_name,
            feature_window=FeatureWindow(
                feature_window_start=feature_window_start_time, feature_window_end=feature_window_end_time
            ),
            resource=compute_resource._to_rest_object() if compute_resource else None,
            spark_configuration=spark_configuration,
            data_availability_status=data_status,
            job_id=job_id,
            tags=tags,
        )

        return self._operation.begin_backfill(
            resource_group_name=self._resource_group_name,
            workspace_name=self._workspace_name,
            name=name,
            version=version,
            body=request_body,
            **kwargs,
            cls=lambda response, deserialized, headers: FeatureSetBackfillMetadata._from_rest_object(deserialized),
        )

    @distributed_trace
    @monitor_with_activity(ops_logger, "FeatureSet.ListMaterializationOperation", ActivityType.PUBLICAPI)
    def list_materialization_operations(
        self,
        name: str,
        version: str,
        *,
        feature_window_start_time: Optional[Union[str, datetime]] = None,
        feature_window_end_time: Optional[Union[str, datetime]] = None,
        filters: Optional[str] = None,
        **kwargs: Dict,
    ) -> ItemPaged[FeatureSetMaterializationMetadata]:
        """List Materialization operation.

        :param name: Feature set name.
        :type name: str
        :param version: Feature set version.
        :type version: str
        :keyword feature_window_start_time: Start time of the feature window to filter materialization jobs.
        :paramtype feature_window_start_time: Union[str, datetime]
        :keyword feature_window_end_time: End time of the feature window to filter materialization jobs.
        :paramtype feature_window_end_time: Union[str, datetime]
        :keyword filters: Comma-separated list of tag names (and optionally values). Example: tag1,tag2=value2.
        :paramtype filters: str
        :return: An iterator like instance of ~azure.ai.ml.entities.FeatureSetMaterializationMetadata objects
        :rtype: ~azure.core.paging.ItemPaged[FeatureSetMaterializationMetadata]
        """
        feature_window_start_time = _datetime_to_str(feature_window_start_time) if feature_window_start_time else None
        feature_window_end_time = _datetime_to_str(feature_window_end_time) if feature_window_end_time else None
        properties = f"azureml.FeatureSetName={name},azureml.FeatureSetVersion={version}"
        if feature_window_start_time:
            properties = properties + f",azureml.FeatureWindowStart={feature_window_start_time}"
        if feature_window_end_time:
            properties = properties + f",azureml.FeatureWindowEnd={feature_window_end_time}"

        materialization_jobs = self._jobs_operation.list(
            resource_group_name=self._resource_group_name,
            workspace_name=self._workspace_name,
            properties=properties,
            tag=filters,
            cls=lambda objs: [FeatureSetMaterializationMetadata._from_rest_object(obj) for obj in objs],
            **kwargs,
        )
        return materialization_jobs

    @distributed_trace
    @monitor_with_activity(ops_logger, "FeatureSet.ListFeatures", ActivityType.PUBLICAPI)
    def list_features(
        self,
        feature_set_name: str,
        version: str,
        *,
        feature_name: Optional[str] = None,
        description: Optional[str] = None,
        tags: Optional[str] = None,
        **kwargs: Dict,
    ) -> ItemPaged[Feature]:
        """List features

        :param feature_set_name: Feature set name.
        :type feature_set_name: str
        :param version: Feature set version.
        :type version: str
        :keyword feature_name: feature name.
        :paramtype feature_name: str
        :keyword description: Description of the featureset.
        :paramtype description: str
        :keyword tags: Comma-separated list of tag names (and optionally values). Example: tag1,tag2=value2.
        :paramtype tags: str
        :return: An iterator like instance of Feature objects
        :rtype: ~azure.core.paging.ItemPaged[Feature]
        """
        features = self._feature_operation.list(
            resource_group_name=self._resource_group_name,
            workspace_name=self._workspace_name,
            featureset_name=feature_set_name,
            featureset_version=version,
            tags=tags,
            feature_name=feature_name,
            description=description,
            **kwargs,
            cls=lambda objs: [Feature._from_rest_object(obj) for obj in objs],
        )
        return features

    @distributed_trace
    @monitor_with_activity(ops_logger, "FeatureSet.GetFeature", ActivityType.PUBLICAPI)
    def get_feature(
        self, feature_set_name: str, version: str, *, feature_name: str, **kwargs: Dict
    ) -> Optional["Feature"]:
        """Get Feature

        :param feature_set_name: Feature set name.
        :type feature_set_name: str
        :param version: Feature set version.
        :type version: str
        :keyword feature_name: The feature name. This argument is case-sensitive.
        :paramtype feature_name: str
        :return: Feature object
        :rtype: ~azure.ai.ml.entities.Feature
        """
        feature = self._feature_operation.get(
            resource_group_name=self._resource_group_name,
            workspace_name=self._workspace_name,
            featureset_name=feature_set_name,
            featureset_version=version,
            feature_name=feature_name,
            **kwargs,
        )

        return Feature._from_rest_object(feature)  # type: ignore[return-value]

    @distributed_trace
    @monitor_with_activity(ops_logger, "FeatureSet.Archive", ActivityType.PUBLICAPI)
    def archive(
        self,
        name: str,
        version: str,
        **kwargs: Dict,
    ) -> None:
        """Archive a FeatureSet asset.

        :param name: Name of FeatureSet asset.
        :type name: str
        :param version: Version of FeatureSet asset.
        :type version: str
        :return: None
        """

        _archive_or_restore(
            asset_operations=self,
            version_operation=self._operation,
            is_archived=True,
            name=name,
            version=version,
            **kwargs,
        )

    @distributed_trace
    @monitor_with_activity(ops_logger, "FeatureSet.Restore", ActivityType.PUBLICAPI)
    def restore(
        self,
        name: str,
        version: str,
        **kwargs: Dict,
    ) -> None:
        """Restore an archived FeatureSet asset.

        :param name: Name of FeatureSet asset.
        :type name: str
        :param version: Version of FeatureSet asset.
        :type version: str
        :return: None
        """

        _archive_or_restore(
            asset_operations=self,
            version_operation=self._operation,
            is_archived=False,
            name=name,
            version=version,
            **kwargs,
        )

    def _validate_and_get_feature_set_spec(self, featureset: FeatureSet) -> FeaturesetSpecMetadata:
        if not (featureset.specification and featureset.specification.path):
            msg = "Missing FeatureSet specification path. Path is required for feature set."
            raise ValidationException(
                message=msg,
                no_personal_data_message=msg,
                error_type=ValidationErrorType.MISSING_FIELD,
                target=ErrorTarget.FEATURE_SET,
                error_category=ErrorCategory.USER_ERROR,
            )

        featureset_spec_path: Any = str(featureset.specification.path)
        if is_url(featureset_spec_path):
            try:
                featureset_spec_contents = read_remote_feature_set_spec_metadata(
                    base_uri=featureset_spec_path,
                    datastore_operations=self._datastore_operation,
                )
                featureset_spec_path = None
            except Exception as ex:
                module_logger.info("Unable to access FeaturesetSpec at path %s", featureset_spec_path)
                raise ex
            return FeaturesetSpecMetadata._load(featureset_spec_contents, featureset_spec_path)

        if not os.path.isabs(featureset_spec_path):
            featureset_spec_path = Path(featureset.base_path, featureset_spec_path).resolve()

        if not os.path.isdir(featureset_spec_path):
            raise ValidationException(
                message="No such directory: {}".format(featureset_spec_path),
                no_personal_data_message="No such directory",
                target=ErrorTarget.FEATURE_SET,
                error_category=ErrorCategory.USER_ERROR,
                error_type=ValidationErrorType.FILE_OR_FOLDER_NOT_FOUND,
            )

        featureset_spec_contents = read_feature_set_metadata(path=featureset_spec_path)
        featureset_spec_yaml_path = Path(featureset_spec_path, "FeatureSetSpec.yaml")
        return FeaturesetSpecMetadata._load(featureset_spec_contents, featureset_spec_yaml_path)