about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/azure/ai/ml/_local_endpoints/docker_client.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/_local_endpoints/docker_client.py')
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/_local_endpoints/docker_client.py568
1 files changed, 568 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_local_endpoints/docker_client.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_local_endpoints/docker_client.py
new file mode 100644
index 00000000..beb9db71
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_local_endpoints/docker_client.py
@@ -0,0 +1,568 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+# pylint: disable=missing-client-constructor-parameter-credential,missing-client-constructor-parameter-kwargs
+# pylint: disable=client-accepts-api-version-keyword
+
+import json
+import logging
+import time
+from typing import Dict, List, Optional
+
+from azure.ai.ml._local_endpoints.local_endpoint_mode import LocalEndpointMode
+from azure.ai.ml._local_endpoints.vscode_debug.vscode_client import VSCodeClient
+from azure.ai.ml._utils._logger_utils import initialize_logger_info
+from azure.ai.ml._utils.utils import DockerProxy
+from azure.ai.ml.constants._endpoint import LocalEndpointConstants
+from azure.ai.ml.exceptions import (
+    DockerEngineNotAvailableError,
+    InvalidLocalEndpointError,
+    LocalEndpointImageBuildError,
+    LocalEndpointInFailedStateError,
+    LocalEndpointNotFoundError,
+    MultipleLocalDeploymentsFoundError,
+)
+
# Proxy object for docker-py; presumably defers the import/connection so loading
# this module does not require docker to be installed — TODO confirm (the actual
# engine connection is made lazily in DockerClient._client via docker.from_env()).
docker = DockerProxy()
module_logger = logging.getLogger(__name__)
initialize_logger_info(module_logger, terminator="")

