diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/_local_endpoints/docker_client.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/azure/ai/ml/_local_endpoints/docker_client.py | 568 |
1 file changed, 568 insertions, 0 deletions
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
# pylint: disable=missing-client-constructor-parameter-credential,missing-client-constructor-parameter-kwargs
# pylint: disable=client-accepts-api-version-keyword

import json
import logging
import time
from typing import Dict, List, Optional

from azure.ai.ml._local_endpoints.local_endpoint_mode import LocalEndpointMode
from azure.ai.ml._local_endpoints.vscode_debug.vscode_client import VSCodeClient
from azure.ai.ml._utils._logger_utils import initialize_logger_info
from azure.ai.ml._utils.utils import DockerProxy
from azure.ai.ml.constants._endpoint import LocalEndpointConstants
from azure.ai.ml.exceptions import (
    DockerEngineNotAvailableError,
    InvalidLocalEndpointError,
    LocalEndpointImageBuildError,
    LocalEndpointInFailedStateError,
    LocalEndpointNotFoundError,
    MultipleLocalDeploymentsFoundError,
)

# Proxy that defers importing the docker-py package until first use.
docker = DockerProxy()
module_logger = logging.getLogger(__name__)
initialize_logger_info(module_logger, terminator="")

# Template of labels attached to every container this client creates; the
# same keys are used as filters when listing/locating existing containers.
DEFAULT_LABELS: Dict = {
    LocalEndpointConstants.LABEL_KEY_AZUREML_LOCAL_ENDPOINT: "",
    LocalEndpointConstants.LABEL_KEY_ENDPOINT_NAME: "",
    LocalEndpointConstants.LABEL_KEY_DEPLOYMENT_NAME: "",
    LocalEndpointConstants.LABEL_KEY_ENDPOINT_JSON: "",
    LocalEndpointConstants.LABEL_KEY_DEPLOYMENT_JSON: "",
    LocalEndpointConstants.LABEL_KEY_AZUREML_PORT: "",
}


