aboutsummaryrefslogtreecommitdiff
import ipaddress
import os
import io
import json
import httpx
import binascii
import platform
import urllib.parse
from os import PathLike
from pathlib import Path
from copy import deepcopy
from hashlib import sha256
from base64 import b64encode, b64decode

from typing import Any, AnyStr, Union, Optional, Sequence, Mapping, Literal, overload

import sys

if sys.version_info < (3, 9):
  from typing import Iterator, AsyncIterator
else:
  from collections.abc import Iterator, AsyncIterator

from importlib import metadata

try:
  __version__ = metadata.version('ollama')
except metadata.PackageNotFoundError:
  __version__ = '0.0.0'

from ollama._types import Message, Options, RequestError, ResponseError, Tool


class BaseClient:
  def __init__(
    self,
    client,
    host: Optional[str] = None,
    follow_redirects: bool = True,
    timeout: Any = None,
    **kwargs,
  ) -> None:
    """
    Creates a httpx client. Default parameters are the same as those defined in httpx
    except for the following:
    - `follow_redirects`: True
    - `timeout`: None
    `kwargs` are passed to the httpx client.
    """

    headers = kwargs.pop('headers', {})
    headers['Content-Type'] = 'application/json'
    headers['Accept'] = 'application/json'
    headers['User-Agent'] = f'ollama-python/{__version__} ({platform.machine()} {platform.system().lower()}) Python/{platform.python_version()}'

    self._client = client(
      base_url=_parse_host(host or os.getenv('OLLAMA_HOST')),
      follow_redirects=follow_redirects,
      timeout=timeout,
      headers=headers,
      **kwargs,
    )