# Baseline set of docker labels stamped onto every container managed by this
# client. The (empty) AZUREML_LOCAL_ENDPOINT label is what marks a container as
# a local endpoint — DockerClient.list_containers filters on its presence. The
# remaining keys are filled in per endpoint/deployment (see get_container_labels
# and DockerClient.create_endpoint).
DEFAULT_LABELS: Dict = {
    LocalEndpointConstants.LABEL_KEY_AZUREML_LOCAL_ENDPOINT: "",
    LocalEndpointConstants.LABEL_KEY_ENDPOINT_NAME: "",
    LocalEndpointConstants.LABEL_KEY_DEPLOYMENT_NAME: "",
    LocalEndpointConstants.LABEL_KEY_ENDPOINT_JSON: "",
    LocalEndpointConstants.LABEL_KEY_DEPLOYMENT_JSON: "",
    LocalEndpointConstants.LABEL_KEY_AZUREML_PORT: "",
}
+
+
class DockerClient(object):
    """Client for interacting with User's Docker environment for local
    endpoints."""

    # pylint: disable=client-method-missing-type-annotations

    def __init__(
        self,
        client: Optional["docker.DockerClient"] = None,  # type: ignore[name-defined]
        vscode_client: Optional[VSCodeClient] = None,
    ):
        # Keep the docker client lazy so constructing DockerClient never requires
        # a reachable Docker engine (see the _client property below).
        self._lazy_client = client
        self._vscode_client = vscode_client if vscode_client else VSCodeClient()

    @property
    def _client(self) -> "docker.DockerClient":  # type: ignore[name-defined]
        """Lazy initializer for docker-py client.

        :return: docker.client.DockerClient
        :raises: azure.ai.ml._local_endpoints.errors.DockerEngineNotAvailableError
        """
        if self._lazy_client is None:
            try:
                self._lazy_client = docker.from_env()
            except docker.errors.DockerException as e:
                # docker-py raises a generic DockerException when the engine is
                # unreachable; translate that case to a dedicated, actionable error.
                if "Error while fetching server API version" in str(e):
                    raise DockerEngineNotAvailableError() from e
                raise
        return self._lazy_client

    def create_endpoint(
        self,
        endpoint_name: str,
        endpoint_metadata: str,
        build_directory: str,
        image_name: str,
        dockerfile_path: str,
    ) -> None:
        """Builds an image for a local endpoint and starts it as a container.

        :param endpoint_name: name of local endpoint
        :type endpoint_name: str
        :param endpoint_metadata: Endpoint entity information serialized.
        :type endpoint_metadata: str
        :param build_directory: directory on user's local system used as the docker build context
        :type build_directory: str
        :param image_name: tag to apply to the built image
        :type image_name: str
        :param dockerfile_path: path of the Dockerfile for the build
        :type dockerfile_path: str
        """
        try:
            self._client.images.build(path=build_directory, tag=image_name, dockerfile=dockerfile_path)
        except docker.errors.BuildError as e:
            # Best-effort build: an existing image with this tag may still be
            # runnable, but log the failure instead of silently discarding it.
            module_logger.warning("Building image '%s' failed: %s\n", image_name, e)
        # Remove any previous container for this endpoint before starting a new one.
        self.delete(endpoint_name=endpoint_name, verify_exists=False)

        labels = DEFAULT_LABELS.copy()
        labels[LocalEndpointConstants.LABEL_KEY_ENDPOINT_NAME] = endpoint_name
        labels[LocalEndpointConstants.LABEL_KEY_ENDPOINT_JSON] = endpoint_metadata
        container_name = _get_container_name(endpoint_name)
        self._client.containers.run(
            image_name,
            name=container_name,
            labels=labels,
            detach=True,
            tty=True,
            publish_all_ports=True,
        )

    # pylint: disable=client-method-has-more-than-5-positional-arguments

    def create_deployment(
        self,
        endpoint_name: str,
        deployment_name: str,
        endpoint_metadata: str,
        deployment_metadata: str,
        build_directory: str,
        dockerfile_path: str,
        conda_source_path: str,
        conda_yaml_contents: str,
        volumes: dict,
        environment: dict,
        azureml_port: int,
        local_endpoint_mode: LocalEndpointMode,
        prebuilt_image_name: Optional[str] = None,
        local_enable_gpu: Optional[bool] = False,
    ) -> None:
        """Builds and runs an image from provided image context.

        :param endpoint_name: name of local endpoint
        :type endpoint_name: str
        :param deployment_name: name of local deployment
        :type deployment_name: str
        :param endpoint_metadata: Endpoint entity information serialized.
        :type endpoint_metadata: str
        :param deployment_metadata: Deployment entity information serialized.
        :type deployment_metadata: str
        :param build_directory: directory on user's local system to write conda file
        :type build_directory: str
        :param dockerfile_path: directory on user's local system to write Dockerfile
        :type dockerfile_path: str
        :param conda_source_path: source of conda file (either path on user's local machine or environment ID)
        :type conda_source_path: str
        :param conda_yaml_contents: contents of user's conda file for docker build
        :type conda_yaml_contents: str
        :param volumes: dictionary of volumes to mount to docker container
        :type volumes: dict
        :param environment: dictionary of docker environment variables to set in container
        :type environment: dict
        :param azureml_port: Port exposed in Docker image for AzureML service.
        :type azureml_port: int
        :param local_endpoint_mode: Mode for how to create the local user container.
        :type local_endpoint_mode: LocalEndpointMode
        :param prebuilt_image_name: Name of pre-built image from customer if using BYOC flow.
        :type prebuilt_image_name: str
        :param local_enable_gpu: enable local container to access gpu
        :type local_enable_gpu: bool
        :raises: azure.ai.ml.exceptions.InvalidLocalEndpointError
        :raises: azure.ai.ml.exceptions.LocalEndpointImageBuildError
        :raises: azure.ai.ml.exceptions.LocalEndpointInFailedStateError
        """
        # Prepare image: build it locally, or resolve the customer-supplied
        # (BYOC) image by looking it up locally and pulling it on a miss.
        if prebuilt_image_name is None:
            image_name = _get_image_name(endpoint_name, deployment_name)
            module_logger.debug("Building local image '%s'\n", image_name)
            module_logger.debug("Build directory: '%s'\n", build_directory)
            module_logger.debug("Dockerfile path: '%s'\n", dockerfile_path)
            self._build_image(
                build_directory=build_directory,
                image_name=image_name,
                dockerfile_path=dockerfile_path,
                conda_source_path=conda_source_path,
                conda_yaml_contents=conda_yaml_contents,
            )
            # Log completion only after the build actually ran (was previously
            # logged before _build_image was invoked).
            module_logger.debug("Image '%s' is built.", image_name)
        else:
            image_name = prebuilt_image_name
            try:
                self._client.images.get(image_name)
            except docker.errors.ImageNotFound:
                module_logger.info("\nDid not find image '%s' locally. Pulling from registry.\n", image_name)
                try:
                    self._client.images.pull(image_name)
                except docker.errors.NotFound as e:
                    raise InvalidLocalEndpointError(
                        message=(
                            f"Could not find image '{image_name}' locally or in registry. "
                            "Please check your image name."
                        ),
                        no_personal_data_message=(
                            "Could not find image locally or in registry. Please check your image name."
                        ),
                    ) from e

        module_logger.info("\nStarting up endpoint")
        # Delete container if exists
        self.delete(endpoint_name=endpoint_name, verify_exists=False)

        labels = get_container_labels(
            endpoint_name=endpoint_name,
            deployment_name=deployment_name,
            endpoint_metadata=endpoint_metadata,  # type: ignore[arg-type]
            deployment_metadata=deployment_metadata,  # type: ignore[arg-type]
            azureml_port=azureml_port,
        )
        module_logger.debug("Setting labels: '%s'\n", labels)
        module_logger.debug("Mounting volumes: '%s'\n", volumes)
        module_logger.debug("Setting environment variables: '%s'\n", environment)
        container_name = _get_container_name(endpoint_name, deployment_name)
        # count=-1 requests all available GPUs from the docker engine.
        device_requests = [docker.types.DeviceRequest(count=-1, capabilities=[["gpu"]])] if local_enable_gpu else None
        container = self._client.containers.create(
            image_name,
            name=container_name,
            labels=labels,
            volumes=self._reformat_volumes(volumes),
            environment=environment,
            detach=True,
            tty=True,
            publish_all_ports=True,
            device_requests=device_requests,
        )
        if local_endpoint_mode == LocalEndpointMode.VSCodeDevContainer:
            try:
                devcontainer_path = self._vscode_client.create_dev_container_json(
                    azureml_container=container,
                    endpoint_name=endpoint_name,
                    deployment_name=deployment_name,
                    build_directory=build_directory,
                    image_name=image_name,
                    environment=environment,
                    volumes=volumes,  # type: ignore[arg-type]
                    labels=labels,
                )
            finally:
                # This pre-created container is only used for retrieving the entry script
                # to add debugpy statements
                container.remove()
            app_path = environment[LocalEndpointConstants.ENVVAR_KEY_AML_APP_ROOT]
            self._vscode_client.invoke_dev_container(devcontainer_path=devcontainer_path, app_path=app_path)
            time.sleep(LocalEndpointConstants.DEFAULT_STARTUP_WAIT_TIME_SECONDS)
        else:
            container.start()
            # Give the container a moment to come up before checking its state.
            time.sleep(LocalEndpointConstants.DEFAULT_STARTUP_WAIT_TIME_SECONDS)
            container.reload()
            _validate_container_state(
                endpoint_name=endpoint_name,
                deployment_name=deployment_name,
                container=container,
            )
            scoring_uri = self.get_scoring_uri(endpoint_name=endpoint_name, deployment_name=deployment_name)
            module_logger.debug("Container '%s' is up and running at '%s'\n", container_name, scoring_uri)

    def delete(
        self,
        endpoint_name: str,
        deployment_name: Optional[str] = None,
        verify_exists: bool = True,
    ) -> None:
        """Deletes local endpoint / deployment.

        :param endpoint_name: name of local endpoint
        :type endpoint_name: str
        :param deployment_name: name of local deployment
        :type deployment_name: (str, optional)
        :param verify_exists: Verify that the endpoint exists on deletion. Default: True
        :type verify_exists: (bool, optional)
        :raises: azure.ai.ml._local_endpoints.errors.LocalEndpointNotFoundError
        """
        containers = self.list_containers(endpoint_name=endpoint_name, deployment_name=deployment_name)
        if verify_exists and len(containers) == 0:
            raise LocalEndpointNotFoundError(endpoint_name=endpoint_name, deployment_name=deployment_name)

        for container in containers:
            container.stop()
            container.remove()
            module_logger.debug("Endpoint container '%s' is removed.", container.name)

    def get_endpoint(self, endpoint_name: str) -> Optional[dict]:
        """Returns metadata for local endpoint or deployment.

        :param endpoint_name: name of local endpoint
        :type endpoint_name: str
        :returns: JSON dict representing user provided endpoint input
        :rtype: dict
        :raises: azure.ai.ml.exceptions.LocalEndpointNotFoundError
        """
        container = self.get_endpoint_container(endpoint_name=endpoint_name)
        if container is None:
            raise LocalEndpointNotFoundError(endpoint_name=endpoint_name)
        return get_endpoint_json_from_container(container=container)

    def get_deployment(self, endpoint_name: str, deployment_name: Optional[str] = None) -> Optional[dict]:
        """Returns metadata for local deployment.

        :param endpoint_name: name of local endpoint
        :type endpoint_name: str
        :param deployment_name: name of local deployment
        :type deployment_name: (str, optional)
        :return: JSON dict representing user provided endpoint input
        :rtype: dict
        :raises: azure.ai.ml.exceptions.LocalEndpointNotFoundError
        """
        container = self.get_endpoint_container(endpoint_name=endpoint_name, deployment_name=deployment_name)
        if container is None:
            raise LocalEndpointNotFoundError(endpoint_name=endpoint_name, deployment_name=deployment_name)
        return get_deployment_json_from_container(container=container)

    def get_scoring_uri(self, endpoint_name: str, deployment_name: Optional[str] = None) -> Optional[str]:
        """Returns scoring uri for local endpoint or deployment.

        :param endpoint_name: name of local endpoint
        :type endpoint_name: str
        :param deployment_name: name of local deployment
        :type deployment_name: (str, optional)
        :return: scoring URI, or None if no matching container exists
        :rtype: Optional[str]
        :raises: azure.ai.ml._local_endpoints.errors.LocalEndpointInFailedStateError
        :raises: azure.ai.ml._local_endpoints.errors.MultipleLocalDeploymentsFoundError
        """
        container = self.get_endpoint_container(
            endpoint_name=endpoint_name,
            deployment_name=deployment_name,
            verify_single_deployment=True,
        )
        if container is None:
            return None
        _validate_container_state(
            endpoint_name=endpoint_name,
            deployment_name=str(deployment_name),
            container=container,
        )
        return get_scoring_uri_from_container(container=container)

    def logs(self, endpoint_name: str, deployment_name: str, lines: int) -> str:
        """Returns logs from local deployment.

        :param endpoint_name: name of local endpoint
        :type endpoint_name: str
        :param deployment_name: name of local deployment
        :type deployment_name: str
        :param lines: number of lines to retrieve from container logs
        :type lines: int
        :return: Deployment logs
        :rtype: str
        :raises: azure.ai.ml._local_endpoints.errors.LocalEndpointNotFoundError
        """
        container = self.get_endpoint_container(endpoint_name, deployment_name=deployment_name)
        if container is None:
            raise LocalEndpointNotFoundError(endpoint_name=endpoint_name, deployment_name=deployment_name)
        return container.logs(tail=int(lines)).decode()

    def list_containers(
        self,
        endpoint_name: Optional[str] = None,
        deployment_name: Optional[str] = None,
        include_stopped: bool = True,
    ) -> list:
        """Returns a list of local endpoints.

        :param endpoint_name: Name of local endpoint. If none, all local endpoints will be returned.
        :type endpoint_name: (str, optional)
        :param deployment_name: Name of local deployment. If none, all deployments under endpoint will be returned.
        :type deployment_name: (str, optional)
        :param include_stopped: Include stopped containers. Default: True.
        :type include_stopped: (bool, optional)
        :return: array of Container objects from docker-py library
        :rtype: List[docker.models.containers.Container]
        """
        # Presence of the AZUREML_LOCAL_ENDPOINT label marks a container as
        # managed by this client; name labels narrow the match further.
        filters = {"label": [f"{LocalEndpointConstants.LABEL_KEY_AZUREML_LOCAL_ENDPOINT}"]}
        if endpoint_name:
            filters["label"].append(f"{LocalEndpointConstants.LABEL_KEY_ENDPOINT_NAME}={endpoint_name}")
        if deployment_name:
            filters["label"].append(f"{LocalEndpointConstants.LABEL_KEY_DEPLOYMENT_NAME}={deployment_name}")

        return self._client.containers.list(filters=filters, all=include_stopped)

    def get_endpoint_container(
        self,
        endpoint_name: str,
        deployment_name: Optional[str] = None,
        verify_single_deployment: bool = False,
        include_stopped: bool = True,
    ) -> "docker.models.containers.Container":  # type: ignore[name-defined]
        """Returns the container for a local endpoint / deployment, if one exists.

        :param endpoint_name: name of local endpoint
        :type endpoint_name: str
        :param deployment_name: name of local deployment
        :type deployment_name: (str, optional)
        :param verify_single_deployment: Fail if more than one deployment container exists
        :type verify_single_deployment: (bool, optional)
        :param include_stopped: Include container even if it's stopped. Default: True.
        :type include_stopped: (bool, optional)
        :returns: The docker container, or None if no match is found
        :rtype: docker.models.containers.Container
        :raises: azure.ai.ml.exceptions.MultipleLocalDeploymentsFoundError
        """
        containers = self.list_containers(
            endpoint_name=endpoint_name,
            deployment_name=deployment_name,
            include_stopped=include_stopped,
        )
        if len(containers) == 0:
            return None
        if len(containers) > 1 and verify_single_deployment:
            raise MultipleLocalDeploymentsFoundError(endpoint_name=endpoint_name)
        return containers[0]

    def _build_image(
        self,
        build_directory: str,
        image_name: str,
        dockerfile_path: str,
        conda_source_path: str,  # pylint: disable=unused-argument
        conda_yaml_contents: str,  # pylint: disable=unused-argument
    ) -> None:
        """Builds the deployment image, streaming build output to the logger.

        :param build_directory: docker build context directory
        :type build_directory: str
        :param image_name: tag for the built image
        :type image_name: str
        :param dockerfile_path: path of the Dockerfile for the build
        :type dockerfile_path: str
        :raises: azure.ai.ml.exceptions.LocalEndpointImageBuildError
        """
        try:
            module_logger.info("\nBuilding Docker image from Dockerfile")
            first_line = True
            # Use the low-level API so the build output can be streamed line by
            # line; decode=True yields parsed JSON status dicts.
            for status in self._client.api.build(
                path=build_directory,
                tag=image_name,
                dockerfile=dockerfile_path,
                pull=True,
                decode=True,
                quiet=False,
            ):
                if first_line:
                    module_logger.info("\n")
                    first_line = False
                if "stream" in status:
                    # A conda failure inside the build does not fail the docker
                    # build itself, so detect it from the streamed output.
                    if "An unexpected error has occurred. Conda has prepared the above report." in status["stream"]:
                        raise LocalEndpointImageBuildError(status["stream"])
                    module_logger.info(status["stream"])

                if "error" in status:
                    module_logger.info(status["error"])
                    raise LocalEndpointImageBuildError(status["error"])
        except docker.errors.APIError as e:
            raise LocalEndpointImageBuildError(e) from e
        except Exception as e:
            # Re-wrap anything unexpected, but don't double-wrap our own error.
            if isinstance(e, LocalEndpointImageBuildError):
                raise
            raise LocalEndpointImageBuildError(e) from e

    def _reformat_volumes(self, volumes_dict: Dict[str, Dict[str, Dict[str, str]]]) -> List[str]:
        """Returns a list of volumes to pass to docker.

        :param volumes_dict: custom formatted dict of volumes to mount. We expect the keys to be unique. Example:
            .. code-block:: python

                {
                    "codesrc:codedest": {
                        "codesrc": {
                            "bind": "codedest"
                        }
                    },
                    "modelsrc:modeldest": {
                        "modelsrc": {
                            "bind": "modeldest"
                        }
                    }
                }

        :type volumes_dict: str
        :return: list of volumes to pass to docker. Example:
            .. code-block:: python

                ["codesrc:codedest", "modelsrc:modeldest"]
        :rtype: List[str]
        """
        # The keys already carry the "src:dest" form docker expects; the nested
        # values are ignored here.
        return list(volumes_dict.keys())