class DockerClient(object):
    """Client for interacting with User's Docker environment for local
    endpoints."""

    # pylint: disable=client-method-missing-type-annotations

    def __init__(
        self,
        client: Optional["docker.DockerClient"] = None,  # type: ignore[name-defined]
        vscode_client: Optional[VSCodeClient] = None,
    ):
        self._lazy_client = client
        self._vscode_client = vscode_client if vscode_client else VSCodeClient()

    @property
    def _client(self) -> "docker.DockerClient":  # type: ignore[name-defined]
        """Lazy initializer for docker-py client.

        :return: docker.client.DockerClient
        :raises: azure.ai.ml._local_endpoints.errors.DockerEngineNotAvailableError
        """
        if self._lazy_client is None:
            try:
                self._lazy_client = docker.from_env()
            except docker.errors.DockerException as e:
                # docker-py raises this generic message when the engine socket
                # is unreachable; translate it to a friendlier AzureML error.
                if "Error while fetching server API version" in str(e):
                    raise DockerEngineNotAvailableError() from e
                raise
        return self._lazy_client

    def create_endpoint(
        self,
        endpoint_name: str,
        endpoint_metadata: str,
        build_directory: str,
        image_name: str,
        dockerfile_path: str,
    ) -> None:
        """Builds an endpoint image and runs it as a local container.

        :param endpoint_name: name of local endpoint
        :type endpoint_name: str
        :param endpoint_metadata: Endpoint entity information serialized.
        :type endpoint_metadata: str
        :param build_directory: directory containing the docker build context
        :type build_directory: str
        :param image_name: tag for the built image
        :type image_name: str
        :param dockerfile_path: path of the Dockerfile within the build context
        :type dockerfile_path: str
        """
        try:
            self._client.images.build(path=build_directory, tag=image_name, dockerfile=dockerfile_path)
        except docker.errors.BuildError as e:
            # Best-effort build: a previously built image with this tag may
            # still be runnable, so log the failure instead of raising.
            module_logger.debug("Building image '%s' failed: %s", image_name, e)
        # Remove any stale container for this endpoint before starting a new one.
        self.delete(endpoint_name=endpoint_name, verify_exists=False)

        labels = DEFAULT_LABELS.copy()
        labels[LocalEndpointConstants.LABEL_KEY_ENDPOINT_NAME] = endpoint_name
        labels[LocalEndpointConstants.LABEL_KEY_ENDPOINT_JSON] = endpoint_metadata
        container_name = _get_container_name(endpoint_name)
        self._client.containers.run(
            image_name,
            name=container_name,
            labels=labels,
            detach=True,
            tty=True,
            publish_all_ports=True,
        )

    # pylint: disable=client-method-has-more-than-5-positional-arguments

    def create_deployment(
        self,
        endpoint_name: str,
        deployment_name: str,
        endpoint_metadata: str,
        deployment_metadata: str,
        build_directory: str,
        dockerfile_path: str,
        conda_source_path: str,
        conda_yaml_contents: str,
        volumes: dict,
        environment: dict,
        azureml_port: int,
        local_endpoint_mode: LocalEndpointMode,
        prebuilt_image_name: Optional[str] = None,
        local_enable_gpu: Optional[bool] = False,
    ) -> None:
        """Builds and runs an image from provided image context.

        :param endpoint_name: name of local endpoint
        :type endpoint_name: str
        :param deployment_name: name of local deployment
        :type deployment_name: str
        :param endpoint_metadata: Endpoint entity information serialized.
        :type endpoint_metadata: str
        :param deployment_metadata: Deployment entity information serialized.
        :type deployment_metadata: str
        :param build_directory: directory on user's local system to write conda file
        :type build_directory: str
        :param dockerfile_path: directory on user's local system to write Dockerfile
        :type dockerfile_path: str
        :param conda_source_path: source of conda file (either path on user's local machine or environment ID)
        :type conda_source_path: str
        :param conda_yaml_contents: contents of user's conda file for docker build
        :type conda_yaml_contents: str
        :param volumes: dictionary of volumes to mount to docker container
        :type volumes: dict
        :param environment: dictionary of docker environment variables to set in container
        :type environment: dict
        :param azureml_port: Port exposed in Docker image for AzureML service.
        :type azureml_port: int
        :param local_endpoint_mode: Mode for how to create the local user container.
        :type local_endpoint_mode: LocalEndpointMode
        :param prebuilt_image_name: Name of pre-built image from customer if using BYOC flow.
        :type prebuilt_image_name: str
        :param local_enable_gpu: enable local container to access gpu
        :type local_enable_gpu: bool
        """
        # Prepare image
        if prebuilt_image_name is None:
            image_name = _get_image_name(endpoint_name, deployment_name)
            module_logger.debug("Building local image '%s'\n", image_name)
            module_logger.debug("Build directory: '%s'\n", build_directory)
            module_logger.debug("Dockerfile path: '%s'\n", dockerfile_path)
            self._build_image(
                build_directory=build_directory,
                image_name=image_name,
                dockerfile_path=dockerfile_path,
                conda_source_path=conda_source_path,
                conda_yaml_contents=conda_yaml_contents,
            )
            # Logged after the build so the message reflects reality.
            module_logger.debug("Image '%s' is built.", image_name)
        else:
            # BYOC flow: use the customer-provided image, pulling it if it is
            # not already present locally.
            image_name = prebuilt_image_name
            try:
                self._client.images.get(image_name)
            except docker.errors.ImageNotFound:
                module_logger.info("\nDid not find image '%s' locally. Pulling from registry.\n", image_name)
                try:
                    self._client.images.pull(image_name)
                except docker.errors.NotFound as e:
                    raise InvalidLocalEndpointError(
                        message=(
                            f"Could not find image '{image_name}' locally or in registry. "
                            "Please check your image name."
                        ),
                        no_personal_data_message=(
                            "Could not find image locally or in registry. Please check your image name."
                        ),
                    ) from e

        module_logger.info("\nStarting up endpoint")
        # Delete container if exists
        self.delete(endpoint_name=endpoint_name, verify_exists=False)

        labels = get_container_labels(
            endpoint_name=endpoint_name,
            deployment_name=deployment_name,
            endpoint_metadata=endpoint_metadata,  # type: ignore[arg-type]
            deployment_metadata=deployment_metadata,  # type: ignore[arg-type]
            azureml_port=azureml_port,
        )
        module_logger.debug("Setting labels: '%s'\n", labels)
        module_logger.debug("Mounting volumes: '%s'\n", volumes)
        module_logger.debug("Setting environment variables: '%s'\n", environment)
        container_name = _get_container_name(endpoint_name, deployment_name)
        device_requests = [docker.types.DeviceRequest(count=-1, capabilities=[["gpu"]])] if local_enable_gpu else None
        container = self._client.containers.create(
            image_name,
            name=container_name,
            labels=labels,
            volumes=self._reformat_volumes(volumes),
            environment=environment,
            detach=True,
            tty=True,
            publish_all_ports=True,
            device_requests=device_requests,
        )
        if local_endpoint_mode == LocalEndpointMode.VSCodeDevContainer:
            try:
                devcontainer_path = self._vscode_client.create_dev_container_json(
                    azureml_container=container,
                    endpoint_name=endpoint_name,
                    deployment_name=deployment_name,
                    build_directory=build_directory,
                    image_name=image_name,
                    environment=environment,
                    volumes=volumes,  # type: ignore[arg-type]
                    labels=labels,
                )
            finally:
                # This pre-created container is only used for retrieving the entry script
                # to add debugpy statements
                container.remove()
            app_path = environment[LocalEndpointConstants.ENVVAR_KEY_AML_APP_ROOT]
            self._vscode_client.invoke_dev_container(devcontainer_path=devcontainer_path, app_path=app_path)
            time.sleep(LocalEndpointConstants.DEFAULT_STARTUP_WAIT_TIME_SECONDS)
        else:
            container.start()
            time.sleep(LocalEndpointConstants.DEFAULT_STARTUP_WAIT_TIME_SECONDS)
            container.reload()
            _validate_container_state(
                endpoint_name=endpoint_name,
                deployment_name=deployment_name,
                container=container,
            )
            scoring_uri = self.get_scoring_uri(endpoint_name=endpoint_name, deployment_name=deployment_name)
            module_logger.debug("Container '%s' is up and running at '%s'\n", container_name, scoring_uri)

    def delete(
        self,
        endpoint_name: str,
        deployment_name: Optional[str] = None,
        verify_exists: bool = True,
    ) -> None:
        """Deletes local endpoint / deployment.

        :param endpoint_name: name of local endpoint
        :type endpoint_name: str
        :param deployment_name: name of local deployment
        :type deployment_name: (str, optional)
        :param verify_exists: Verify that the endpoint exists on deletion. Default: True
        :type verify_exists: (bool, optional)
        :raises: azure.ai.ml._local_endpoints.errors.LocalEndpointNotFoundError
        """
        containers = self.list_containers(endpoint_name=endpoint_name, deployment_name=deployment_name)
        if verify_exists and len(containers) == 0:
            raise LocalEndpointNotFoundError(endpoint_name=endpoint_name, deployment_name=deployment_name)

        for container in containers:
            container.stop()
            container.remove()
            module_logger.debug("Endpoint container '%s' is removed.", container.name)

    def get_endpoint(self, endpoint_name: str) -> Optional[dict]:
        """Returns metadata for local endpoint or deployment.

        :param endpoint_name: name of local endpoint
        :type endpoint_name: str
        :returns: JSON dict representing user provided endpoint input
        :rtype: dict
        :raises: azure.ai.ml._local_endpoints.errors.LocalEndpointNotFoundError
        """
        container = self.get_endpoint_container(endpoint_name=endpoint_name)
        if container is None:
            raise LocalEndpointNotFoundError(endpoint_name=endpoint_name)
        return get_endpoint_json_from_container(container=container)

    def get_deployment(self, endpoint_name: str, deployment_name: Optional[str] = None) -> Optional[dict]:
        """Returns metadata for local deployment.

        :param endpoint_name: name of local endpoint
        :type endpoint_name: str
        :param deployment_name: name of local deployment
        :type deployment_name: (str, optional)
        :return: JSON dict representing user provided endpoint input
        :rtype: dict
        :raises: azure.ai.ml._local_endpoints.errors.LocalEndpointNotFoundError
        """
        container = self.get_endpoint_container(endpoint_name=endpoint_name, deployment_name=deployment_name)
        if container is None:
            raise LocalEndpointNotFoundError(endpoint_name=endpoint_name, deployment_name=deployment_name)
        return get_deployment_json_from_container(container=container)

    def get_scoring_uri(self, endpoint_name: str, deployment_name: Optional[str] = None) -> Optional[str]:
        """Returns scoring uri for local endpoint or deployment.

        :param endpoint_name: name of local endpoint
        :type endpoint_name: str
        :param deployment_name: name of local deployment
        :type deployment_name: (str, optional)
        :raises: azure.ai.ml._local_endpoints.errors.LocalEndpointNotFoundError
        :raises: azure.ai.ml._local_endpoints.errors.MultipleLocalDeploymentsFoundError
        """
        container = self.get_endpoint_container(
            endpoint_name=endpoint_name,
            deployment_name=deployment_name,
            verify_single_deployment=True,
        )
        if container is None:
            return None
        _validate_container_state(
            endpoint_name=endpoint_name,
            deployment_name=str(deployment_name),
            container=container,
        )
        return get_scoring_uri_from_container(container=container)

    def logs(self, endpoint_name: str, deployment_name: str, lines: int) -> str:
        """Returns logs from local deployment.

        :param endpoint_name: name of local endpoint
        :type endpoint_name: str
        :param deployment_name: name of local deployment
        :type deployment_name: str
        :param lines: number of lines to retrieve from container logs
        :type lines: int
        :return: Deployment logs
        :rtype: str
        :raises: azure.ai.ml._local_endpoints.errors.LocalEndpointNotFoundError
        """
        container = self.get_endpoint_container(endpoint_name, deployment_name=deployment_name)
        if container is None:
            raise LocalEndpointNotFoundError(endpoint_name=endpoint_name, deployment_name=deployment_name)
        return container.logs(tail=int(lines)).decode()

    def list_containers(
        self,
        endpoint_name: Optional[str] = None,
        deployment_name: Optional[str] = None,
        include_stopped: bool = True,
    ) -> list:
        """Returns a list of local endpoints.

        :param endpoint_name: Name of local endpoint. If none, all local endpoints will be returned.
        :type endpoint_name: (str, optional)
        :param deployment_name: Name of local deployment. If none, all deployments under endpoint will be returned.
        :type deployment_name: (str, optional)
        :param include_stopped: Include stopped containers. Default: True.
        :type include_stopped: (bool, optional)
        :return: array of Container objects from docker-py library
        :rtype: List[docker.models.containers.Container]
        """
        # Containers are matched purely by the AzureML labels set at creation.
        filters = {"label": [f"{LocalEndpointConstants.LABEL_KEY_AZUREML_LOCAL_ENDPOINT}"]}
        if endpoint_name:
            filters["label"].append(f"{LocalEndpointConstants.LABEL_KEY_ENDPOINT_NAME}={endpoint_name}")
        if deployment_name:
            filters["label"].append(f"{LocalEndpointConstants.LABEL_KEY_DEPLOYMENT_NAME}={deployment_name}")

        return self._client.containers.list(filters=filters, all=include_stopped)

    def get_endpoint_container(
        self,
        endpoint_name: str,
        deployment_name: Optional[str] = None,
        verify_single_deployment: bool = False,
        include_stopped: bool = True,
    ) -> Optional["docker.models.containers.Container"]:  # type: ignore[name-defined]
        """Returns the container for a local endpoint / deployment, or None.

        :param endpoint_name: name of local endpoint
        :type endpoint_name: str
        :param deployment_name: name of local deployment
        :type deployment_name: (str, optional)
        :param verify_single_deployment: Fail if more than one deployment container exists
        :type verify_single_deployment: (bool, optional)
        :param include_stopped: Include container even if it's stopped. Default: True.
        :type include_stopped: (bool, optional)
        :returns: The docker container, or None if no matching container exists
        :rtype: Optional[docker.models.containers.Container]
        :raises: azure.ai.ml._local_endpoints.errors.MultipleLocalDeploymentsFoundError
        """
        containers = self.list_containers(
            endpoint_name=endpoint_name,
            deployment_name=deployment_name,
            include_stopped=include_stopped,
        )
        if len(containers) == 0:
            return None
        if len(containers) > 1 and verify_single_deployment:
            raise MultipleLocalDeploymentsFoundError(endpoint_name=endpoint_name)
        return containers[0]

    def _build_image(
        self,
        build_directory: str,
        image_name: str,
        dockerfile_path: str,
        conda_source_path: str,  # pylint: disable=unused-argument
        conda_yaml_contents: str,  # pylint: disable=unused-argument
    ) -> None:
        """Builds the image via the low-level docker API, streaming build
        output to the module logger.

        :param build_directory: docker build context directory
        :type build_directory: str
        :param image_name: tag for the built image
        :type image_name: str
        :param dockerfile_path: path of the Dockerfile within the build context
        :type dockerfile_path: str
        :raises: azure.ai.ml._local_endpoints.errors.LocalEndpointImageBuildError
        """
        try:
            module_logger.info("\nBuilding Docker image from Dockerfile")
            first_line = True
            # decode=True makes the API yield parsed JSON status dicts.
            for status in self._client.api.build(
                path=build_directory,
                tag=image_name,
                dockerfile=dockerfile_path,
                pull=True,
                decode=True,
                quiet=False,
            ):
                if first_line:
                    module_logger.info("\n")
                    first_line = False

                if "stream" in status:
                    # Conda failures surface in the build stream rather than
                    # as an API error, so detect the report banner explicitly.
                    if "An unexpected error has occurred. Conda has prepared the above report." in status["stream"]:
                        raise LocalEndpointImageBuildError(status["stream"])
                    module_logger.info(status["stream"])

                if "error" in status:
                    module_logger.info(status["error"])
                    raise LocalEndpointImageBuildError(status["error"])
        except docker.errors.APIError as e:
            raise LocalEndpointImageBuildError(e) from e
        except Exception as e:
            if isinstance(e, LocalEndpointImageBuildError):
                raise
            raise LocalEndpointImageBuildError(e) from e

    def _reformat_volumes(self, volumes_dict: Dict[str, Dict[str, Dict[str, str]]]) -> List[str]:
        """Returns a list of volumes to pass to docker.

        :param volumes_dict: custom formatted dict of volumes to mount. We expect the keys to be unique. Example:
            .. code-block:: python

                {
                    "codesrc:codedest": {
                        "codesrc": {
                            "bind": "codedest"
                        }
                    },
                    "modelsrc:modeldest": {
                        "modelsrc": {
                            "bind": "modeldest"
                        }
                    }
                }

        :type volumes_dict: str
        :return: list of volumes to pass to docker. Example:
            .. code-block:: python

                ["codesrc:codedest", "modelsrc:modeldest"]
        :rtype: List[str]
        """
        # The keys already carry the "src:dest" form docker expects.
        return list(volumes_dict.keys())


