diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/sweep')
7 files changed, 1485 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/sweep/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/sweep/__init__.py new file mode 100644 index 00000000..fdf8caba --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/sweep/__init__.py @@ -0,0 +1,5 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +__path__ = __import__("pkgutil").extend_path(__path__, __name__) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/sweep/early_termination_policy.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/sweep/early_termination_policy.py new file mode 100644 index 00000000..b1b928fc --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/sweep/early_termination_policy.py @@ -0,0 +1,191 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +from abc import ABC +from typing import Any, Optional, cast + +from azure.ai.ml._restclient.v2023_04_01_preview.models import BanditPolicy as RestBanditPolicy +from azure.ai.ml._restclient.v2023_04_01_preview.models import EarlyTerminationPolicy as RestEarlyTerminationPolicy +from azure.ai.ml._restclient.v2023_04_01_preview.models import EarlyTerminationPolicyType +from azure.ai.ml._restclient.v2023_04_01_preview.models import MedianStoppingPolicy as RestMedianStoppingPolicy +from azure.ai.ml._restclient.v2023_04_01_preview.models import ( + TruncationSelectionPolicy as RestTruncationSelectionPolicy, +) +from azure.ai.ml._utils.utils import camel_to_snake +from azure.ai.ml.entities._mixins import RestTranslatableMixin + + +class EarlyTerminationPolicy(ABC, RestTranslatableMixin): + def __init__( + self, + *, + delay_evaluation: int, + evaluation_interval: int, + ): + self.type = None + self.delay_evaluation = delay_evaluation + self.evaluation_interval = evaluation_interval + + @classmethod + def _from_rest_object(cls, obj: RestEarlyTerminationPolicy) -> Optional["EarlyTerminationPolicy"]: + if not obj: + return None + + policy: Any = None + if obj.policy_type == EarlyTerminationPolicyType.BANDIT: + policy = BanditPolicy._from_rest_object(obj) # pylint: disable=protected-access + + if obj.policy_type == EarlyTerminationPolicyType.MEDIAN_STOPPING: + policy = MedianStoppingPolicy._from_rest_object(obj) # pylint: disable=protected-access + + if obj.policy_type == EarlyTerminationPolicyType.TRUNCATION_SELECTION: + policy = TruncationSelectionPolicy._from_rest_object(obj) # pylint: disable=protected-access + + return cast(Optional["EarlyTerminationPolicy"], policy) + + def __eq__(self, other: object) -> bool: + if not isinstance(other, EarlyTerminationPolicy): + raise NotImplementedError + res: bool = self._to_rest_object() == other._to_rest_object() + return res + + +class BanditPolicy(EarlyTerminationPolicy): + """Defines an early termination policy based on slack criteria and a frequency and delay interval for evaluation. + + :keyword delay_evaluation: Number of intervals by which to delay the first evaluation. Defaults to 0. + :paramtype delay_evaluation: int + :keyword evaluation_interval: Interval (number of runs) between policy evaluations. Defaults to 0. + :paramtype evaluation_interval: int + :keyword slack_amount: Absolute distance allowed from the best performing run. Defaults to 0. + :paramtype slack_amount: float + :keyword slack_factor: Ratio of the allowed distance from the best performing run. Defaults to 0. + :paramtype slack_factor: float + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_sweep_configurations.py + :start-after: [START configure_sweep_job_bandit_policy] + :end-before: [END configure_sweep_job_bandit_policy] + :language: python + :dedent: 8 + :caption: Configuring BanditPolicy early termination of a hyperparameter sweep on a Command job. + """ + + def __init__( + self, + *, + delay_evaluation: int = 0, + evaluation_interval: int = 0, + slack_amount: float = 0, + slack_factor: float = 0, + ) -> None: + super().__init__(delay_evaluation=delay_evaluation, evaluation_interval=evaluation_interval) + self.type = EarlyTerminationPolicyType.BANDIT.lower() + self.slack_factor = slack_factor + self.slack_amount = slack_amount + + def _to_rest_object(self) -> RestBanditPolicy: + return RestBanditPolicy( + delay_evaluation=self.delay_evaluation, + evaluation_interval=self.evaluation_interval, + slack_factor=self.slack_factor, + slack_amount=self.slack_amount, + ) + + @classmethod + def _from_rest_object(cls, obj: RestBanditPolicy) -> "BanditPolicy": + return cls( + delay_evaluation=obj.delay_evaluation, + evaluation_interval=obj.evaluation_interval, + slack_factor=obj.slack_factor, + slack_amount=obj.slack_amount, + ) + + +class MedianStoppingPolicy(EarlyTerminationPolicy): + """Defines an early termination policy based on a running average of the primary metric of all runs. + + :keyword delay_evaluation: Number of intervals by which to delay the first evaluation. Defaults to 0. + :paramtype delay_evaluation: int + :keyword evaluation_interval: Interval (number of runs) between policy evaluations. Defaults to 1. + :paramtype evaluation_interval: int + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_sweep_configurations.py + :start-after: [START configure_sweep_job_median_stopping_policy] + :end-before: [END configure_sweep_job_median_stopping_policy] + :language: python + :dedent: 8 + :caption: Configuring an early termination policy for a hyperparameter sweep job using MedianStoppingPolicy + """ + + def __init__( + self, + *, + delay_evaluation: int = 0, + evaluation_interval: int = 1, + ) -> None: + super().__init__(delay_evaluation=delay_evaluation, evaluation_interval=evaluation_interval) + self.type = camel_to_snake(EarlyTerminationPolicyType.MEDIAN_STOPPING) + + def _to_rest_object(self) -> RestMedianStoppingPolicy: + return RestMedianStoppingPolicy( + delay_evaluation=self.delay_evaluation, evaluation_interval=self.evaluation_interval + ) + + @classmethod + def _from_rest_object(cls, obj: RestMedianStoppingPolicy) -> "MedianStoppingPolicy": + return cls( + delay_evaluation=obj.delay_evaluation, + evaluation_interval=obj.evaluation_interval, + ) + + +class TruncationSelectionPolicy(EarlyTerminationPolicy): + """Defines an early termination policy that cancels a given percentage of runs at each evaluation interval. + + :keyword delay_evaluation: Number of intervals by which to delay the first evaluation. Defaults to 0. + :paramtype delay_evaluation: int + :keyword evaluation_interval: Interval (number of runs) between policy evaluations. Defaults to 0. + :paramtype evaluation_interval: int + :keyword truncation_percentage: The percentage of runs to cancel at each evaluation interval. Defaults to 0. + :paramtype truncation_percentage: int + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_sweep_configurations.py + :start-after: [START configure_sweep_job_truncation_selection_policy] + :end-before: [END configure_sweep_job_truncation_selection_policy] + :language: python + :dedent: 8 + :caption: Configuring an early termination policy for a hyperparameter sweep job + using TruncationStoppingPolicy + """ + + def __init__( + self, + *, + delay_evaluation: int = 0, + evaluation_interval: int = 0, + truncation_percentage: int = 0, + ) -> None: + super().__init__(delay_evaluation=delay_evaluation, evaluation_interval=evaluation_interval) + self.type = camel_to_snake(EarlyTerminationPolicyType.TRUNCATION_SELECTION) + self.truncation_percentage = truncation_percentage + + def _to_rest_object(self) -> RestTruncationSelectionPolicy: + return RestTruncationSelectionPolicy( + delay_evaluation=self.delay_evaluation, + evaluation_interval=self.evaluation_interval, + truncation_percentage=self.truncation_percentage, + ) + + @classmethod + def _from_rest_object(cls, obj: RestTruncationSelectionPolicy) -> "TruncationSelectionPolicy": + return cls( + delay_evaluation=obj.delay_evaluation, + evaluation_interval=obj.evaluation_interval, + truncation_percentage=obj.truncation_percentage, + ) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/sweep/objective.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/sweep/objective.py new file mode 100644 index 00000000..45e13332 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/sweep/objective.py @@ -0,0 +1,53 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +from typing import Optional + +from azure.ai.ml._restclient.v2023_08_01_preview.models import Objective as RestObjective +from azure.ai.ml.entities._mixins import RestTranslatableMixin + + +class Objective(RestTranslatableMixin): + """Optimization objective. + + :param goal: Defines supported metric goals for hyperparameter tuning. Accepted values + are: "minimize", "maximize". + :type goal: str + :param primary_metric: The name of the metric to optimize. + :type primary_metric: str + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_sweep_configurations.py + :start-after: [START configure_sweep_job_bayesian_sampling_algorithm] + :end-before: [END configure_sweep_job_bayesian_sampling_algorithm] + :language: python + :dedent: 8 + :caption: Assigning an objective to a SweepJob. + """ + + def __init__(self, goal: Optional[str], primary_metric: Optional[str] = None) -> None: + """Optimization objective. + + :param goal: Defines supported metric goals for hyperparameter tuning. Acceptable values + are: "minimize" or "maximize". + :type goal: str + :param primary_metric: The name of the metric to optimize. + :type primary_metric: str + """ + if goal is not None: + self.goal = goal.lower() + self.primary_metric = primary_metric + + def _to_rest_object(self) -> RestObjective: + return RestObjective( + goal=self.goal, + primary_metric=self.primary_metric, + ) + + @classmethod + def _from_rest_object(cls, obj: RestObjective) -> Optional["Objective"]: + if not obj: + return None + + return cls(goal=obj.goal, primary_metric=obj.primary_metric) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/sweep/parameterized_sweep.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/sweep/parameterized_sweep.py new file mode 100644 index 00000000..5d69201f --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/sweep/parameterized_sweep.py @@ -0,0 +1,341 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +from typing import Any, Dict, List, Optional, Type, Union + +from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationErrorType, ValidationException + +from ..job_limits import SweepJobLimits +from ..job_resource_configuration import JobResourceConfiguration +from ..queue_settings import QueueSettings +from .early_termination_policy import ( + BanditPolicy, + EarlyTerminationPolicy, + EarlyTerminationPolicyType, + MedianStoppingPolicy, + TruncationSelectionPolicy, +) +from .objective import Objective +from .sampling_algorithm import ( + BayesianSamplingAlgorithm, + GridSamplingAlgorithm, + RandomSamplingAlgorithm, + RestBayesianSamplingAlgorithm, + RestGridSamplingAlgorithm, + RestRandomSamplingAlgorithm, + RestSamplingAlgorithm, + SamplingAlgorithm, + SamplingAlgorithmType, +) + +SAMPLING_ALGORITHM_TO_REST_CONSTRUCTOR: Dict[SamplingAlgorithmType, Type[RestSamplingAlgorithm]] = { + SamplingAlgorithmType.RANDOM: RestRandomSamplingAlgorithm, + SamplingAlgorithmType.GRID: RestGridSamplingAlgorithm, + SamplingAlgorithmType.BAYESIAN: RestBayesianSamplingAlgorithm, +} + +SAMPLING_ALGORITHM_CONSTRUCTOR: Dict[SamplingAlgorithmType, Type[SamplingAlgorithm]] = { + SamplingAlgorithmType.RANDOM: RandomSamplingAlgorithm, + SamplingAlgorithmType.GRID: GridSamplingAlgorithm, + SamplingAlgorithmType.BAYESIAN: BayesianSamplingAlgorithm, +} + + +class ParameterizedSweep: # pylint:disable=too-many-instance-attributes + """Shared logic for standalone and pipeline sweep job.""" + + def __init__( + self, + limits: Optional[SweepJobLimits] = None, + sampling_algorithm: Optional[Union[str, SamplingAlgorithm]] = None, + objective: Optional[Union[Dict, Objective]] = None, + early_termination: Optional[Any] = None, + search_space: Optional[Dict] = None, + queue_settings: Optional[QueueSettings] = None, + resources: Optional[Union[dict, JobResourceConfiguration]] = None, + ) -> None: + """ + :param limits: Limits for sweep job. + :type limits: ~azure.ai.ml.sweep.SweepJobLimits + :param sampling_algorithm: Sampling algorithm for sweep job. + :type sampling_algorithm: ~azure.ai.ml.sweep.SamplingAlgorithm + :param objective: Objective for sweep job. + :type objective: ~azure.ai.ml.sweep.Objective + :param early_termination: Early termination policy for sweep job. + :type early_termination: ~azure.ai.ml.entities._job.sweep.early_termination_policy.EarlyTerminationPolicy + :param search_space: Search space for sweep job. + :type search_space: Dict[str, Union[ + ~azure.ai.ml.sweep.Choice, + ~azure.ai.ml.sweep.LogNormal, + ~azure.ai.ml.sweep.LogUniform, + ~azure.ai.ml.sweep.Normal, + ~azure.ai.ml.sweep.QLogNormal, + ~azure.ai.ml.sweep.QLogUniform, + ~azure.ai.ml.sweep.QNormal, + ~azure.ai.ml.sweep.QUniform, + ~azure.ai.ml.sweep.Randint, + ~azure.ai.ml.sweep.Uniform + + ]] + :param queue_settings: Queue settings for sweep job. + :type queue_settings: ~azure.ai.ml.entities.QueueSettings + :param resources: Compute Resource configuration for the job. + :type resources: ~azure.ai.ml.entities.ResourceConfiguration + """ + self.sampling_algorithm = sampling_algorithm + self.early_termination = early_termination # type: ignore[assignment] + self._limits = limits + self.search_space = search_space + self.queue_settings = queue_settings + self.objective: Optional[Objective] = None + self.resources = resources + + if isinstance(objective, Dict): + self.objective = Objective(**objective) + else: + self.objective = objective + + @property + def resources(self) -> Optional[Union[dict, JobResourceConfiguration]]: + """Resources for sweep job. + + :returns: Resources for sweep job. + :rtype: ~azure.ai.ml.entities.ResourceConfiguration + """ + return self._resources + + @resources.setter + def resources(self, value: Optional[Union[dict, JobResourceConfiguration]]) -> None: + """Set Resources for sweep job. + + :param value: Compute Resource configuration for the job. + :type value: ~azure.ai.ml.entities.ResourceConfiguration + """ + if isinstance(value, dict): + value = JobResourceConfiguration(**value) + self._resources = value + + @property + def limits(self) -> Optional[SweepJobLimits]: + """Limits for sweep job. + + :returns: Limits for sweep job. + :rtype: ~azure.ai.ml.sweep.SweepJobLimits + """ + return self._limits + + @limits.setter + def limits(self, value: SweepJobLimits) -> None: + """Set limits for sweep job. + + :param value: Limits for sweep job. + :type value: ~azure.ai.ml.sweep.SweepJobLimits + """ + if not isinstance(value, SweepJobLimits): + msg = f"limits must be SweepJobLimits but get {type(value)} instead" + raise ValidationException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.SWEEP_JOB, + error_category=ErrorCategory.USER_ERROR, + error_type=ValidationErrorType.INVALID_VALUE, + ) + self._limits = value + + def set_resources( + self, + *, + instance_type: Optional[Union[str, List[str]]] = None, + instance_count: Optional[int] = None, + locations: Optional[List[str]] = None, + properties: Optional[Dict] = None, + docker_args: Optional[str] = None, + shm_size: Optional[str] = None, + ) -> None: + """Set resources for Sweep. + + :keyword instance_type: The instance type to use for the job. + :paramtype instance_type: Optional[Union[str, List[str]]] + :keyword instance_count: The number of instances to use for the job. + :paramtype instance_count: Optional[int] + :keyword locations: The locations to use for the job. + :paramtype locations: Optional[List[str]] + :keyword properties: The properties for the job. + :paramtype properties: Optional[Dict] + :keyword docker_args: The docker arguments for the job. + :paramtype docker_args: Optional[str] + :keyword shm_size: The shared memory size for the job. + :paramtype shm_size: Optional[str] + """ + if self.resources is None: + self.resources = JobResourceConfiguration() + + if not isinstance(self.resources, dict): + if locations is not None: + self.resources.locations = locations + if instance_type is not None: + self.resources.instance_type = instance_type + if instance_count is not None: + self.resources.instance_count = instance_count + if properties is not None: + self.resources.properties = properties + if docker_args is not None: + self.resources.docker_args = docker_args + if shm_size is not None: + self.resources.shm_size = shm_size + + def set_limits( + self, + *, + max_concurrent_trials: Optional[int] = None, + max_total_trials: Optional[int] = None, + timeout: Optional[int] = None, + trial_timeout: Optional[int] = None, + ) -> None: + """Set limits for Sweep node. Leave parameters as None if you don't want to update corresponding values. + + :keyword max_concurrent_trials: maximum concurrent trial number. + :paramtype max_concurrent_trials: int + :keyword max_total_trials: maximum total trial number. + :paramtype max_total_trials: int + :keyword timeout: total timeout in seconds for sweep node + :paramtype timeout: int + :keyword trial_timeout: timeout in seconds for each trial + :paramtype trial_timeout: int + """ + # Looks related to https://github.com/pylint-dev/pylint/issues/3502, still an open issue + # pylint:disable=attribute-defined-outside-init + if self._limits is None: + self._limits = SweepJobLimits( + max_concurrent_trials=max_concurrent_trials, + max_total_trials=max_total_trials, + timeout=timeout, + trial_timeout=trial_timeout, + ) + else: + if self.limits is not None: + if max_concurrent_trials is not None: + self.limits.max_concurrent_trials = max_concurrent_trials + if max_total_trials is not None: + self.limits.max_total_trials = max_total_trials + if timeout is not None: + self.limits.timeout = timeout + if trial_timeout is not None: + self.limits.trial_timeout = trial_timeout + + def set_objective(self, *, goal: Optional[str] = None, primary_metric: Optional[str] = None) -> None: + """Set the sweep object.. Leave parameters as None if you don't want to update corresponding values. + + :keyword goal: Defines supported metric goals for hyperparameter tuning. Acceptable values are: + "minimize" and "maximize". + :paramtype goal: str + :keyword primary_metric: Name of the metric to optimize. + :paramtype primary_metric: str + """ + + if self.objective is not None: + if goal: + self.objective.goal = goal + if primary_metric: + self.objective.primary_metric = primary_metric + else: + self.objective = Objective(goal=goal, primary_metric=primary_metric) + + @property + def sampling_algorithm(self) -> Optional[Union[str, SamplingAlgorithm]]: + """Sampling algorithm for sweep job. + + :returns: Sampling algorithm for sweep job. + :rtype: ~azure.ai.ml.sweep.SamplingAlgorithm + """ + return self._sampling_algorithm + + @sampling_algorithm.setter + def sampling_algorithm(self, value: Optional[Union[SamplingAlgorithm, str]] = None) -> None: + """Set sampling algorithm for sweep job. + + :param value: Sampling algorithm for sweep job. + :type value: ~azure.ai.ml.sweep.SamplingAlgorithm + """ + if value is None: + self._sampling_algorithm = None + elif isinstance(value, SamplingAlgorithm) or ( + isinstance(value, str) and value.lower().capitalize() in SAMPLING_ALGORITHM_CONSTRUCTOR + ): + self._sampling_algorithm = value + else: + msg = f"unsupported sampling algorithm: {value}" + raise ValidationException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.SWEEP_JOB, + error_category=ErrorCategory.USER_ERROR, + error_type=ValidationErrorType.INVALID_VALUE, + ) + + def _get_rest_sampling_algorithm(self) -> RestSamplingAlgorithm: + # TODO: self.sampling_algorithm will always return SamplingAlgorithm + if isinstance(self.sampling_algorithm, SamplingAlgorithm): + return self.sampling_algorithm._to_rest_object() # pylint: disable=protected-access + + if isinstance(self.sampling_algorithm, str): + return SAMPLING_ALGORITHM_CONSTRUCTOR[ # pylint: disable=protected-access + SamplingAlgorithmType(self.sampling_algorithm.lower().capitalize()) + ]()._to_rest_object() + + msg = f"Received unsupported value {self._sampling_algorithm} as the sampling algorithm" + raise ValidationException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.SWEEP_JOB, + error_category=ErrorCategory.USER_ERROR, + error_type=ValidationErrorType.INVALID_VALUE, + ) + + @property + def early_termination(self) -> Optional[Union[str, EarlyTerminationPolicy]]: + """Early termination policy for sweep job. + + :returns: Early termination policy for sweep job. + :rtype: ~azure.ai.ml.entities._job.sweep.early_termination_policy.EarlyTerminationPolicy + """ + return self._early_termination + + @early_termination.setter + def early_termination(self, value: Any) -> None: + """Set early termination policy for sweep job. + + :param value: Early termination policy for sweep job. + :type value: ~azure.ai.ml.entities._job.sweep.early_termination_policy.EarlyTerminationPolicy + """ + self._early_termination: Optional[Union[str, EarlyTerminationPolicy]] + if value is None: + self._early_termination = None + elif isinstance(value, EarlyTerminationPolicy): + self._early_termination = value + elif isinstance(value, str): + value = value.lower().capitalize() + if value == EarlyTerminationPolicyType.BANDIT: + self._early_termination = BanditPolicy() + elif value == EarlyTerminationPolicyType.MEDIAN_STOPPING: + self._early_termination = MedianStoppingPolicy() + elif value == EarlyTerminationPolicyType.TRUNCATION_SELECTION: + self._early_termination = TruncationSelectionPolicy() + else: + msg = f"Received unsupported value {value} as the early termination policy" + raise ValidationException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.SWEEP_JOB, + error_category=ErrorCategory.USER_ERROR, + error_type=ValidationErrorType.INVALID_VALUE, + ) + else: + msg = f"Received unsupported value of type {type(value)} as the early termination policy" + raise ValidationException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.SWEEP_JOB, + error_category=ErrorCategory.USER_ERROR, + error_type=ValidationErrorType.INVALID_VALUE, + ) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/sweep/sampling_algorithm.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/sweep/sampling_algorithm.py new file mode 100644 index 00000000..d0bf795d --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/sweep/sampling_algorithm.py @@ -0,0 +1,141 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +from abc import ABC +from typing import Any, Optional, Union, cast + +from azure.ai.ml._restclient.v2023_08_01_preview.models import ( + BayesianSamplingAlgorithm as RestBayesianSamplingAlgorithm, +) +from azure.ai.ml._restclient.v2023_08_01_preview.models import GridSamplingAlgorithm as RestGridSamplingAlgorithm +from azure.ai.ml._restclient.v2023_08_01_preview.models import RandomSamplingAlgorithm as RestRandomSamplingAlgorithm +from azure.ai.ml._restclient.v2023_08_01_preview.models import SamplingAlgorithm as RestSamplingAlgorithm +from azure.ai.ml._restclient.v2023_08_01_preview.models import SamplingAlgorithmType +from azure.ai.ml.entities._mixins import RestTranslatableMixin + + +class SamplingAlgorithm(ABC, RestTranslatableMixin): + """Base class for sampling algorithms. + + This class should not be instantiated directly. Instead, use one of its subclasses. + """ + + def __init__(self) -> None: + self.type = None + + @classmethod + def _from_rest_object(cls, obj: RestSamplingAlgorithm) -> Optional["SamplingAlgorithm"]: + if not obj: + return None + + sampling_algorithm: Any = None + if obj.sampling_algorithm_type == SamplingAlgorithmType.RANDOM: + sampling_algorithm = RandomSamplingAlgorithm._from_rest_object(obj) # pylint: disable=protected-access + + if obj.sampling_algorithm_type == SamplingAlgorithmType.GRID: + sampling_algorithm = GridSamplingAlgorithm._from_rest_object(obj) # pylint: disable=protected-access + + if obj.sampling_algorithm_type == SamplingAlgorithmType.BAYESIAN: + sampling_algorithm = BayesianSamplingAlgorithm._from_rest_object(obj) # pylint: disable=protected-access + + return cast(Optional["SamplingAlgorithm"], sampling_algorithm) + + +class RandomSamplingAlgorithm(SamplingAlgorithm): + """Random Sampling Algorithm. + + :keyword rule: The specific type of random algorithm. Accepted values are: "random" and "sobol". + :type rule: str + :keyword seed: The seed for random number generation. + :paramtype seed: int + :keyword logbase: A positive number or the number "e" in string format to be used as the base for log + based random sampling. + :paramtype logbase: Union[float, str] + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_sweep_configurations.py + :start-after: [START configure_sweep_job_random_sampling_algorithm] + :end-before: [END configure_sweep_job_random_sampling_algorithm] + :language: python + :dedent: 8 + :caption: Assigning a random sampling algorithm for a SweepJob + """ + + def __init__( + self, + *, + rule: Optional[str] = None, + seed: Optional[int] = None, + logbase: Optional[Union[float, str]] = None, + ) -> None: + super().__init__() + self.type = SamplingAlgorithmType.RANDOM.lower() + self.rule = rule + self.seed = seed + self.logbase = logbase + + def _to_rest_object(self) -> RestRandomSamplingAlgorithm: + return RestRandomSamplingAlgorithm( + rule=self.rule, + seed=self.seed, + logbase=self.logbase, + ) + + @classmethod + def _from_rest_object(cls, obj: RestRandomSamplingAlgorithm) -> "RandomSamplingAlgorithm": + return cls( + rule=obj.rule, + seed=obj.seed, + logbase=obj.logbase, + ) + + +class GridSamplingAlgorithm(SamplingAlgorithm): + """Grid Sampling Algorithm. + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_sweep_configurations.py + :start-after: [START configure_sweep_job_grid_sampling_algorithm] + :end-before: [END configure_sweep_job_grid_sampling_algorithm] + :language: python + :dedent: 8 + :caption: Assigning a grid sampling algorithm for a SweepJob + """ + + def __init__(self) -> None: + super().__init__() + self.type = SamplingAlgorithmType.GRID.lower() + + def _to_rest_object(self) -> RestGridSamplingAlgorithm: + return RestGridSamplingAlgorithm() + + @classmethod + def _from_rest_object(cls, obj: RestGridSamplingAlgorithm) -> "GridSamplingAlgorithm": + return cls() + + +class BayesianSamplingAlgorithm(SamplingAlgorithm): + """Bayesian Sampling Algorithm. + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_sweep_configurations.py + :start-after: [START configure_sweep_job_bayesian_sampling_algorithm] + :end-before: [END configure_sweep_job_bayesian_sampling_algorithm] + :language: python + :dedent: 8 + :caption: Assigning a Bayesian sampling algorithm for a SweepJob + """ + + def __init__(self) -> None: + super().__init__() + self.type = SamplingAlgorithmType.BAYESIAN.lower() + + def _to_rest_object(self) -> RestBayesianSamplingAlgorithm: + return RestBayesianSamplingAlgorithm() + + @classmethod + def _from_rest_object(cls, obj: RestBayesianSamplingAlgorithm) -> "BayesianSamplingAlgorithm": + return cls() diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/sweep/search_space.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/sweep/search_space.py new file mode 100644 index 00000000..bbc08d98 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/sweep/search_space.py @@ -0,0 +1,393 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# pylint: disable=protected-access + +from abc import ABC +from typing import Any, List, Optional, Union + +from azure.ai.ml.constants._common import TYPE +from azure.ai.ml.constants._job.sweep import SearchSpace +from azure.ai.ml.entities._mixins import RestTranslatableMixin +from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, JobException + + +class SweepDistribution(ABC, RestTranslatableMixin): + """Base class for sweep distribution configuration. + + This class should not be instantiated directly. Instead, use one of its subclasses. + + :keyword type: Type of distribution. + :paramtype type: str + """ + + def __init__(self, *, type: Optional[str] = None) -> None: # pylint: disable=redefined-builtin + self.type = type + + @classmethod + def _from_rest_object(cls, obj: List) -> "SweepDistribution": + mapping = { + SearchSpace.CHOICE: Choice, + SearchSpace.NORMAL: Normal, + SearchSpace.LOGNORMAL: LogNormal, + SearchSpace.QNORMAL: QNormal, + SearchSpace.QLOGNORMAL: QLogNormal, + SearchSpace.RANDINT: Randint, + SearchSpace.UNIFORM: Uniform, + SearchSpace.QUNIFORM: QUniform, + SearchSpace.LOGUNIFORM: LogUniform, + SearchSpace.QLOGUNIFORM: QLogUniform, + } + + ss_class: Any = mapping.get(obj[0], None) + if ss_class: + res: SweepDistribution = ss_class._from_rest_object(obj) + return res + + msg = f"Unknown search space type: {obj[0]}" + raise JobException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.SWEEP_JOB, + error_category=ErrorCategory.SYSTEM_ERROR, + ) + + def __eq__(self, other: object) -> bool: + if not isinstance(other, SweepDistribution): + return NotImplemented + res: bool = self._to_rest_object() == other._to_rest_object() + return res + + +class Choice(SweepDistribution): + """Choice distribution configuration. + + :param values: List of values to choose from. + :type values: list[Union[float, str, dict]] + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_sweep_configurations.py + :start-after: [START configure_sweep_job_choice_loguniform] + :end-before: [END configure_sweep_job_choice_loguniform] + :language: python + :dedent: 8 + :caption: Using Choice distribution to set values for a hyperparameter sweep + """ + + def __init__(self, values: Optional[List[Union[float, str, dict]]] = None, **kwargs: Any) -> None: + kwargs.setdefault(TYPE, SearchSpace.CHOICE) + super().__init__(**kwargs) + self.values = values + + def _to_rest_object(self) -> List: + items: List = [] + if self.values is not None: + for value in self.values: + if isinstance(value, dict): + rest_dict = {} + for k, v in value.items(): + if isinstance(v, SweepDistribution): + rest_dict[k] = v._to_rest_object() + else: + rest_dict[k] = v + items.append(rest_dict) + else: + items.append(value) + return [self.type, [items]] + + @classmethod + def _from_rest_object(cls, obj: List) -> "Choice": + rest_values = obj[1][0] + from_rest_values = [] + for rest_value in rest_values: + if isinstance(rest_value, dict): + from_rest_dict = {} + for k, v in rest_value.items(): + try: + # first assume that any dictionary value is a valid distribution (i.e. normal, uniform, etc) + # and try to deserialize it into a the correct SDK distribution object + from_rest_dict[k] = SweepDistribution._from_rest_object(v) + except Exception: # pylint: disable=W0718 + # if an exception is raised, assume that the value was not a valid distribution and use the + # value as it is for deserialization + from_rest_dict[k] = v + from_rest_values.append(from_rest_dict) + else: + from_rest_values.append(rest_value) + return Choice(values=from_rest_values) # type: ignore[arg-type] + + +class Normal(SweepDistribution): + """Normal distribution configuration. + + :param mu: Mean of the distribution. + :type mu: float + :param sigma: Standard deviation of the distribution. + :type sigma: float + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_sweep_configurations.py + :start-after: [START configure_sweep_job_randint_normal] + :end-before: [END configure_sweep_job_randint_normal] + :language: python + :dedent: 8 + :caption: Configuring Normal distributions for a hyperparameter sweep on a Command job. + """ + + def __init__(self, mu: Optional[float] = None, sigma: Optional[float] = None, **kwargs: Any) -> None: + kwargs.setdefault(TYPE, SearchSpace.NORMAL) + super().__init__(**kwargs) + self.mu = mu + self.sigma = sigma + + def _to_rest_object(self) -> List: + return [self.type, [self.mu, self.sigma]] + + @classmethod + def _from_rest_object(cls, obj: List) -> "Normal": + return cls(mu=obj[1][0], sigma=obj[1][1]) + + +class LogNormal(Normal): + """LogNormal distribution configuration. + + :param mu: Mean of the log of the distribution. + :type mu: float + :param sigma: Standard deviation of the log of the distribution. + :type sigma: float + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_sweep_configurations.py + :start-after: [START configure_sweep_job_lognormal_qlognormal] + :end-before: [END configure_sweep_job_lognormal_qlognormal] + :language: python + :dedent: 8 + :caption: Configuring LogNormal distributions for a hyperparameter sweep on a Command job. + """ + + def __init__(self, mu: Optional[float] = None, sigma: Optional[float] = None, **kwargs: Any) -> None: + kwargs.setdefault(TYPE, SearchSpace.LOGNORMAL) + super().__init__(mu=mu, sigma=sigma, **kwargs) + + +class QNormal(Normal): + """QNormal distribution configuration. + + :param mu: Mean of the distribution. + :type mu: float + :param sigma: Standard deviation of the distribution. + :type sigma: float + :param q: Quantization factor. + :type q: int + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_sweep_configurations.py + :start-after: [START configure_sweep_job_qloguniform_qnormal] + :end-before: [END configure_sweep_job_qloguniform_qnormal] + :language: python + :dedent: 8 + :caption: Configuring QNormal distributions for a hyperparameter sweep on a Command job. + """ + + def __init__( + self, mu: Optional[float] = None, sigma: Optional[float] = None, q: Optional[int] = None, **kwargs: Any + ) -> None: + kwargs.setdefault(TYPE, SearchSpace.QNORMAL) + super().__init__(mu=mu, sigma=sigma, **kwargs) + self.q = q + + def _to_rest_object(self) -> List: + return [self.type, [self.mu, self.sigma, self.q]] + + @classmethod + def _from_rest_object(cls, obj: List) -> "QNormal": + return cls(mu=obj[1][0], sigma=obj[1][1], q=obj[1][2]) + + +class QLogNormal(QNormal): + """QLogNormal distribution configuration. + + :param mu: Mean of the log of the distribution. + :type mu: Optional[float] + :param sigma: Standard deviation of the log of the distribution. + :type sigma: Optional[float] + :param q: Quantization factor. + :type q: Optional[int] + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_sweep_configurations.py + :start-after: [START configure_sweep_job_lognormal_qlognormal] + :end-before: [END configure_sweep_job_lognormal_qlognormal] + :language: python + :dedent: 8 + :caption: Configuring QLogNormal distributions for a hyperparameter sweep on a Command job. + """ + + def __init__( + self, mu: Optional[float] = None, sigma: Optional[float] = None, q: Optional[int] = None, **kwargs: Any + ) -> None: + kwargs.setdefault(TYPE, SearchSpace.QLOGNORMAL) + super().__init__(mu=mu, sigma=sigma, q=q, **kwargs) + + +class Randint(SweepDistribution): + """Randint distribution configuration. + + :param upper: Upper bound of the distribution. + :type upper: int + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_sweep_configurations.py + :start-after: [START configure_sweep_job_randint_normal] + :end-before: [END configure_sweep_job_randint_normal] + :language: python + :dedent: 8 + :caption: Configuring Randint distributions for a hyperparameter sweep on a Command job. + """ + + def __init__(self, upper: Optional[int] = None, **kwargs: Any) -> None: + kwargs.setdefault(TYPE, SearchSpace.RANDINT) + super().__init__(**kwargs) + self.upper = upper + + def _to_rest_object(self) -> List: + return [self.type, [self.upper]] + + @classmethod + def _from_rest_object(cls, obj: List) -> "Randint": + return cls(upper=obj[1][0]) + + +class Uniform(SweepDistribution): + """ + + Uniform distribution configuration. + + :param min_value: Minimum value of the distribution. + :type min_value: float + :param max_value: Maximum value of the distribution. + :type max_value: float + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_sweep_configurations.py + :start-after: [START configure_sweep_job_uniform] + :end-before: [END configure_sweep_job_uniform] + :language: python + :dedent: 8 + :caption: Configuring Uniform distributions for learning rates and momentum + during a hyperparameter sweep on a Command job. + """ + + def __init__(self, min_value: Optional[float] = None, max_value: Optional[float] = None, **kwargs: Any) -> None: + kwargs.setdefault(TYPE, SearchSpace.UNIFORM) + super().__init__(**kwargs) + self.min_value = min_value + self.max_value = max_value + + def _to_rest_object(self) -> List: + return [self.type, [self.min_value, self.max_value]] + + @classmethod + def _from_rest_object(cls, obj: List) -> "Uniform": + return cls(min_value=obj[1][0], max_value=obj[1][1]) + + +class LogUniform(Uniform): + """LogUniform distribution configuration. + + :param min_value: Minimum value of the log of the distribution. + :type min_value: float + :param max_value: Maximum value of the log of the distribution. + :type max_value: float + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_sweep_configurations.py + :start-after: [START configure_sweep_job_choice_loguniform] + :end-before: [END configure_sweep_job_choice_loguniform] + :language: python + :dedent: 8 + :caption: Configuring a LogUniform distribution for a hyperparameter sweep job learning rate + """ + + def __init__(self, min_value: Optional[float] = None, max_value: Optional[float] = None, **kwargs: Any) -> None: + kwargs.setdefault(TYPE, SearchSpace.LOGUNIFORM) + super().__init__(min_value=min_value, max_value=max_value, **kwargs) + + +class QUniform(Uniform): + """QUniform distribution configuration. + + :param min_value: Minimum value of the distribution. + :type min_value: Optional[Union[int, float]] + :param max_value: Maximum value of the distribution. + :type max_value: Optional[Union[int, float]] + :param q: Quantization factor. + :type q: Optional[int] + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_sweep_configurations.py + :start-after: [START configure_sweep_job_truncation_selection_policy] + :end-before: [END configure_sweep_job_truncation_selection_policy] + :language: python + :dedent: 8 + :caption: Configuring QUniform distributions for a hyperparameter sweep on a Command job. + """ + + def __init__( + self, + min_value: Optional[Union[int, float]] = None, + max_value: Optional[Union[int, float]] = None, + q: Optional[int] = None, + **kwargs: Any, + ) -> None: + kwargs.setdefault(TYPE, SearchSpace.QUNIFORM) + super().__init__(min_value=min_value, max_value=max_value, **kwargs) + self.q = q + + def _to_rest_object(self) -> List: + return [self.type, [self.min_value, self.max_value, self.q]] + + @classmethod + def _from_rest_object(cls, obj: List) -> "QUniform": + return cls(min_value=obj[1][0], max_value=obj[1][1], q=obj[1][2]) + + +class QLogUniform(QUniform): + """QLogUniform distribution configuration. + + :param min_value: Minimum value of the log of the distribution. + :type min_value: Optional[float] + :param max_value: Maximum value of the log of the distribution. + :type max_value: Optional[float] + :param q: Quantization factor. + :type q: Optional[int] + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_sweep_configurations.py + :start-after: [START configure_sweep_job_qloguniform_qnormal] + :end-before: [END configure_sweep_job_qloguniform_qnormal] + :language: python + :dedent: 8 + :caption: Configuring QLogUniform distributions for a hyperparameter sweep on a Command job. + """ + + def __init__( + self, + min_value: Optional[float] = None, + max_value: Optional[float] = None, + q: Optional[int] = None, + **kwargs: Any, + ) -> None: + kwargs.setdefault(TYPE, SearchSpace.QLOGUNIFORM) + super().__init__(min_value=min_value, max_value=max_value, q=q, **kwargs) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/sweep/sweep_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/sweep/sweep_job.py new file mode 100644 index 00000000..0a99bb39 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/sweep/sweep_job.py @@ -0,0 +1,361 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# pylint: disable=protected-access + +import logging +from typing import Any, Dict, NoReturn, Optional, Union + +from azure.ai.ml._restclient.v2023_08_01_preview.models import JobBase +from azure.ai.ml._restclient.v2023_08_01_preview.models import SweepJob as RestSweepJob +from azure.ai.ml._restclient.v2023_08_01_preview.models import TrialComponent +from azure.ai.ml._schema._sweep.sweep_job import SweepJobSchema +from azure.ai.ml._utils.utils import map_single_brackets_and_warn +from azure.ai.ml.constants import JobType +from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, TYPE +from azure.ai.ml.entities._component.command_component import CommandComponent +from azure.ai.ml.entities._credentials import ( + AmlTokenConfiguration, + ManagedIdentityConfiguration, + UserIdentityConfiguration, + _BaseJobIdentityConfiguration, +) +from azure.ai.ml.entities._inputs_outputs import Input +from azure.ai.ml.entities._job._input_output_helpers import ( + from_rest_data_outputs, + from_rest_inputs_to_dataset_literal, + to_rest_data_outputs, + to_rest_dataset_literal_inputs, + validate_inputs_for_command, + validate_key_contains_allowed_characters, +) +from azure.ai.ml.entities._job.command_job import CommandJob +from azure.ai.ml.entities._job.job import Job +from azure.ai.ml.entities._job.job_io_mixin import JobIOMixin +from azure.ai.ml.entities._job.job_resource_configuration import JobResourceConfiguration +from azure.ai.ml.entities._job.sweep.sampling_algorithm import SamplingAlgorithm +from azure.ai.ml.entities._system_data import SystemData +from azure.ai.ml.entities._util import load_from_dict +from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, JobException + +# from ..identity import AmlToken, Identity, ManagedIdentity, UserIdentity +from ..job_limits import SweepJobLimits +from ..parameterized_command import ParameterizedCommand +from ..queue_settings import QueueSettings +from .early_termination_policy import ( + BanditPolicy, + EarlyTerminationPolicy, + MedianStoppingPolicy, + TruncationSelectionPolicy, +) +from .objective import Objective +from .parameterized_sweep import ParameterizedSweep +from .search_space import ( + Choice, + LogNormal, + LogUniform, + Normal, + QLogNormal, + QLogUniform, + QNormal, + QUniform, + Randint, + SweepDistribution, + Uniform, +) + +module_logger = logging.getLogger(__name__) + + +class SweepJob(Job, ParameterizedSweep, JobIOMixin): + """Sweep job for hyperparameter tuning. + + .. note:: + For sweep jobs, inputs, outputs, and parameters are accessible as environment variables using the prefix + ``AZUREML_SWEEP_``. For example, if you have a parameter named "learning_rate", you can access it as + ``AZUREML_SWEEP_learning_rate``. + + :keyword name: Name of the job. + :paramtype name: str + :keyword display_name: Display name of the job. + :paramtype display_name: str + :keyword description: Description of the job. + :paramtype description: str + :keyword tags: Tag dictionary. Tags can be added, removed, and updated. + :paramtype tags: dict[str, str] + :keyword properties: The asset property dictionary. + :paramtype properties: dict[str, str] + :keyword experiment_name: Name of the experiment the job will be created under. If None is provided, + job will be created under experiment 'Default'. + :paramtype experiment_name: str + :keyword identity: Identity that the training job will use while running on compute. + :paramtype identity: Union[ + ~azure.ai.ml.ManagedIdentityConfiguration, + ~azure.ai.ml.AmlTokenConfiguration, + ~azure.ai.ml.UserIdentityConfiguration + + ] + + :keyword inputs: Inputs to the command. + :paramtype inputs: dict + :keyword outputs: Mapping of output data bindings used in the job. + :paramtype outputs: dict[str, ~azure.ai.ml.Output] + :keyword sampling_algorithm: The hyperparameter sampling algorithm to use over the `search_space`. Defaults to + "random". + + :paramtype sampling_algorithm: str + :keyword search_space: Dictionary of the hyperparameter search space. The key is the name of the hyperparameter + and the value is the parameter expression. + + :paramtype search_space: Dict + :keyword objective: Metric to optimize for. + :paramtype objective: Objective + :keyword compute: The compute target the job runs on. + :paramtype compute: str + :keyword trial: The job configuration for each trial. Each trial will be provided with a different combination + of hyperparameter values that the system samples from the search_space. + + :paramtype trial: Union[ + ~azure.ai.ml.entities.CommandJob, + ~azure.ai.ml.entities.CommandComponent + + ] + + :keyword early_termination: The early termination policy to use. A trial job is canceled + when the criteria of the specified policy are met. If omitted, no early termination policy will be applied. + + :paramtype early_termination: Union[ + ~azure.mgmt.machinelearningservices.models.BanditPolicy, + ~azure.mgmt.machinelearningservices.models.MedianStoppingPolicy, + ~azure.mgmt.machinelearningservices.models.TruncationSelectionPolicy + + ] + + :keyword limits: Limits for the sweep job. + :paramtype limits: ~azure.ai.ml.entities.SweepJobLimits + :keyword queue_settings: Queue settings for the job. + :paramtype queue_settings: ~azure.ai.ml.entities.QueueSettings + :keyword resources: Compute Resource configuration for the job. + :paramtype resources: Optional[Union[~azure.ai.ml.entities.ResourceConfiguration] + :keyword kwargs: A dictionary of additional configuration parameters. + :paramtype kwargs: dict + + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_sweep_configurations.py + :start-after: [START configure_sweep_job_bayesian_sampling_algorithm] + :end-before: [END configure_sweep_job_bayesian_sampling_algorithm] + :language: python + :dedent: 8 + :caption: Creating a SweepJob + """ + + def __init__( + self, + *, + name: Optional[str] = None, + description: Optional[str] = None, + tags: Optional[Dict] = None, + display_name: Optional[str] = None, + experiment_name: Optional[str] = None, + identity: Optional[ + Union[ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration] + ] = None, + inputs: Optional[Dict[str, Union[Input, str, bool, int, float]]] = None, + outputs: Optional[Dict] = None, + compute: Optional[str] = None, + limits: Optional[SweepJobLimits] = None, + sampling_algorithm: Optional[Union[str, SamplingAlgorithm]] = None, + search_space: Optional[ + Dict[ + str, + Union[ + Choice, LogNormal, LogUniform, Normal, QLogNormal, QLogUniform, QNormal, QUniform, Randint, Uniform + ], + ] + ] = None, + objective: Optional[Objective] = None, + trial: Optional[Union[CommandJob, CommandComponent]] = None, + early_termination: Optional[ + Union[EarlyTerminationPolicy, BanditPolicy, MedianStoppingPolicy, TruncationSelectionPolicy] + ] = None, + queue_settings: Optional[QueueSettings] = None, + resources: Optional[Union[dict, JobResourceConfiguration]] = None, + **kwargs: Any, + ) -> None: + kwargs[TYPE] = JobType.SWEEP + + Job.__init__( + self, + name=name, + description=description, + tags=tags, + display_name=display_name, + experiment_name=experiment_name, + compute=compute, + **kwargs, + ) + self.inputs = inputs # type: ignore[assignment] + self.outputs = outputs # type: ignore[assignment] + self.trial = trial + self.identity = identity + + ParameterizedSweep.__init__( + self, + limits=limits, + sampling_algorithm=sampling_algorithm, + objective=objective, + early_termination=early_termination, + search_space=search_space, + queue_settings=queue_settings, + resources=resources, + ) + + def _to_dict(self) -> Dict: + res: dict = SweepJobSchema(context={BASE_PATH_CONTEXT_KEY: "./"}).dump(self) + return res + + def _to_rest_object(self) -> JobBase: + self._override_missing_properties_from_trial() + if self.trial is not None: + self.trial.command = map_single_brackets_and_warn(self.trial.command) + + if self.search_space is not None: + search_space = {param: space._to_rest_object() for (param, space) in self.search_space.items()} + + if self.trial is not None: + validate_inputs_for_command(self.trial.command, self.inputs) + for key in search_space.keys(): # pylint: disable=possibly-used-before-assignment + validate_key_contains_allowed_characters(key) + + if self.trial is not None: + trial_component = TrialComponent( + code_id=self.trial.code, + distribution=( + self.trial.distribution._to_rest_object() + if self.trial.distribution and not isinstance(self.trial.distribution, Dict) + else None + ), + environment_id=self.trial.environment, + command=self.trial.command, + environment_variables=self.trial.environment_variables, + resources=( + self.trial.resources._to_rest_object() + if self.trial.resources and not isinstance(self.trial.resources, Dict) + else None + ), + ) + + sweep_job = RestSweepJob( + display_name=self.display_name, + description=self.description, + experiment_name=self.experiment_name, + search_space=search_space, + sampling_algorithm=self._get_rest_sampling_algorithm() if self.sampling_algorithm else None, + limits=self.limits._to_rest_object() if self.limits else None, + early_termination=( + self.early_termination._to_rest_object() + if self.early_termination and not isinstance(self.early_termination, str) + else None + ), + properties=self.properties, + compute_id=self.compute, + objective=self.objective._to_rest_object() if self.objective else None, + trial=trial_component, # pylint: disable=possibly-used-before-assignment + tags=self.tags, + inputs=to_rest_dataset_literal_inputs(self.inputs, job_type=self.type), + outputs=to_rest_data_outputs(self.outputs), + identity=self.identity._to_job_rest_object() if self.identity else None, + queue_settings=self.queue_settings._to_rest_object() if self.queue_settings else None, + resources=( + self.resources._to_rest_object() if self.resources and not isinstance(self.resources, dict) else None + ), + ) + + if not sweep_job.resources and sweep_job.trial.resources: + sweep_job.resources = sweep_job.trial.resources + + sweep_job_resource = JobBase(properties=sweep_job) + sweep_job_resource.name = self.name + return sweep_job_resource + + def _to_component(self, context: Optional[Dict] = None, **kwargs: Any) -> NoReturn: + msg = "no sweep component entity" + raise JobException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.SWEEP_JOB, + error_category=ErrorCategory.USER_ERROR, + ) + + @classmethod + def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "SweepJob": + loaded_schema = load_from_dict(SweepJobSchema, data, context, additional_message, **kwargs) + loaded_schema["trial"] = ParameterizedCommand(**(loaded_schema["trial"])) + sweep_job = SweepJob(base_path=context[BASE_PATH_CONTEXT_KEY], **loaded_schema) + return sweep_job + + @classmethod + def _load_from_rest(cls, obj: JobBase) -> "SweepJob": + properties: RestSweepJob = obj.properties + + # Unpack termination schema + early_termination = EarlyTerminationPolicy._from_rest_object(properties.early_termination) + + # Unpack sampling algorithm + sampling_algorithm = SamplingAlgorithm._from_rest_object(properties.sampling_algorithm) + + trial = ParameterizedCommand._load_from_sweep_job(obj.properties) + # Compute also appears in both layers of the yaml, but only one of the REST. + # This should be a required field in one place, but cannot be if its optional in two + + _search_space = {} + for param, dist in properties.search_space.items(): + _search_space[param] = SweepDistribution._from_rest_object(dist) + + return SweepJob( + name=obj.name, + id=obj.id, + display_name=properties.display_name, + description=properties.description, + properties=properties.properties, + tags=properties.tags, + experiment_name=properties.experiment_name, + services=properties.services, + status=properties.status, + creation_context=SystemData._from_rest_object(obj.system_data) if obj.system_data else None, + trial=trial, # type: ignore[arg-type] + compute=properties.compute_id, + sampling_algorithm=sampling_algorithm, + search_space=_search_space, # type: ignore[arg-type] + limits=SweepJobLimits._from_rest_object(properties.limits), + early_termination=early_termination, + objective=Objective._from_rest_object(properties.objective) if properties.objective else None, + inputs=from_rest_inputs_to_dataset_literal(properties.inputs), + outputs=from_rest_data_outputs(properties.outputs), + identity=( + _BaseJobIdentityConfiguration._from_rest_object(properties.identity) if properties.identity else None + ), + queue_settings=properties.queue_settings, + resources=properties.resources if hasattr(properties, "resources") else None, + ) + + def _override_missing_properties_from_trial(self) -> None: + if not isinstance(self.trial, CommandJob): + return + + if not self.compute: + self.compute = self.trial.compute + if not self.inputs: + self.inputs = self.trial.inputs + if not self.outputs: + self.outputs = self.trial.outputs + + has_trial_limits_timeout = self.trial.limits and self.trial.limits.timeout + if has_trial_limits_timeout and not self.limits: + time_out = self.trial.limits.timeout if self.trial.limits is not None else None + self.limits = SweepJobLimits(trial_timeout=time_out) + elif has_trial_limits_timeout and self.limits is not None and not self.limits.trial_timeout: + self.limits.trial_timeout = self.trial.limits.timeout if self.trial.limits is not None else None |