+
+
def get_container_labels(
    endpoint_name: str,
    deployment_name: str,
    endpoint_metadata: dict,
    deployment_metadata: dict,
    azureml_port: int,
) -> dict:
    """Builds the docker label dict identifying a local deployment container.

    :param endpoint_name: name of local endpoint
    :type endpoint_name: str
    :param deployment_name: name of local deployment
    :type deployment_name: str
    :param endpoint_metadata: serialized endpoint entity to store on the container
    :type endpoint_metadata: dict
    :param deployment_metadata: serialized deployment entity to store on the container
    :type deployment_metadata: dict
    :param azureml_port: port exposed in the image for the AzureML service
    :type azureml_port: int
    :return: label dict to pass to docker when creating the container
    :rtype: dict
    """
    # Start from the defaults (which carry the AzureML local-endpoint marker
    # label) and overlay the endpoint/deployment specific values.
    return {
        **DEFAULT_LABELS,
        LocalEndpointConstants.LABEL_KEY_ENDPOINT_NAME: endpoint_name,
        LocalEndpointConstants.LABEL_KEY_DEPLOYMENT_NAME: deployment_name,
        LocalEndpointConstants.LABEL_KEY_ENDPOINT_JSON: endpoint_metadata,
        LocalEndpointConstants.LABEL_KEY_DEPLOYMENT_JSON: deployment_metadata,
        LocalEndpointConstants.LABEL_KEY_AZUREML_PORT: str(azureml_port),
    }