class Client(BaseClient):
  def __init__(self, host: Optional[str] = None, **kwargs) -> None:
    super().__init__(httpx.Client, host, **kwargs)

  def _request(self, method: str, url: str, **kwargs) -> httpx.Response:
    response = self._client.request(method, url, **kwargs)

    try:
      response.raise_for_status()
    except httpx.HTTPStatusError as e:
      raise ResponseError(e.response.text, e.response.status_code) from None

    return response

  def _stream(self, method: str, url: str, **kwargs) -> Iterator[Mapping[str, Any]]:
    with self._client.stream(method, url, **kwargs) as r:
      try:
        r.raise_for_status()
      except httpx.HTTPStatusError as e:
        e.response.read()
        raise ResponseError(e.response.text, e.response.status_code) from None

      for line in r.iter_lines():
        partial = json.loads(line)
        if e := partial.get('error'):
          raise ResponseError(e)
        yield partial

  def _request_stream(
    self,
    *args,
    stream: bool = False,
    **kwargs,
  ) -> Union[Mapping[str, Any], Iterator[Mapping[str, Any]]]:
    return self._stream(*args, **kwargs) if stream else self._request(*args, **kwargs).json()

  @overload
  def generate(
    self,
    model: str = '',
    prompt: str = '',
    suffix: str = '',
    system: str = '',
    template: str = '',
    context: Optional[Sequence[int]] = None,
    stream: Literal[False] = False,
    raw: bool = False,
    format: Literal['', 'json'] = '',
    images: Optional[Sequence[AnyStr]] = None,
    options: Optional[Options] = None,
    keep_alive: Optional[Union[float, str]] = None,
  ) -> Mapping[str, Any]: ...

  @overload
  def generate(
    self,
    model: str = '',
    prompt: str = '',
    suffix: str = '',
    system: str = '',
    template: str = '',
    context: Optional[Sequence[int]] = None,
    stream: Literal[True] = True,
    raw: bool = False,
    format: Literal['', 'json'] = '',
    images: Optional[Sequence[AnyStr]] = None,
    options: Optional[Options] = None,
    keep_alive: Optional[Union[float, str]] = None,
  ) -> Iterator[Mapping[str, Any]]: ...

  def generate(
    self,
    model: str = '',
    prompt: str = '',
    suffix: str = '',
    system: str = '',
    template: str = '',
    context: Optional[Sequence[int]] = None,
    stream: bool = False,
    raw: bool = False,
    format: Literal['', 'json'] = '',
    images: Optional[Sequence[AnyStr]] = None,
    options: Optional[Options] = None,
    keep_alive: Optional[Union[float, str]] = None,
  ) -> Union[Mapping[str, Any], Iterator[Mapping[str, Any]]]:
    """
    Create a response using the requested model.

    Raises `RequestError` if a model is not provided.

    Raises `ResponseError` if the request could not be fulfilled.

    Returns `GenerateResponse` if `stream` is `False`, otherwise returns a `GenerateResponse` generator.
    """

    if not model:
      raise RequestError('must provide a model')

    return self._request_stream(
      'POST',
      '/api/generate',
      json={
        'model': model,
        'prompt': prompt,
        'suffix': suffix,
        'system': system,
        'template': template,
        'context': context or [],
        'stream': stream,
        'raw': raw,
        'images': [_encode_image(image) for image in images or []],
        'format': format,
        'options': options or {},
        'keep_alive': keep_alive,
      },
      stream=stream,
    )

  @overload
  def chat(
    self,
    model: str = '',
    messages: Optional[Sequence[Message]] = None,
    tools: Optional[Sequence[Tool]] = None,
    stream: Literal[False] = False,
    format: Literal['', 'json'] = '',
    options: Optional[Options] = None,
    keep_alive: Optional[Union[float, str]] = None,
  ) -> Mapping[str, Any]: ...

  @overload
  def chat(
    self,
    model: str = '',
    messages: Optional[Sequence[Message]] = None,
    tools: Optional[Sequence[Tool]] = None,
    stream: Literal[True] = True,
    format: Literal['', 'json'] = '',
    options: Optional[Options] = None,
    keep_alive: Optional[Union[float, str]] = None,
  ) -> Iterator[Mapping[str, Any]]: ...

  def chat(
    self,
    model: str = '',
    messages: Optional[Sequence[Message]] = None,
    tools: Optional[Sequence[Tool]] = None,
    stream: bool = False,
    format: Literal['', 'json'] = '',
    options: Optional[Options] = None,
    keep_alive: Optional[Union[float, str]] = None,
  ) -> Union[Mapping[str, Any], Iterator[Mapping[str, Any]]]:
    """
    Create a chat response using the requested model.

    Raises `RequestError` if a model is not provided.

    Raises `ResponseError` if the request could not be fulfilled.

    Returns `ChatResponse` if `stream` is `False`, otherwise returns a `ChatResponse` generator.
    """

    if not model:
      raise RequestError('must provide a model')

    messages = deepcopy(messages)

    for message in messages or []:
      if images := message.get('images'):
        message['images'] = [_encode_image(image) for image in images]

    return self._request_stream(
      'POST',
      '/api/chat',
      json={
        'model': model,
        'messages': messages,
        'tools': tools or [],
        'stream': stream,
        'format': format,
        'options': options or {},
        'keep_alive': keep_alive,
      },
      stream=stream,
    )

  def embed(
    self,
    model: str = '',
    input: Union[str, Sequence[AnyStr]] = '',
    truncate: bool = True,
    options: Optional[Options] = None,
    keep_alive: Optional[Union[float, str]] = None,
  ) -> Mapping[str, Any]:
    if not model:
      raise RequestError('must provide a model')

    return self._request(
      'POST',
      '/api/embed',
      json={
        'model': model,
        'input': input,
        'truncate': truncate,
        'options': options or {},
        'keep_alive': keep_alive,
      },
    ).json()

  def embeddings(
    self,
    model: str = '',
    prompt: str = '',
    options: Optional[Options] = None,
    keep_alive: Optional[Union[float, str]] = None,
  ) -> Mapping[str, Sequence[float]]:
    return self._request(
      'POST',
      '/api/embeddings',
      json={
        'model': model,
        'prompt': prompt,
        'options': options or {},
        'keep_alive': keep_alive,
      },
    ).json()

  @overload
  def pull(
    self,
    model: str,
    insecure: bool = False,
    stream: Literal[False] = False,
  ) -> Mapping[str, Any]: ...

  @overload
  def pull(
    self,
    model: str,
    insecure: bool = False,
    stream: Literal[True] = True,
  ) -> Iterator[Mapping[str, Any]]: ...

  def pull(
    self,
    model: str,
    insecure: bool = False,
    stream: bool = False,
  ) -> Union[Mapping[str, Any], Iterator[Mapping[str, Any]]]:
    """
    Raises `ResponseError` if the request could not be fulfilled.

    Returns `ProgressResponse` if `stream` is `False`, otherwise returns a `ProgressResponse` generator.
    """
    return self._request_stream(
      'POST',
      '/api/pull',
      json={
        'name': model,
        'insecure': insecure,
        'stream': stream,
      },
      stream=stream,
    )

  @overload
  def push(
    self,
    model: str,
    insecure: bool = False,
    stream: Literal[False] = False,
  ) -> Mapping[str, Any]: ...

  @overload
  def push(
    self,
    model: str,
    insecure: bool = False,
    stream: Literal[True] = True,
  ) -> Iterator[Mapping[str, Any]]: ...

  def push(
    self,
    model: str,
    insecure: bool = False,
    stream: bool = False,
  ) -> Union[Mapping[str, Any], Iterator[Mapping[str, Any]]]:
    """
    Raises `ResponseError` if the request could not be fulfilled.

    Returns `ProgressResponse` if `stream` is `False`, otherwise returns a `ProgressResponse` generator.
    """
    return self._request_stream(
      'POST',
      '/api/push',
      json={
        'name': model,
        'insecure': insecure,
        'stream': stream,
      },
      stream=stream,
    )

  @overload
  def create(
    self,
    model: str,
    path: Optional[Union[str, PathLike]] = None,
    modelfile: Optional[str] = None,
    quantize: Optional[str] = None,
    stream: Literal[False] = False,
  ) -> Mapping[str, Any]: ...

  @overload
  def create(
    self,
    model: str,
    path: Optional[Union[str, PathLike]] = None,
    modelfile: Optional[str] = None,
    quantize: Optional[str] = None,
    stream: Literal[True] = True,
  ) -> Iterator[Mapping[str, Any]]: ...

  def create(
    self,
    model: str,
    path: Optional[Union[str, PathLike]] = None,
    modelfile: Optional[str] = None,
    quantize: Optional[str] = None,
    stream: bool = False,
  ) -> Union[Mapping[str, Any], Iterator[Mapping[str, Any]]]:
    """
    Raises `ResponseError` if the request could not be fulfilled.

    Returns `ProgressResponse` if `stream` is `False`, otherwise returns a `ProgressResponse` generator.
    """
    if (realpath := _as_path(path)) and realpath.exists():
      modelfile = self._parse_modelfile(realpath.read_text(), base=realpath.parent)
    elif modelfile:
      modelfile = self._parse_modelfile(modelfile)
    else:
      raise RequestError('must provide either path or modelfile')

    return self._request_stream(
      'POST',
      '/api/create',
      json={
        'name': model,
        'modelfile': modelfile,
        'stream': stream,
        'quantize': quantize,
      },
      stream=stream,
    )

  def _parse_modelfile(self, modelfile: str, base: Optional[Path] = None) -> str:
    base = Path.cwd() if base is None else base

    out = io.StringIO()
    for line in io.StringIO(modelfile):
      command, _, args = line.partition(' ')
      if command.upper() not in ['FROM', 'ADAPTER']:
        print(line, end='', file=out)
        continue

      path = Path(args.strip()).expanduser()
      path = path if path.is_absolute() else base / path
      if path.exists():
        args = f'@{self._create_blob(path)}\n'
      print(command, args, end='', file=out)

    return out.getvalue()

  def _create_blob(self, path: Union[str, Path]) -> str:
    sha256sum = sha256()
    with open(path, 'rb') as r:
      while True:
        chunk = r.read(32 * 1024)
        if not chunk:
          break
        sha256sum.update(chunk)

    digest = f'sha256:{sha256sum.hexdigest()}'

    try:
      self._request('HEAD', f'/api/blobs/{digest}')
    except ResponseError as e:
      if e.status_code != 404:
        raise

      with open(path, 'rb') as r:
        self._request('POST', f'/api/blobs/{digest}', content=r)

    return digest

  def delete(self, model: str) -> Mapping[str, Any]:
    response = self._request('DELETE', '/api/delete', json={'name': model})
    return {'status': 'success' if response.status_code == 200 else 'error'}

  def list(self) -> Mapping[str, Any]:
    return self._request('GET', '/api/tags').json()

  def copy(self, source: str, destination: str) -> Mapping[str, Any]:
    response = self._request('POST', '/api/copy', json={'source': source, 'destination': destination})
    return {'status': 'success' if response.status_code == 200 else 'error'}

  def show(self, model: str) -> Mapping[str, Any]:
    return self._request('POST', '/api/show', json={'name': model}).json()

  def ps(self) -> Mapping[str, Any]:
    return self._request('GET', '/api/ps').json()