def get_container_labels(
    endpoint_name: str,
    deployment_name: str,
    endpoint_metadata: dict,
    deployment_metadata: dict,
    azureml_port: int,
) -> dict:
    """Returns the full label set for a deployment container.

    :param endpoint_name: name of local endpoint
    :type endpoint_name: str
    :param deployment_name: name of local deployment
    :type deployment_name: str
    :param endpoint_metadata: serialized endpoint entity information
    :type endpoint_metadata: dict
    :param deployment_metadata: serialized deployment entity information
    :type deployment_metadata: dict
    :param azureml_port: port exposed in the image for the AzureML service
    :type azureml_port: int
    :return: labels dict to attach to the container
    :rtype: dict
    """
    labels = DEFAULT_LABELS.copy()
    labels[LocalEndpointConstants.LABEL_KEY_ENDPOINT_NAME] = endpoint_name
    labels[LocalEndpointConstants.LABEL_KEY_DEPLOYMENT_NAME] = deployment_name
    labels[LocalEndpointConstants.LABEL_KEY_ENDPOINT_JSON] = endpoint_metadata
    labels[LocalEndpointConstants.LABEL_KEY_DEPLOYMENT_JSON] = deployment_metadata
    labels[LocalEndpointConstants.LABEL_KEY_AZUREML_PORT] = str(azureml_port)
    return labels