+
+
def get_endpoint_json_from_container(
    container: "docker.models.containers.Container",  # type: ignore[name-defined]
) -> Optional[dict]:
    """Deserializes the endpoint spec stored in the container's labels.

    :param container: container of local endpoint; may be None/falsy
    :type container: docker.models.containers.Container
    :return: endpoint JSON dict, or None when no container is given
    :rtype: Optional[dict]
    """
    if not container:
        return None
    serialized = container.labels[LocalEndpointConstants.LABEL_KEY_ENDPOINT_JSON]
    return json.loads(serialized)
+
+
def get_deployment_json_from_container(
    container: "docker.models.containers.Container",  # type: ignore[name-defined]
) -> Optional[dict]:
    """Deserializes the deployment spec stored in the container's labels.

    :param container: container of local deployment; may be None/falsy
    :type container: docker.models.containers.Container
    :return: deployment JSON dict, or None when no container is given
    :rtype: Optional[dict]
    """
    if not container:
        return None
    serialized = container.labels[LocalEndpointConstants.LABEL_KEY_DEPLOYMENT_JSON]
    return json.loads(serialized)
+
+
def get_status_from_container(container: "docker.models.containers.Container") -> str:  # type: ignore[name-defined]
    """Fetches the docker status string (e.g. 'running', 'exited') of a container.

    :param container: container of local Deployment
    :type container: docker.models.containers.Container
    :return: container status
    :rtype: str
    """
    return container.status