class AsyncClient(BaseClient):
  def __init__(self, host: Optional[str] = None, **kwargs) -> None:
    super().__init__(httpx.AsyncClient, host, **kwargs)

  async def _request(self, method: str, url: str, **kwargs) -> httpx.Response:
    response = await self._client.request(method, url, **kwargs)

    try:
      response.raise_for_status()
    except httpx.HTTPStatusError as e:
      raise ResponseError(e.response.text, e.response.status_code) from None

    return response

  async def _stream(self, method: str, url: str, **kwargs) -> AsyncIterator[Mapping[str, Any]]:
    async def inner():
      async with self._client.stream(method, url, **kwargs) as r:
        try:
          r.raise_for_status()
        except httpx.HTTPStatusError as e:
          await e.response.aread()
          raise ResponseError(e.response.text, e.response.status_code) from None

        async for line in r.aiter_lines():
          partial = json.loads(line)
          if e := partial.get('error'):
            raise ResponseError(e)
          yield partial

    return inner()

  async def _request_stream(
    self,
    *args,
    stream: bool = False,
    **kwargs,
  ) -> Union[Mapping[str, Any], AsyncIterator[Mapping[str, Any]]]:
    if stream:
      return await self._stream(*args, **kwargs)

    response = await self._request(*args, **kwargs)
    return response.json()

  @overload
  async def generate(
    self,
    model: str = '',
    prompt: str = '',
    suffix: str = '',
    system: str = '',
    template: str = '',
    context: Optional[Sequence[int]] = None,
    stream: Literal[False] = False,
    raw: bool = False,
    format: Literal['', 'json'] = '',
    images: Optional[Sequence[AnyStr]] = None,
    options: Optional[Options] = None,
    keep_alive: Optional[Union[float, str]] = None,
  ) -> Mapping[str, Any]: ...

  @overload
  async def generate(
    self,
    model: str = '',
    prompt: str = '',
    suffix: str = '',
    system: str = '',
    template: str = '',
    context: Optional[Sequence[int]] = None,
    stream: Literal[True] = True,
    raw: bool = False,
    format: Literal['', 'json'] = '',
    images: Optional[Sequence[AnyStr]] = None,
    options: Optional[Options] = None,
    keep_alive: Optional[Union[float, str]] = None,
  ) -> AsyncIterator[Mapping[str, Any]]: ...

  async def generate(
    self,
    model: str = '',
    prompt: str = '',
    suffix: str = '',
    system: str = '',
    template: str = '',
    context: Optional[Sequence[int]] = None,
    stream: bool = False,
    raw: bool = False,
    format: Literal['', 'json'] = '',
    images: Optional[Sequence[AnyStr]] = None,
    options: Optional[Options] = None,
    keep_alive: Optional[Union[float, str]] = None,
  ) -> Union[Mapping[str, Any], AsyncIterator[Mapping[str, Any]]]:
    """
    Create a response using the requested model.

    Raises `RequestError` if a model is not provided.

    Raises `ResponseError` if the request could not be fulfilled.

    Returns `GenerateResponse` if `stream` is `False`, otherwise returns an asynchronous `GenerateResponse` generator.
    """
    if not model:
      raise RequestError('must provide a model')

    return await self._request_stream(
      'POST',
      '/api/generate',
      json={
        'model': model,
        'prompt': prompt,
        'suffix': suffix,
        'system': system,
        'template': template,
        'context': context or [],
        'stream': stream,
        'raw': raw,
        'images': [_encode_image(image) for image in images or []],
        'format': format,
        'options': options or {},
        'keep_alive': keep_alive,
      },
      stream=stream,
    )

  @overload
  async def chat(
    self,
    model: str = '',
    messages: Optional[Sequence[Message]] = None,
    tools: Optional[Sequence[Tool]] = None,
    stream: Literal[False] = False,
    format: Literal['', 'json'] = '',
    options: Optional[Options] = None,
    keep_alive: Optional[Union[float, str]] = None,
  ) -> Mapping[str, Any]: ...

  @overload
  async def chat(
    self,
    model: str = '',
    messages: Optional[Sequence[Message]] = None,
    tools: Optional[Sequence[Tool]] = None,
    stream: Literal[True] = True,
    format: Literal['', 'json'] = '',
    options: Optional[Options] = None,
    keep_alive: Optional[Union[float, str]] = None,
  ) -> AsyncIterator[Mapping[str, Any]]: ...

  async def chat(
    self,
    model: str = '',
    messages: Optional[Sequence[Message]] = None,
    tools: Optional[Sequence[Tool]] = None,
    stream: bool = False,
    format: Literal['', 'json'] = '',
    options: Optional[Options] = None,
    keep_alive: Optional[Union[float, str]] = None,
  ) -> Union[Mapping[str, Any], AsyncIterator[Mapping[str, Any]]]:
    """
    Create a chat response using the requested model.

    Raises `RequestError` if a model is not provided.

    Raises `ResponseError` if the request could not be fulfilled.

    Returns `ChatResponse` if `stream` is `False`, otherwise returns an asynchronous `ChatResponse` generator.
    """
    if not model:
      raise RequestError('must provide a model')

    messages = deepcopy(messages)

    for message in messages or []:
      if images := message.get('images'):
        message['images'] = [_encode_image(image) for image in images]

    return await self._request_stream(
      'POST',
      '/api/chat',
      json={
        'model': model,
        'messages': messages,
        'tools': tools or [],
        'stream': stream,
        'format': format,
        'options': options or {},
        'keep_alive': keep_alive,
      },
      stream=stream,
    )

  async def embed(
    self,
    model: str = '',
    input: Union[str, Sequence[AnyStr]] = '',
    truncate: bool = True,
    options: Optional[Options] = None,
    keep_alive: Optional[Union[float, str]] = None,
  ) -> Mapping[str, Any]:
    if not model:
      raise RequestError('must provide a model')

    response = await self._request(
      'POST',
      '/api/embed',
      json={
        'model': model,
        'input': input,
        'truncate': truncate,
        'options': options or {},
        'keep_alive': keep_alive,
      },
    )

    return response.json()

  async def embeddings(
    self,
    model: str = '',
    prompt: str = '',
    options: Optional[Options] = None,
    keep_alive: Optional[Union[float, str]] = None,
  ) -> Mapping[str, Sequence[float]]:
    response = await self._request(
      'POST',
      '/api/embeddings',
      json={
        'model': model,
        'prompt': prompt,
        'options': options or {},
        'keep_alive': keep_alive,
      },
    )

    return response.json()

  @overload
  async def pull(
    self,
    model: str,
    insecure: bool = False,
    stream: Literal[False] = False,
  ) -> Mapping[str, Any]: ...

  @overload
  async def pull(
    self,
    model: str,
    insecure: bool = False,
    stream: Literal[True] = True,
  ) -> AsyncIterator[Mapping[str, Any]]: ...

  async def pull(
    self,
    model: str,
    insecure: bool = False,
    stream: bool = False,
  ) -> Union[Mapping[str, Any], AsyncIterator[Mapping[str, Any]]]:
    """
    Raises `ResponseError` if the request could not be fulfilled.

    Returns `ProgressResponse` if `stream` is `False`, otherwise returns a `ProgressResponse` generator.
    """
    return await self._request_stream(
      'POST',
      '/api/pull',
      json={
        'name': model,
        'insecure': insecure,
        'stream': stream,
      },
      stream=stream,
    )

  @overload
  async def push(
    self,
    model: str,
    insecure: bool = False,
    stream: Literal[False] = False,
  ) -> Mapping[str, Any]: ...

  @overload
  async def push(
    self,
    model: str,
    insecure: bool = False,
    stream: Literal[True] = True,
  ) -> AsyncIterator[Mapping[str, Any]]: ...

  async def push(
    self,
    model: str,
    insecure: bool = False,
    stream: bool = False,
  ) -> Union[Mapping[str, Any], AsyncIterator[Mapping[str, Any]]]:
    """
    Raises `ResponseError` if the request could not be fulfilled.

    Returns `ProgressResponse` if `stream` is `False`, otherwise returns a `ProgressResponse` generator.
    """
    return await self._request_stream(
      'POST',
      '/api/push',
      json={
        'name': model,
        'insecure': insecure,
        'stream': stream,
      },
      stream=stream,
    )

  @overload
  async def create(
    self,
    model: str,
    path: Optional[Union[str, PathLike]] = None,
    modelfile: Optional[str] = None,
    quantize: Optional[str] = None,
    stream: Literal[False] = False,
  ) -> Mapping[str, Any]: ...

  @overload
  async def create(
    self,
    model: str,
    path: Optional[Union[str, PathLike]] = None,
    modelfile: Optional[str] = None,
    quantize: Optional[str] = None,
    stream: Literal[True] = True,
  ) -> AsyncIterator[Mapping[str, Any]]: ...

  async def create(
    self,
    model: str,
    path: Optional[Union[str, PathLike]] = None,
    modelfile: Optional[str] = None,
    quantize: Optional[str] = None,
    stream: bool = False,
  ) -> Union[Mapping[str, Any], AsyncIterator[Mapping[str, Any]]]:
    """
    Raises `ResponseError` if the request could not be fulfilled.

    Returns `ProgressResponse` if `stream` is `False`, otherwise returns a `ProgressResponse` generator.
    """
    if (realpath := _as_path(path)) and realpath.exists():
      modelfile = await self._parse_modelfile(realpath.read_text(), base=realpath.parent)
    elif modelfile:
      modelfile = await self._parse_modelfile(modelfile)
    else:
      raise RequestError('must provide either path or modelfile')

    return await self._request_stream(
      'POST',
      '/api/create',
      json={
        'name': model,
        'modelfile': modelfile,
        'stream': stream,
        'quantize': quantize,
      },
      stream=stream,
    )

  async def _parse_modelfile(self, modelfile: str, base: Optional[Path] = None) -> str:
    base = Path.cwd() if base is None else base

    out = io.StringIO()
    for line in io.StringIO(modelfile):
      command, _, args = line.partition(' ')
      if command.upper() not in ['FROM', 'ADAPTER']:
        print(line, end='', file=out)
        continue

      path = Path(args.strip()).expanduser()
      path = path if path.is_absolute() else base / path
      if path.exists():
        args = f'@{await self._create_blob(path)}\n'
      print(command, args, end='', file=out)

    return out.getvalue()

  async def _create_blob(self, path: Union[str, Path]) -> str:
    sha256sum = sha256()
    with open(path, 'rb') as r:
      while True:
        chunk = r.read(32 * 1024)
        if not chunk:
          break
        sha256sum.update(chunk)

    digest = f'sha256:{sha256sum.hexdigest()}'

    try:
      await self._request('HEAD', f'/api/blobs/{digest}')
    except ResponseError as e:
      if e.status_code != 404:
        raise

      async def upload_bytes():
        with open(path, 'rb') as r:
          while True:
            chunk = r.read(32 * 1024)
            if not chunk:
              break
            yield chunk

      await self._request('POST', f'/api/blobs/{digest}', content=upload_bytes())

    return digest

  async def delete(self, model: str) -> Mapping[str, Any]:
    response = await self._request('DELETE', '/api/delete', json={'name': model})
    return {'status': 'success' if response.status_code == 200 else 'error'}

  async def list(self) -> Mapping[str, Any]:
    response = await self._request('GET', '/api/tags')
    return response.json()

  async def copy(self, source: str, destination: str) -> Mapping[str, Any]:
    response = await self._request('POST', '/api/copy', json={'source': source, 'destination': destination})
    return {'status': 'success' if response.status_code == 200 else 'error'}

  async def show(self, model: str) -> Mapping[str, Any]:
    response = await self._request('POST', '/api/show', json={'name': model})
    return response.json()

  async def ps(self) -> Mapping[str, Any]:
    response = await self._request('GET', '/api/ps')
    return response.json()