def get_endpoint_json_from_container(
    container: "docker.models.containers.Container",  # type: ignore[name-defined]
) -> Optional[dict]:
    """Deserializes the endpoint JSON stored in the container's labels.

    :param container: container of local endpoint
    :type container: docker.models.containers.Container
    :return: endpoint JSON dict, or None if no container was provided
    :rtype: Optional[dict]
    """
    if container:
        data = container.labels[LocalEndpointConstants.LABEL_KEY_ENDPOINT_JSON]
        return json.loads(data)
    return None


def get_deployment_json_from_container(
    container: "docker.models.containers.Container",  # type: ignore[name-defined]
) -> Optional[dict]:
    """Deserializes the deployment JSON stored in the container's labels.

    :param container: container of local deployment
    :type container: docker.models.containers.Container
    :return: deployment JSON dict, or None if no container was provided
    :rtype: Optional[dict]
    """
    if container:
        data = container.labels[LocalEndpointConstants.LABEL_KEY_DEPLOYMENT_JSON]
        return json.loads(data)
    return None


def get_status_from_container(container: "docker.models.containers.Container") -> str:  # type: ignore[name-defined]
    """Returns status of container.

    :param container: container of local Deployment
    :type container: docker.models.containers.Container
    :return: container status
    :rtype: str
    """
    return container.status