+
+
def get_scoring_uri_from_container(
    container: "docker.models.containers.Container",  # type: ignore[name-defined]
) -> str:
    """Returns scoring_uri of container.

    :param container: container of local Deployment
    :type container: docker.models.containers.Container
    :return: container scoring_uri
    :rtype: str
    """
    # Fall back to the default AzureML port if no published host port is found.
    port = 5001
    # Example container.ports: {'5001/tcp': [{'HostIp': '0.0.0.0', 'HostPort': '5001'}],
    # '8883/tcp': None, '8888/tcp': None }
    if container is not None and container.ports is not None:
        azureml_port = container.labels["azureml-port"]
        for docker_port, host_addresses in container.ports.items():
            if azureml_port in docker_port and host_addresses is not None:
                host_port = next(
                    (address["HostPort"] for address in host_addresses if "HostPort" in address),
                    None,
                )
                if host_port is not None:
                    port = host_port
                    # Stop at the first published mapping. The original inner
                    # `break` only exited the address loop, so a later matching
                    # mapping could silently overwrite the port.
                    break
    # TODO: resolve scoring path correctly
    return f"http://localhost:{port}/score"
+
+
+def _get_image_name(endpoint_name: str, deployment_name: str) -> str:
+    """Returns an image name.
+
+    :param endpoint_name: name of local endpoint
+    :type endpoint_name: str
+    :param deployment_name: name of local deployment
+    :type deployment_name: str
+    :return: image name
+    :rtype: str
+    """
+    return f"{endpoint_name}:{deployment_name}"
+
+
+def _get_container_name(endpoint_name: str, deployment_name: Optional[str] = None) -> str:
+    """Returns a container name.
+
+    :param endpoint_name: name of local endpoint
+    :type endpoint_name: str
+    :param deployment_name: name of local deployment
+    :type deployment_name: str
+    :return: container name
+    :rtype: str
+    """
+    return f"{endpoint_name}.{deployment_name}" if deployment_name else endpoint_name
+
+
def _validate_container_state(
    endpoint_name: str,
    deployment_name: str,
    container: "docker.models.containers.Container",  # type: ignore[name-defined]
):
    """Validates that a local deployment container has not exited.

    Raises when the container's docker status is the 'exited' state; otherwise
    does nothing. (Previous docstring incorrectly said this returns a container
    name.)

    :param endpoint_name: name of local endpoint
    :type endpoint_name: str
    :param deployment_name: name of local deployment
    :type deployment_name: str
    :param container: container of local Deployment
    :type container: docker.models.containers.Container
    :raises: azure.ai.ml._local_endpoints.errors.LocalEndpointInFailedStateError
    """
    status = get_status_from_container(container=container)
    if LocalEndpointConstants.CONTAINER_EXITED == status:
        raise LocalEndpointInFailedStateError(endpoint_name=endpoint_name, deployment_name=deployment_name)