aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/azure/ai/ml/_local_endpoints/docker_client.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/_local_endpoints/docker_client.py')
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/_local_endpoints/docker_client.py568
1 files changed, 568 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_local_endpoints/docker_client.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_local_endpoints/docker_client.py
new file mode 100644
index 00000000..beb9db71
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_local_endpoints/docker_client.py
@@ -0,0 +1,568 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+# pylint: disable=missing-client-constructor-parameter-credential,missing-client-constructor-parameter-kwargs
+# pylint: disable=client-accepts-api-version-keyword
+
+import json
+import logging
+import time
+from typing import Dict, List, Optional
+
+from azure.ai.ml._local_endpoints.local_endpoint_mode import LocalEndpointMode
+from azure.ai.ml._local_endpoints.vscode_debug.vscode_client import VSCodeClient
+from azure.ai.ml._utils._logger_utils import initialize_logger_info
+from azure.ai.ml._utils.utils import DockerProxy
+from azure.ai.ml.constants._endpoint import LocalEndpointConstants
+from azure.ai.ml.exceptions import (
+ DockerEngineNotAvailableError,
+ InvalidLocalEndpointError,
+ LocalEndpointImageBuildError,
+ LocalEndpointInFailedStateError,
+ LocalEndpointNotFoundError,
+ MultipleLocalDeploymentsFoundError,
+)
+
+docker = DockerProxy()
+module_logger = logging.getLogger(__name__)
+initialize_logger_info(module_logger, terminator="")
+
+DEFAULT_LABELS: Dict = {
+ LocalEndpointConstants.LABEL_KEY_AZUREML_LOCAL_ENDPOINT: "",
+ LocalEndpointConstants.LABEL_KEY_ENDPOINT_NAME: "",
+ LocalEndpointConstants.LABEL_KEY_DEPLOYMENT_NAME: "",
+ LocalEndpointConstants.LABEL_KEY_ENDPOINT_JSON: "",
+ LocalEndpointConstants.LABEL_KEY_DEPLOYMENT_JSON: "",
+ LocalEndpointConstants.LABEL_KEY_AZUREML_PORT: "",
+}
+
+
+class DockerClient(object):
+ """Client for interacting with User's Docker environment for local
+ endpoints."""
+
+ # pylint: disable=client-method-missing-type-annotations
+
+ def __init__(
+ self,
+ client: Optional["docker.DockerClient"] = None, # type: ignore[name-defined]
+ vscode_client: Optional[VSCodeClient] = None,
+ ):
+ self._lazy_client = client
+ self._vscode_client = vscode_client if vscode_client else VSCodeClient()
+
+ @property
+ def _client(self) -> "docker.DockerClient": # type: ignore[name-defined]
+ """Lazy initializer for docker-py client.
+
+ :return: docker.client.DockerClient
+ :raises: azure.ai.ml._local_endpoints.errors.DockerEngineNotAvailableError
+ """
+ if self._lazy_client is None:
+ try:
+ self._lazy_client = docker.from_env()
+ except docker.errors.DockerException as e:
+ if "Error while fetching server API version" in str(e):
+ raise DockerEngineNotAvailableError() from e
+ raise
+ return self._lazy_client
+
+ def create_endpoint(
+ self,
+ endpoint_name: str,
+ endpoint_metadata: str,
+ build_directory: str,
+ image_name: str,
+ dockerfile_path: str,
+ ) -> None:
+ try:
+ self._client.images.build(path=build_directory, tag=image_name, dockerfile=dockerfile_path)
+ except docker.errors.BuildError:
+ pass
+ self.delete(endpoint_name=endpoint_name, verify_exists=False)
+
+ labels = DEFAULT_LABELS.copy()
+ labels[LocalEndpointConstants.LABEL_KEY_ENDPOINT_NAME] = endpoint_name
+ labels[LocalEndpointConstants.LABEL_KEY_ENDPOINT_JSON] = endpoint_metadata
+ container_name = _get_container_name(endpoint_name)
+ self._client.containers.run(
+ image_name,
+ name=container_name,
+ labels=labels,
+ detach=True,
+ tty=True,
+ publish_all_ports=True,
+ )
+
+ # pylint: disable=client-method-has-more-than-5-positional-arguments
+
+ def create_deployment(
+ self,
+ endpoint_name: str,
+ deployment_name: str,
+ endpoint_metadata: str,
+ deployment_metadata: str,
+ build_directory: str,
+ dockerfile_path: str,
+ conda_source_path: str,
+ conda_yaml_contents: str,
+ volumes: dict,
+ environment: dict,
+ azureml_port: int,
+ local_endpoint_mode: LocalEndpointMode,
+ prebuilt_image_name: Optional[str] = None,
+ local_enable_gpu: Optional[bool] = False,
+ ) -> None:
+ """Builds and runs an image from provided image context.
+
+ :param endpoint_name: name of local endpoint
+ :type endpoint_name: str
+ :param deployment_name: name of local deployment
+ :type deployment_name: str
+ :param endpoint_metadata: Endpoint entity information serialized.
+ :type endpoint_metadata: str
+ :param deployment_metadata: Deployment entity information serialized.
+ :type deployment_metadata: str
+ :param build_directory: directory on user's local system to write conda file
+ :type build_directory: str
+ :param dockerfile_path: directory on user's local system to write Dockerfile
+ :type dockerfile_path: str
+ :param conda_source_path: source of conda file (either path on user's local machine or environment ID)
+ :type conda_source_path: str
+ :param conda_yaml_contents: contents of user's conda file for docker build
+ :type conda_yaml_contents: str
+ :param volumes: dictionary of volumes to mount to docker container
+ :type volumes: dict
+ :param environment: dictionary of docker environment variables to set in container
+ :type environment: dict
+ :param azureml_port: Port exposed in Docker image for AzureML service.
+ :type azureml_port: int
+ :param local_endpoint_mode: Mode for how to create the local user container.
+ :type local_endpoint_mode: LocalEndpointMode
+ :param prebuilt_image_name: Name of pre-built image from customer if using BYOC flow.
+ :type prebuilt_image_name: str
+ :param local_enable_gpu: enable local container to access gpu
+ :type local_enable_gpu: bool
+ """
+ # Prepare image
+ if prebuilt_image_name is None:
+ image_name = _get_image_name(endpoint_name, deployment_name)
+ module_logger.debug("Building local image '%s'\n", image_name)
+ module_logger.debug("Build directory: '%s'\n", build_directory)
+ module_logger.debug("Dockerfile path: '%s'\n", dockerfile_path)
+ module_logger.debug("Image '%s' is built.", image_name)
+ self._build_image(
+ build_directory=build_directory,
+ image_name=image_name,
+ dockerfile_path=dockerfile_path,
+ conda_source_path=conda_source_path,
+ conda_yaml_contents=conda_yaml_contents,
+ )
+ else:
+ image_name = prebuilt_image_name
+ try:
+ self._client.images.get(image_name)
+ except docker.errors.ImageNotFound:
+ module_logger.info("\nDid not find image '%s' locally. Pulling from registry.\n", image_name)
+ try:
+ self._client.images.pull(image_name)
+ except docker.errors.NotFound as e:
+ raise InvalidLocalEndpointError(
+ message=(
+ f"Could not find image '{image_name}' locally or in registry. "
+ "Please check your image name."
+ ),
+ no_personal_data_message=(
+ "Could not find image locally or in registry. Please check your image name."
+ ),
+ ) from e
+
+ module_logger.info("\nStarting up endpoint")
+ # Delete container if exists
+ self.delete(endpoint_name=endpoint_name, verify_exists=False)
+
+ labels = get_container_labels(
+ endpoint_name=endpoint_name,
+ deployment_name=deployment_name,
+ endpoint_metadata=endpoint_metadata, # type: ignore[arg-type]
+ deployment_metadata=deployment_metadata, # type: ignore[arg-type]
+ azureml_port=azureml_port,
+ )
+ module_logger.debug("Setting labels: '%s'\n", labels)
+ module_logger.debug("Mounting volumes: '%s'\n", volumes)
+ module_logger.debug("Setting environment variables: '%s'\n", environment)
+ container_name = _get_container_name(endpoint_name, deployment_name)
+ device_requests = [docker.types.DeviceRequest(count=-1, capabilities=[["gpu"]])] if local_enable_gpu else None
+ container = self._client.containers.create(
+ image_name,
+ name=container_name,
+ labels=labels,
+ volumes=self._reformat_volumes(volumes),
+ environment=environment,
+ detach=True,
+ tty=True,
+ publish_all_ports=True,
+ device_requests=device_requests,
+ )
+ if local_endpoint_mode == LocalEndpointMode.VSCodeDevContainer:
+ try:
+ devcontainer_path = self._vscode_client.create_dev_container_json(
+ azureml_container=container,
+ endpoint_name=endpoint_name,
+ deployment_name=deployment_name,
+ build_directory=build_directory,
+ image_name=image_name,
+ environment=environment,
+ volumes=volumes, # type: ignore[arg-type]
+ labels=labels,
+ )
+ finally:
+ # This pre-created container is only used for retrieving the entry script
+ # to add debugpy statements
+ container.remove()
+ app_path = environment[LocalEndpointConstants.ENVVAR_KEY_AML_APP_ROOT]
+ self._vscode_client.invoke_dev_container(devcontainer_path=devcontainer_path, app_path=app_path)
+ time.sleep(LocalEndpointConstants.DEFAULT_STARTUP_WAIT_TIME_SECONDS)
+ else:
+ container.start()
+ time.sleep(LocalEndpointConstants.DEFAULT_STARTUP_WAIT_TIME_SECONDS)
+ container.reload()
+ _validate_container_state(
+ endpoint_name=endpoint_name,
+ deployment_name=deployment_name,
+ container=container,
+ )
+ scoring_uri = self.get_scoring_uri(endpoint_name=endpoint_name, deployment_name=deployment_name)
+ module_logger.debug("Container '%s' is up and running at '%s'\n", container_name, scoring_uri)
+
+ def delete(
+ self,
+ endpoint_name: str,
+ deployment_name: Optional[str] = None,
+ verify_exists: bool = True,
+ ) -> None:
+ """Deletes local endpoint / deployment.
+
+ :param endpoint_name: name of local endpoint
+ :type endpoint_name: str
+ :param deployment_name: name of local deployment
+ :type deployment_name: (str, optional)
+ :param verify_exists: Verify that the endpoint exists on deletion. Default: True
+ :type verify_exists: (bool, optional)
+ :raises: azure.ai.ml._local_endpoints.errors.LocalEndpointNotFoundError
+ """
+ containers = self.list_containers(endpoint_name=endpoint_name, deployment_name=deployment_name)
+ if verify_exists and len(containers) == 0:
+ raise LocalEndpointNotFoundError(endpoint_name=endpoint_name, deployment_name=deployment_name)
+
+ for container in containers:
+ container.stop()
+ container.remove()
+ module_logger.debug("Endpoint container '%s' is removed.", container.name)
+
+ def get_endpoint(self, endpoint_name: str) -> Optional[dict]:
+ """Returns metadata for local endpoint or deployment.
+
+ :param endpoint_name: name of local endpoint
+ :type endpoint_name: str
+ :returns: JSON dict representing user provided endpoint input
+ :rtype: dict
+ """
+ container = self.get_endpoint_container(endpoint_name=endpoint_name)
+ if container is None:
+ raise LocalEndpointNotFoundError(endpoint_name=endpoint_name)
+ return get_endpoint_json_from_container(container=container)
+
+ def get_deployment(self, endpoint_name: str, deployment_name: Optional[str] = None) -> Optional[dict]:
+ """Returns metadata for local deployment.
+
+ :param endpoint_name: name of local endpoint
+ :type endpoint_name: str
+ :param deployment_name: name of local deployment
+ :type deployment_name: (str, optional)
+ :return: JSON dict representing user provided endpoint input
+ :rtype: dict
+ """
+ container = self.get_endpoint_container(endpoint_name=endpoint_name, deployment_name=deployment_name)
+ if container is None:
+ raise LocalEndpointNotFoundError(endpoint_name=endpoint_name, deployment_name=deployment_name)
+ return get_deployment_json_from_container(container=container)
+
+ def get_scoring_uri(self, endpoint_name: str, deployment_name: Optional[str] = None) -> Optional[str]:
+ """Returns scoring uri for local endpoint or deployment.
+
+ :param endpoint_name: name of local endpoint
+ :type endpoint_name: str
+ :param deployment_name: name of local deployment
+ :type deployment_name: (str, optional)
+ :raises: azure.ai.ml._local_endpoints.errors.LocalEndpointNotFoundError
+ :raises: azure.ai.ml._local_endpoints.errors.MultipleLocalDeploymentsFoundError
+ """
+ container = self.get_endpoint_container(
+ endpoint_name=endpoint_name,
+ deployment_name=deployment_name,
+ verify_single_deployment=True,
+ )
+ if container is None:
+ return None
+ _validate_container_state(
+ endpoint_name=endpoint_name,
+ deployment_name=str(deployment_name),
+ container=container,
+ )
+ return get_scoring_uri_from_container(container=container)
+
+ def logs(self, endpoint_name: str, deployment_name: str, lines: int) -> str:
+ """Returns logs from local deployment.
+
+ :param endpoint_name: name of local endpoint
+ :type endpoint_name: str
+ :param deployment_name: name of local deployment
+ :type deployment_name: str
+ :param lines: number of lines to retrieve from container logs
+ :type lines: int
+ :return: Deployment logs
+ :rtype: str
+ :raises: azure.ai.ml._local_endpoints.errors.LocalEndpointNotFoundError
+ """
+ container = self.get_endpoint_container(endpoint_name, deployment_name=deployment_name)
+ if container is None:
+ raise LocalEndpointNotFoundError(endpoint_name=endpoint_name, deployment_name=deployment_name)
+ return container.logs(tail=int(lines)).decode()
+
+ def list_containers(
+ self,
+ endpoint_name: Optional[str] = None,
+ deployment_name: Optional[str] = None,
+ include_stopped: bool = True,
+ ) -> list:
+ """Returns a list of local endpoints.
+
+ :param endpoint_name: Name of local endpoint. If none, all local endpoints will be returned.
+ :type endpoint_name: (str, optional)
+ :param deployment_name: Name of local deployment. If none, all deployments under endpoint will be returned.
+ :type deployment_name: (str, optional)
+ :param include_stopped: Include stopped containers. Default: True.
+ :type include_stopped: (str, optional)
+ :return: array of Container objects from docker-py library
+ :rtype: List[docker.models.containers.Container]
+ """
+ filters = {"label": [f"{LocalEndpointConstants.LABEL_KEY_AZUREML_LOCAL_ENDPOINT}"]}
+ if endpoint_name:
+ filters["label"].append(f"{LocalEndpointConstants.LABEL_KEY_ENDPOINT_NAME}={endpoint_name}")
+ if deployment_name:
+ filters["label"].append(f"{LocalEndpointConstants.LABEL_KEY_DEPLOYMENT_NAME}={deployment_name}")
+
+ return self._client.containers.list(filters=filters, all=include_stopped)
+
+ def get_endpoint_container(
+ self,
+ endpoint_name: str,
+ deployment_name: Optional[str] = None,
+ verify_single_deployment: bool = False,
+ include_stopped: bool = True,
+ ) -> "docker.models.containers.Container": # type: ignore[name-defined]
+ """Builds and runs an image from provided image context.
+
+ :param endpoint_name: name of local endpoint
+ :type endpoint_name: str
+ :param deployment_name: name of local deployment
+ :type deployment_name: (str, optional)
+ :param verify_single_deployment: Fail if more than one deployment container exists
+ :type verify_single_deployment: (bool, optional)
+ :param include_stopped: Include container even if it's stopped. Default: True.
+ :type include_stopped: (bool, optional)
+ :returns: The docker container
+ :rtype: docker.models.containers.Container
+ """
+ containers = self.list_containers(
+ endpoint_name=endpoint_name,
+ deployment_name=deployment_name,
+ include_stopped=include_stopped,
+ )
+ if len(containers) == 0:
+ return None
+ if len(containers) > 1 and verify_single_deployment:
+ raise MultipleLocalDeploymentsFoundError(endpoint_name=endpoint_name)
+ return containers[0]
+
+ def _build_image(
+ self,
+ build_directory: str,
+ image_name: str,
+ dockerfile_path: str,
+ conda_source_path: str, # pylint: disable=unused-argument
+ conda_yaml_contents: str, # pylint: disable=unused-argument
+ ) -> None:
+ try:
+ module_logger.info("\nBuilding Docker image from Dockerfile")
+ first_line = True
+ for status in self._client.api.build(
+ path=build_directory,
+ tag=image_name,
+ dockerfile=dockerfile_path,
+ pull=True,
+ decode=True,
+ quiet=False,
+ ):
+ if first_line:
+ module_logger.info("\n")
+ first_line = False
+ if "stream" in status:
+ if "An unexpected error has occurred. Conda has prepared the above report." in status["stream"]:
+ raise LocalEndpointImageBuildError(status["stream"])
+ module_logger.info(status["stream"])
+
+ if "error" in status:
+ module_logger.info(status["error"])
+ raise LocalEndpointImageBuildError(status["error"])
+ except docker.errors.APIError as e:
+ raise LocalEndpointImageBuildError(e) from e
+ except Exception as e:
+ if isinstance(e, LocalEndpointImageBuildError):
+ raise
+ raise LocalEndpointImageBuildError(e) from e
+
+ def _reformat_volumes(self, volumes_dict: Dict[str, Dict[str, Dict[str, str]]]) -> List[str]:
+ """Returns a list of volumes to pass to docker.
+
+ :param volumes_dict: custom formatted dict of volumes to mount. We expect the keys to be unique. Example:
+ .. code-block:: python
+
+ {
+ "codesrc:codedest": {
+ "codesrc": {
+ "bind": "codedest"
+ }
+ },
+ "modelsrc:modeldest": {
+ "modelsrc": {
+ "bind": "modeldest"
+ }
+ }
+ }
+
+ :type volumes_dict: str
+ :return: list of volumes to pass to docker. Example:
+ .. code-block:: python
+
+ ["codesrc:codedest", "modelsrc:modeldest"]
+ :rtype: List[str]
+ """
+ return list(volumes_dict.keys())
+
+
+def get_container_labels(
+ endpoint_name: str,
+ deployment_name: str,
+ endpoint_metadata: dict,
+ deployment_metadata: dict,
+ azureml_port: int,
+) -> dict:
+ labels = DEFAULT_LABELS.copy()
+ labels[LocalEndpointConstants.LABEL_KEY_ENDPOINT_NAME] = endpoint_name
+ labels[LocalEndpointConstants.LABEL_KEY_DEPLOYMENT_NAME] = deployment_name
+ labels[LocalEndpointConstants.LABEL_KEY_ENDPOINT_JSON] = endpoint_metadata
+ labels[LocalEndpointConstants.LABEL_KEY_DEPLOYMENT_JSON] = deployment_metadata
+ labels[LocalEndpointConstants.LABEL_KEY_AZUREML_PORT] = str(azureml_port)
+ return labels
+
+
+def get_endpoint_json_from_container(
+ container: "docker.models.containers.Container", # type: ignore[name-defined]
+) -> Optional[dict]:
+ if container:
+ data = container.labels[LocalEndpointConstants.LABEL_KEY_ENDPOINT_JSON]
+ return json.loads(data)
+ return None
+
+
+def get_deployment_json_from_container(
+ container: "docker.models.containers.Container", # type: ignore[name-defined]
+) -> Optional[dict]:
+ if container:
+ data = container.labels[LocalEndpointConstants.LABEL_KEY_DEPLOYMENT_JSON]
+ return json.loads(data)
+ return None
+
+
+def get_status_from_container(container: "docker.models.containers.Container") -> str: # type: ignore[name-defined]
+ """Returns status of container.
+
+ :param container: container of local Deployment
+ :type container: docker.models.containers.Container
+ :return: container status
+ :rtype: str
+ """
+ return container.status
+
+
+def get_scoring_uri_from_container(
+ container: "docker.models.containers.Container", # type: ignore[name-defined]
+) -> str:
+ """Returns scoring_uri of container.
+
+ :param container: container of local Deployment
+ :type container: docker.models.containers.Container
+ :return: container scoring_uri
+ :rtype: str
+ """
+ port = 5001
+ # Example container.ports: {'5001/tcp': [{'HostIp': '0.0.0.0', 'HostPort': '5001'}],
+ # '8883/tcp': None, '8888/tcp': None }
+ if container is not None and container.ports is not None:
+ azureml_port = container.labels["azureml-port"]
+ for docker_port, host_addresses in container.ports.items():
+ if azureml_port in docker_port and host_addresses is not None:
+ for address in host_addresses:
+ if "HostPort" in address:
+ port = address["HostPort"]
+ break
+ # TODO: resolve scoring path correctly
+ return f"http://localhost:{port}/score"
+
+
+def _get_image_name(endpoint_name: str, deployment_name: str) -> str:
+ """Returns an image name.
+
+ :param endpoint_name: name of local endpoint
+ :type endpoint_name: str
+ :param deployment_name: name of local deployment
+ :type deployment_name: str
+ :return: image name
+ :rtype: str
+ """
+ return f"{endpoint_name}:{deployment_name}"
+
+
+def _get_container_name(endpoint_name: str, deployment_name: Optional[str] = None) -> str:
+ """Returns a container name.
+
+ :param endpoint_name: name of local endpoint
+ :type endpoint_name: str
+ :param deployment_name: name of local deployment
+ :type deployment_name: str
+ :return: container name
+ :rtype: str
+ """
+ return f"{endpoint_name}.{deployment_name}" if deployment_name else endpoint_name
+
+
+def _validate_container_state(
+ endpoint_name: str,
+ deployment_name: str,
+ container: "docker.models.containers.Container", # type: ignore[name-defined]
+):
+ """Returns a container name.
+
+ :param endpoint_name: name of local endpoint
+ :type endpoint_name: str
+ :param deployment_name: name of local deployment
+ :type deployment_name: str
+ :param container: container of local Deployment
+ :type container: docker.models.containers.Container
+ :raises: azure.ai.ml._local_endpoints.errors.LocalEndpointInFailedStateError
+ """
+ status = get_status_from_container(container=container)
+ if LocalEndpointConstants.CONTAINER_EXITED == status:
+ raise LocalEndpointInFailedStateError(endpoint_name=endpoint_name, deployment_name=deployment_name)