def _encode_image(image) -> str:
  """
  >>> _encode_image(b'ollama')
  'b2xsYW1h'
  >>> _encode_image(io.BytesIO(b'ollama'))
  'b2xsYW1h'
  >>> _encode_image('LICENSE')
  'TUlUIExpY2Vuc2UKCkNvcHlyaWdodCAoYykgT2xsYW1hCgpQZXJtaXNzaW9uIGlzIGhlcmVieSBncmFudGVkLCBmcmVlIG9mIGNoYXJnZSwgdG8gYW55IHBlcnNvbiBvYnRhaW5pbmcgYSBjb3B5Cm9mIHRoaXMgc29mdHdhcmUgYW5kIGFzc29jaWF0ZWQgZG9jdW1lbnRhdGlvbiBmaWxlcyAodGhlICJTb2Z0d2FyZSIpLCB0byBkZWFsCmluIHRoZSBTb2Z0d2FyZSB3aXRob3V0IHJlc3RyaWN0aW9uLCBpbmNsdWRpbmcgd2l0aG91dCBsaW1pdGF0aW9uIHRoZSByaWdodHMKdG8gdXNlLCBjb3B5LCBtb2RpZnksIG1lcmdlLCBwdWJsaXNoLCBkaXN0cmlidXRlLCBzdWJsaWNlbnNlLCBhbmQvb3Igc2VsbApjb3BpZXMgb2YgdGhlIFNvZnR3YXJlLCBhbmQgdG8gcGVybWl0IHBlcnNvbnMgdG8gd2hvbSB0aGUgU29mdHdhcmUgaXMKZnVybmlzaGVkIHRvIGRvIHNvLCBzdWJqZWN0IHRvIHRoZSBmb2xsb3dpbmcgY29uZGl0aW9uczoKClRoZSBhYm92ZSBjb3B5cmlnaHQgbm90aWNlIGFuZCB0aGlzIHBlcm1pc3Npb24gbm90aWNlIHNoYWxsIGJlIGluY2x1ZGVkIGluIGFsbApjb3BpZXMgb3Igc3Vic3RhbnRpYWwgcG9ydGlvbnMgb2YgdGhlIFNvZnR3YXJlLgoKVEhFIFNPRlRXQVJFIElTIFBST1ZJREVEICJBUyBJUyIsIFdJVEhPVVQgV0FSUkFOVFkgT0YgQU5ZIEtJTkQsIEVYUFJFU1MgT1IKSU1QTElFRCwgSU5DTFVESU5HIEJVVCBOT1QgTElNSVRFRCBUTyBUSEUgV0FSUkFOVElFUyBPRiBNRVJDSEFOVEFCSUxJVFksCkZJVE5FU1MgRk9SIEEgUEFSVElDVUxBUiBQVVJQT1NFIEFORCBOT05JTkZSSU5HRU1FTlQuIElOIE5PIEVWRU5UIFNIQUxMIFRIRQpBVVRIT1JTIE9SIENPUFlSSUdIVCBIT0xERVJTIEJFIExJQUJMRSBGT1IgQU5ZIENMQUlNLCBEQU1BR0VTIE9SIE9USEVSCkxJQUJJTElUWSwgV0hFVEhFUiBJTiBBTiBBQ1RJT04gT0YgQ09OVFJBQ1QsIFRPUlQgT1IgT1RIRVJXSVNFLCBBUklTSU5HIEZST00sCk9VVCBPRiBPUiBJTiBDT05ORUNUSU9OIFdJVEggVEhFIFNPRlRXQVJFIE9SIFRIRSBVU0UgT1IgT1RIRVIgREVBTElOR1MgSU4gVEhFClNPRlRXQVJFLgo='
  >>> _encode_image(Path('LICENSE'))
  'TUlUIExpY2Vuc2UKCkNvcHlyaWdodCAoYykgT2xsYW1hCgpQZXJtaXNzaW9uIGlzIGhlcmVieSBncmFudGVkLCBmcmVlIG9mIGNoYXJnZSwgdG8gYW55IHBlcnNvbiBvYnRhaW5pbmcgYSBjb3B5Cm9mIHRoaXMgc29mdHdhcmUgYW5kIGFzc29jaWF0ZWQgZG9jdW1lbnRhdGlvbiBmaWxlcyAodGhlICJTb2Z0d2FyZSIpLCB0byBkZWFsCmluIHRoZSBTb2Z0d2FyZSB3aXRob3V0IHJlc3RyaWN0aW9uLCBpbmNsdWRpbmcgd2l0aG91dCBsaW1pdGF0aW9uIHRoZSByaWdodHMKdG8gdXNlLCBjb3B5LCBtb2RpZnksIG1lcmdlLCBwdWJsaXNoLCBkaXN0cmlidXRlLCBzdWJsaWNlbnNlLCBhbmQvb3Igc2VsbApjb3BpZXMgb2YgdGhlIFNvZnR3YXJlLCBhbmQgdG8gcGVybWl0IHBlcnNvbnMgdG8gd2hvbSB0aGUgU29mdHdhcmUgaXMKZnVybmlzaGVkIHRvIGRvIHNvLCBzdWJqZWN0IHRvIHRoZSBmb2xsb3dpbmcgY29uZGl0aW9uczoKClRoZSBhYm92ZSBjb3B5cmlnaHQgbm90aWNlIGFuZCB0aGlzIHBlcm1pc3Npb24gbm90aWNlIHNoYWxsIGJlIGluY2x1ZGVkIGluIGFsbApjb3BpZXMgb3Igc3Vic3RhbnRpYWwgcG9ydGlvbnMgb2YgdGhlIFNvZnR3YXJlLgoKVEhFIFNPRlRXQVJFIElTIFBST1ZJREVEICJBUyBJUyIsIFdJVEhPVVQgV0FSUkFOVFkgT0YgQU5ZIEtJTkQsIEVYUFJFU1MgT1IKSU1QTElFRCwgSU5DTFVESU5HIEJVVCBOT1QgTElNSVRFRCBUTyBUSEUgV0FSUkFOVElFUyBPRiBNRVJDSEFOVEFCSUxJVFksCkZJVE5FU1MgRk9SIEEgUEFSVElDVUxBUiBQVVJQT1NFIEFORCBOT05JTkZSSU5HRU1FTlQuIElOIE5PIEVWRU5UIFNIQUxMIFRIRQpBVVRIT1JTIE9SIENPUFlSSUdIVCBIT0xERVJTIEJFIExJQUJMRSBGT1IgQU5ZIENMQUlNLCBEQU1BR0VTIE9SIE9USEVSCkxJQUJJTElUWSwgV0hFVEhFUiBJTiBBTiBBQ1RJT04gT0YgQ09OVFJBQ1QsIFRPUlQgT1IgT1RIRVJXSVNFLCBBUklTSU5HIEZST00sCk9VVCBPRiBPUiBJTiBDT05ORUNUSU9OIFdJVEggVEhFIFNPRlRXQVJFIE9SIFRIRSBVU0UgT1IgT1RIRVIgREVBTElOR1MgSU4gVEhFClNPRlRXQVJFLgo='
  >>> _encode_image('YWJj')
  'YWJj'
  >>> _encode_image(b'YWJj')
  'YWJj'
  """

  if p := _as_path(image):
    return b64encode(p.read_bytes()).decode('utf-8')

  try:
    b64decode(image, validate=True)
    return image if isinstance(image, str) else image.decode('utf-8')
  except (binascii.Error, TypeError):
    ...

  if b := _as_bytesio(image):
    return b64encode(b.read()).decode('utf-8')

  raise RequestError('image must be bytes, path-like object, or file-like object')