def get_scoring_uri_from_container(
    container: "docker.models.containers.Container",  # type: ignore[name-defined]
) -> str:
    """Returns scoring_uri of container.

    :param container: container of local Deployment
    :type container: docker.models.containers.Container
    :return: container scoring_uri
    :rtype: str
    """
    port = 5001
    # Example container.ports: {'5001/tcp': [{'HostIp': '0.0.0.0', 'HostPort': '5001'}],
    # '8883/tcp': None, '8888/tcp': None }
    if container is not None and container.ports is not None:
        azureml_port = container.labels["azureml-port"]
        for docker_port, host_addresses in container.ports.items():
            # docker_port looks like "5001/tcp"; compare the port number
            # exactly rather than by substring so e.g. "500" cannot
            # accidentally match "5001/tcp".
            if docker_port.split("/")[0] == azureml_port and host_addresses is not None:
                for address in host_addresses:
                    if "HostPort" in address:
                        port = address["HostPort"]
                        break
    # TODO: resolve scoring path correctly
    return f"http://localhost:{port}/score"


def _get_image_name(endpoint_name: str, deployment_name: str) -> str:
    """Returns an image name.

    :param endpoint_name: name of local endpoint
    :type endpoint_name: str
    :param deployment_name: name of local deployment
    :type deployment_name: str
    :return: image name
    :rtype: str
    """
    return f"{endpoint_name}:{deployment_name}"


def _get_container_name(endpoint_name: str, deployment_name: Optional[str] = None) -> str:
    """Returns a container name.

    :param endpoint_name: name of local endpoint
    :type endpoint_name: str
    :param deployment_name: name of local deployment
    :type deployment_name: str
    :return: container name
    :rtype: str
    """
    return f"{endpoint_name}.{deployment_name}" if deployment_name else endpoint_name


def _validate_container_state(
    endpoint_name: str,
    deployment_name: str,
    container: "docker.models.containers.Container",  # type: ignore[name-defined]
):
    """Validates that the deployment container has not exited.

    :param endpoint_name: name of local endpoint
    :type endpoint_name: str
    :param deployment_name: name of local deployment
    :type deployment_name: str
    :param container: container of local Deployment
    :type container: docker.models.containers.Container
    :raises: azure.ai.ml._local_endpoints.errors.LocalEndpointInFailedStateError
    """
    status = get_status_from_container(container=container)
    if LocalEndpointConstants.CONTAINER_EXITED == status:
        raise LocalEndpointInFailedStateError(endpoint_name=endpoint_name, deployment_name=deployment_name)