def _as_path(s: Optional[Union[str, PathLike]]) -> Union[Path, None]:
  if isinstance(s, str) or isinstance(s, Path):
    try:
      if (p := Path(s)).exists():
        return p
    except Exception:
      ...
  return None


def _as_bytesio(s: Any) -> Union[io.BytesIO, None]:
  if isinstance(s, io.BytesIO):
    return s
  elif isinstance(s, bytes):
    return io.BytesIO(s)
  return None


def _parse_host(host: Optional[str]) -> str:
  """
  >>> _parse_host(None)
  'http://127.0.0.1:11434'
  >>> _parse_host('')
  'http://127.0.0.1:11434'
  >>> _parse_host('1.2.3.4')
  'http://1.2.3.4:11434'
  >>> _parse_host(':56789')
  'http://127.0.0.1:56789'
  >>> _parse_host('1.2.3.4:56789')
  'http://1.2.3.4:56789'
  >>> _parse_host('http://1.2.3.4')
  'http://1.2.3.4:80'
  >>> _parse_host('https://1.2.3.4')
  'https://1.2.3.4:443'
  >>> _parse_host('https://1.2.3.4:56789')
  'https://1.2.3.4:56789'
  >>> _parse_host('example.com')
  'http://example.com:11434'
  >>> _parse_host('example.com:56789')
  'http://example.com:56789'
  >>> _parse_host('http://example.com')
  'http://example.com:80'
  >>> _parse_host('https://example.com')
  'https://example.com:443'
  >>> _parse_host('https://example.com:56789')
  'https://example.com:56789'
  >>> _parse_host('example.com/')
  'http://example.com:11434'
  >>> _parse_host('example.com:56789/')
  'http://example.com:56789'
  >>> _parse_host('example.com/path')
  'http://example.com:11434/path'
  >>> _parse_host('example.com:56789/path')
  'http://example.com:56789/path'
  >>> _parse_host('https://example.com:56789/path')
  'https://example.com:56789/path'
  >>> _parse_host('example.com:56789/path/')
  'http://example.com:56789/path'
  >>> _parse_host('[0001:002:003:0004::1]')
  'http://[0001:002:003:0004::1]:11434'
  >>> _parse_host('[0001:002:003:0004::1]:56789')
  'http://[0001:002:003:0004::1]:56789'
  >>> _parse_host('http://[0001:002:003:0004::1]')
  'http://[0001:002:003:0004::1]:80'
  >>> _parse_host('https://[0001:002:003:0004::1]')
  'https://[0001:002:003:0004::1]:443'
  >>> _parse_host('https://[0001:002:003:0004::1]:56789')
  'https://[0001:002:003:0004::1]:56789'
  >>> _parse_host('[0001:002:003:0004::1]/')
  'http://[0001:002:003:0004::1]:11434'
  >>> _parse_host('[0001:002:003:0004::1]:56789/')
  'http://[0001:002:003:0004::1]:56789'
  >>> _parse_host('[0001:002:003:0004::1]/path')
  'http://[0001:002:003:0004::1]:11434/path'
  >>> _parse_host('[0001:002:003:0004::1]:56789/path')
  'http://[0001:002:003:0004::1]:56789/path'
  >>> _parse_host('https://[0001:002:003:0004::1]:56789/path')
  'https://[0001:002:003:0004::1]:56789/path'
  >>> _parse_host('[0001:002:003:0004::1]:56789/path/')
  'http://[0001:002:003:0004::1]:56789/path'
  """

  host, port = host or '', 11434
  scheme, _, hostport = host.partition('://')
  if not hostport:
    scheme, hostport = 'http', host
  elif scheme == 'http':
    port = 80
  elif scheme == 'https':
    port = 443

  split = urllib.parse.urlsplit('://'.join([scheme, hostport]))
  host = split.hostname or '127.0.0.1'
  port = split.port or port

  # Fix missing square brackets for IPv6 from urlsplit
  try:
    if isinstance(ipaddress.ip_address(host), ipaddress.IPv6Address):
      host = f'[{host}]'
  except ValueError:
    ...

  if path := split.path.strip('/'):
    return f'{scheme}://{host}:{port}/{path}'

  return f'{scheme}://{host}:{port}'