diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/ollama/_client.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/ollama/_client.py | 1046 |
1 files changed, 1046 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/ollama/_client.py b/.venv/lib/python3.12/site-packages/ollama/_client.py new file mode 100644 index 00000000..ec9acb90 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/ollama/_client.py @@ -0,0 +1,1046 @@ +import ipaddress +import os +import io +import json +import httpx +import binascii +import platform +import urllib.parse +from os import PathLike +from pathlib import Path +from copy import deepcopy +from hashlib import sha256 +from base64 import b64encode, b64decode + +from typing import Any, AnyStr, Union, Optional, Sequence, Mapping, Literal, overload + +import sys + +if sys.version_info < (3, 9): + from typing import Iterator, AsyncIterator +else: + from collections.abc import Iterator, AsyncIterator + +from importlib import metadata + +try: + __version__ = metadata.version('ollama') +except metadata.PackageNotFoundError: + __version__ = '0.0.0' + +from ollama._types import Message, Options, RequestError, ResponseError, Tool + + +class BaseClient: + def __init__( + self, + client, + host: Optional[str] = None, + follow_redirects: bool = True, + timeout: Any = None, + **kwargs, + ) -> None: + """ + Creates a httpx client. Default parameters are the same as those defined in httpx + except for the following: + - `follow_redirects`: True + - `timeout`: None + `kwargs` are passed to the httpx client. + """ + + headers = kwargs.pop('headers', {}) + headers['Content-Type'] = 'application/json' + headers['Accept'] = 'application/json' + headers['User-Agent'] = f'ollama-python/{__version__} ({platform.machine()} {platform.system().lower()}) Python/{platform.python_version()}' + + self._client = client( + base_url=_parse_host(host or os.getenv('OLLAMA_HOST')), + follow_redirects=follow_redirects, + timeout=timeout, + headers=headers, + **kwargs, + ) + + +class Client(BaseClient): + def __init__(self, host: Optional[str] = None, **kwargs) -> None: + super().__init__(httpx.Client, host, **kwargs) + + def _request(self, method: str, url: str, **kwargs) -> httpx.Response: + response = self._client.request(method, url, **kwargs) + + try: + response.raise_for_status() + except httpx.HTTPStatusError as e: + raise ResponseError(e.response.text, e.response.status_code) from None + + return response + + def _stream(self, method: str, url: str, **kwargs) -> Iterator[Mapping[str, Any]]: + with self._client.stream(method, url, **kwargs) as r: + try: + r.raise_for_status() + except httpx.HTTPStatusError as e: + e.response.read() + raise ResponseError(e.response.text, e.response.status_code) from None + + for line in r.iter_lines(): + partial = json.loads(line) + if e := partial.get('error'): + raise ResponseError(e) + yield partial + + def _request_stream( + self, + *args, + stream: bool = False, + **kwargs, + ) -> Union[Mapping[str, Any], Iterator[Mapping[str, Any]]]: + return self._stream(*args, **kwargs) if stream else self._request(*args, **kwargs).json() + + @overload + def generate( + self, + model: str = '', + prompt: str = '', + suffix: str = '', + system: str = '', + template: str = '', + context: Optional[Sequence[int]] = None, + stream: Literal[False] = False, + raw: bool = False, + format: Literal['', 'json'] = '', + images: Optional[Sequence[AnyStr]] = None, + options: Optional[Options] = None, + keep_alive: Optional[Union[float, str]] = None, + ) -> Mapping[str, Any]: ... + + @overload + def generate( + self, + model: str = '', + prompt: str = '', + suffix: str = '', + system: str = '', + template: str = '', + context: Optional[Sequence[int]] = None, + stream: Literal[True] = True, + raw: bool = False, + format: Literal['', 'json'] = '', + images: Optional[Sequence[AnyStr]] = None, + options: Optional[Options] = None, + keep_alive: Optional[Union[float, str]] = None, + ) -> Iterator[Mapping[str, Any]]: ... + + def generate( + self, + model: str = '', + prompt: str = '', + suffix: str = '', + system: str = '', + template: str = '', + context: Optional[Sequence[int]] = None, + stream: bool = False, + raw: bool = False, + format: Literal['', 'json'] = '', + images: Optional[Sequence[AnyStr]] = None, + options: Optional[Options] = None, + keep_alive: Optional[Union[float, str]] = None, + ) -> Union[Mapping[str, Any], Iterator[Mapping[str, Any]]]: + """ + Create a response using the requested model. + + Raises `RequestError` if a model is not provided. + + Raises `ResponseError` if the request could not be fulfilled. + + Returns `GenerateResponse` if `stream` is `False`, otherwise returns a `GenerateResponse` generator. + """ + + if not model: + raise RequestError('must provide a model') + + return self._request_stream( + 'POST', + '/api/generate', + json={ + 'model': model, + 'prompt': prompt, + 'suffix': suffix, + 'system': system, + 'template': template, + 'context': context or [], + 'stream': stream, + 'raw': raw, + 'images': [_encode_image(image) for image in images or []], + 'format': format, + 'options': options or {}, + 'keep_alive': keep_alive, + }, + stream=stream, + ) + + @overload + def chat( + self, + model: str = '', + messages: Optional[Sequence[Message]] = None, + tools: Optional[Sequence[Tool]] = None, + stream: Literal[False] = False, + format: Literal['', 'json'] = '', + options: Optional[Options] = None, + keep_alive: Optional[Union[float, str]] = None, + ) -> Mapping[str, Any]: ... + + @overload + def chat( + self, + model: str = '', + messages: Optional[Sequence[Message]] = None, + tools: Optional[Sequence[Tool]] = None, + stream: Literal[True] = True, + format: Literal['', 'json'] = '', + options: Optional[Options] = None, + keep_alive: Optional[Union[float, str]] = None, + ) -> Iterator[Mapping[str, Any]]: ... + + def chat( + self, + model: str = '', + messages: Optional[Sequence[Message]] = None, + tools: Optional[Sequence[Tool]] = None, + stream: bool = False, + format: Literal['', 'json'] = '', + options: Optional[Options] = None, + keep_alive: Optional[Union[float, str]] = None, + ) -> Union[Mapping[str, Any], Iterator[Mapping[str, Any]]]: + """ + Create a chat response using the requested model. + + Raises `RequestError` if a model is not provided. + + Raises `ResponseError` if the request could not be fulfilled. + + Returns `ChatResponse` if `stream` is `False`, otherwise returns a `ChatResponse` generator. + """ + + if not model: + raise RequestError('must provide a model') + + messages = deepcopy(messages) + + for message in messages or []: + if images := message.get('images'): + message['images'] = [_encode_image(image) for image in images] + + return self._request_stream( + 'POST', + '/api/chat', + json={ + 'model': model, + 'messages': messages, + 'tools': tools or [], + 'stream': stream, + 'format': format, + 'options': options or {}, + 'keep_alive': keep_alive, + }, + stream=stream, + ) + + def embed( + self, + model: str = '', + input: Union[str, Sequence[AnyStr]] = '', + truncate: bool = True, + options: Optional[Options] = None, + keep_alive: Optional[Union[float, str]] = None, + ) -> Mapping[str, Any]: + if not model: + raise RequestError('must provide a model') + + return self._request( + 'POST', + '/api/embed', + json={ + 'model': model, + 'input': input, + 'truncate': truncate, + 'options': options or {}, + 'keep_alive': keep_alive, + }, + ).json() + + def embeddings( + self, + model: str = '', + prompt: str = '', + options: Optional[Options] = None, + keep_alive: Optional[Union[float, str]] = None, + ) -> Mapping[str, Sequence[float]]: + return self._request( + 'POST', + '/api/embeddings', + json={ + 'model': model, + 'prompt': prompt, + 'options': options or {}, + 'keep_alive': keep_alive, + }, + ).json() + + @overload + def pull( + self, + model: str, + insecure: bool = False, + stream: Literal[False] = False, + ) -> Mapping[str, Any]: ... + + @overload + def pull( + self, + model: str, + insecure: bool = False, + stream: Literal[True] = True, + ) -> Iterator[Mapping[str, Any]]: ... + + def pull( + self, + model: str, + insecure: bool = False, + stream: bool = False, + ) -> Union[Mapping[str, Any], Iterator[Mapping[str, Any]]]: + """ + Raises `ResponseError` if the request could not be fulfilled. + + Returns `ProgressResponse` if `stream` is `False`, otherwise returns a `ProgressResponse` generator. + """ + return self._request_stream( + 'POST', + '/api/pull', + json={ + 'name': model, + 'insecure': insecure, + 'stream': stream, + }, + stream=stream, + ) + + @overload + def push( + self, + model: str, + insecure: bool = False, + stream: Literal[False] = False, + ) -> Mapping[str, Any]: ... + + @overload + def push( + self, + model: str, + insecure: bool = False, + stream: Literal[True] = True, + ) -> Iterator[Mapping[str, Any]]: ... + + def push( + self, + model: str, + insecure: bool = False, + stream: bool = False, + ) -> Union[Mapping[str, Any], Iterator[Mapping[str, Any]]]: + """ + Raises `ResponseError` if the request could not be fulfilled. + + Returns `ProgressResponse` if `stream` is `False`, otherwise returns a `ProgressResponse` generator. + """ + return self._request_stream( + 'POST', + '/api/push', + json={ + 'name': model, + 'insecure': insecure, + 'stream': stream, + }, + stream=stream, + ) + + @overload + def create( + self, + model: str, + path: Optional[Union[str, PathLike]] = None, + modelfile: Optional[str] = None, + quantize: Optional[str] = None, + stream: Literal[False] = False, + ) -> Mapping[str, Any]: ... + + @overload + def create( + self, + model: str, + path: Optional[Union[str, PathLike]] = None, + modelfile: Optional[str] = None, + quantize: Optional[str] = None, + stream: Literal[True] = True, + ) -> Iterator[Mapping[str, Any]]: ... + + def create( + self, + model: str, + path: Optional[Union[str, PathLike]] = None, + modelfile: Optional[str] = None, + quantize: Optional[str] = None, + stream: bool = False, + ) -> Union[Mapping[str, Any], Iterator[Mapping[str, Any]]]: + """ + Raises `ResponseError` if the request could not be fulfilled. + + Returns `ProgressResponse` if `stream` is `False`, otherwise returns a `ProgressResponse` generator. + """ + if (realpath := _as_path(path)) and realpath.exists(): + modelfile = self._parse_modelfile(realpath.read_text(), base=realpath.parent) + elif modelfile: + modelfile = self._parse_modelfile(modelfile) + else: + raise RequestError('must provide either path or modelfile') + + return self._request_stream( + 'POST', + '/api/create', + json={ + 'name': model, + 'modelfile': modelfile, + 'stream': stream, + 'quantize': quantize, + }, + stream=stream, + ) + + def _parse_modelfile(self, modelfile: str, base: Optional[Path] = None) -> str: + base = Path.cwd() if base is None else base + + out = io.StringIO() + for line in io.StringIO(modelfile): + command, _, args = line.partition(' ') + if command.upper() not in ['FROM', 'ADAPTER']: + print(line, end='', file=out) + continue + + path = Path(args.strip()).expanduser() + path = path if path.is_absolute() else base / path + if path.exists(): + args = f'@{self._create_blob(path)}\n' + print(command, args, end='', file=out) + + return out.getvalue() + + def _create_blob(self, path: Union[str, Path]) -> str: + sha256sum = sha256() + with open(path, 'rb') as r: + while True: + chunk = r.read(32 * 1024) + if not chunk: + break + sha256sum.update(chunk) + + digest = f'sha256:{sha256sum.hexdigest()}' + + try: + self._request('HEAD', f'/api/blobs/{digest}') + except ResponseError as e: + if e.status_code != 404: + raise + + with open(path, 'rb') as r: + self._request('POST', f'/api/blobs/{digest}', content=r) + + return digest + + def delete(self, model: str) -> Mapping[str, Any]: + response = self._request('DELETE', '/api/delete', json={'name': model}) + return {'status': 'success' if response.status_code == 200 else 'error'} + + def list(self) -> Mapping[str, Any]: + return self._request('GET', '/api/tags').json() + + def copy(self, source: str, destination: str) -> Mapping[str, Any]: + response = self._request('POST', '/api/copy', json={'source': source, 'destination': destination}) + return {'status': 'success' if response.status_code == 200 else 'error'} + + def show(self, model: str) -> Mapping[str, Any]: + return self._request('POST', '/api/show', json={'name': model}).json() + + def ps(self) -> Mapping[str, Any]: + return self._request('GET', '/api/ps').json() + + +class AsyncClient(BaseClient): + def __init__(self, host: Optional[str] = None, **kwargs) -> None: + super().__init__(httpx.AsyncClient, host, **kwargs) + + async def _request(self, method: str, url: str, **kwargs) -> httpx.Response: + response = await self._client.request(method, url, **kwargs) + + try: + response.raise_for_status() + except httpx.HTTPStatusError as e: + raise ResponseError(e.response.text, e.response.status_code) from None + + return response + + async def _stream(self, method: str, url: str, **kwargs) -> AsyncIterator[Mapping[str, Any]]: + async def inner(): + async with self._client.stream(method, url, **kwargs) as r: + try: + r.raise_for_status() + except httpx.HTTPStatusError as e: + await e.response.aread() + raise ResponseError(e.response.text, e.response.status_code) from None + + async for line in r.aiter_lines(): + partial = json.loads(line) + if e := partial.get('error'): + raise ResponseError(e) + yield partial + + return inner() + + async def _request_stream( + self, + *args, + stream: bool = False, + **kwargs, + ) -> Union[Mapping[str, Any], AsyncIterator[Mapping[str, Any]]]: + if stream: + return await self._stream(*args, **kwargs) + + response = await self._request(*args, **kwargs) + return response.json() + + @overload + async def generate( + self, + model: str = '', + prompt: str = '', + suffix: str = '', + system: str = '', + template: str = '', + context: Optional[Sequence[int]] = None, + stream: Literal[False] = False, + raw: bool = False, + format: Literal['', 'json'] = '', + images: Optional[Sequence[AnyStr]] = None, + options: Optional[Options] = None, + keep_alive: Optional[Union[float, str]] = None, + ) -> Mapping[str, Any]: ... + + @overload + async def generate( + self, + model: str = '', + prompt: str = '', + suffix: str = '', + system: str = '', + template: str = '', + context: Optional[Sequence[int]] = None, + stream: Literal[True] = True, + raw: bool = False, + format: Literal['', 'json'] = '', + images: Optional[Sequence[AnyStr]] = None, + options: Optional[Options] = None, + keep_alive: Optional[Union[float, str]] = None, + ) -> AsyncIterator[Mapping[str, Any]]: ... + + async def generate( + self, + model: str = '', + prompt: str = '', + suffix: str = '', + system: str = '', + template: str = '', + context: Optional[Sequence[int]] = None, + stream: bool = False, + raw: bool = False, + format: Literal['', 'json'] = '', + images: Optional[Sequence[AnyStr]] = None, + options: Optional[Options] = None, + keep_alive: Optional[Union[float, str]] = None, + ) -> Union[Mapping[str, Any], AsyncIterator[Mapping[str, Any]]]: + """ + Create a response using the requested model. + + Raises `RequestError` if a model is not provided. + + Raises `ResponseError` if the request could not be fulfilled. + + Returns `GenerateResponse` if `stream` is `False`, otherwise returns an asynchronous `GenerateResponse` generator. + """ + if not model: + raise RequestError('must provide a model') + + return await self._request_stream( + 'POST', + '/api/generate', + json={ + 'model': model, + 'prompt': prompt, + 'suffix': suffix, + 'system': system, + 'template': template, + 'context': context or [], + 'stream': stream, + 'raw': raw, + 'images': [_encode_image(image) for image in images or []], + 'format': format, + 'options': options or {}, + 'keep_alive': keep_alive, + }, + stream=stream, + ) + + @overload + async def chat( + self, + model: str = '', + messages: Optional[Sequence[Message]] = None, + tools: Optional[Sequence[Tool]] = None, + stream: Literal[False] = False, + format: Literal['', 'json'] = '', + options: Optional[Options] = None, + keep_alive: Optional[Union[float, str]] = None, + ) -> Mapping[str, Any]: ... + + @overload + async def chat( + self, + model: str = '', + messages: Optional[Sequence[Message]] = None, + tools: Optional[Sequence[Tool]] = None, + stream: Literal[True] = True, + format: Literal['', 'json'] = '', + options: Optional[Options] = None, + keep_alive: Optional[Union[float, str]] = None, + ) -> AsyncIterator[Mapping[str, Any]]: ... + + async def chat( + self, + model: str = '', + messages: Optional[Sequence[Message]] = None, + tools: Optional[Sequence[Tool]] = None, + stream: bool = False, + format: Literal['', 'json'] = '', + options: Optional[Options] = None, + keep_alive: Optional[Union[float, str]] = None, + ) -> Union[Mapping[str, Any], AsyncIterator[Mapping[str, Any]]]: + """ + Create a chat response using the requested model. + + Raises `RequestError` if a model is not provided. + + Raises `ResponseError` if the request could not be fulfilled. + + Returns `ChatResponse` if `stream` is `False`, otherwise returns an asynchronous `ChatResponse` generator. + """ + if not model: + raise RequestError('must provide a model') + + messages = deepcopy(messages) + + for message in messages or []: + if images := message.get('images'): + message['images'] = [_encode_image(image) for image in images] + + return await self._request_stream( + 'POST', + '/api/chat', + json={ + 'model': model, + 'messages': messages, + 'tools': tools or [], + 'stream': stream, + 'format': format, + 'options': options or {}, + 'keep_alive': keep_alive, + }, + stream=stream, + ) + + async def embed( + self, + model: str = '', + input: Union[str, Sequence[AnyStr]] = '', + truncate: bool = True, + options: Optional[Options] = None, + keep_alive: Optional[Union[float, str]] = None, + ) -> Mapping[str, Any]: + if not model: + raise RequestError('must provide a model') + + response = await self._request( + 'POST', + '/api/embed', + json={ + 'model': model, + 'input': input, + 'truncate': truncate, + 'options': options or {}, + 'keep_alive': keep_alive, + }, + ) + + return response.json() + + async def embeddings( + self, + model: str = '', + prompt: str = '', + options: Optional[Options] = None, + keep_alive: Optional[Union[float, str]] = None, + ) -> Mapping[str, Sequence[float]]: + response = await self._request( + 'POST', + '/api/embeddings', + json={ + 'model': model, + 'prompt': prompt, + 'options': options or {}, + 'keep_alive': keep_alive, + }, + ) + + return response.json() + + @overload + async def pull( + self, + model: str, + insecure: bool = False, + stream: Literal[False] = False, + ) -> Mapping[str, Any]: ... + + @overload + async def pull( + self, + model: str, + insecure: bool = False, + stream: Literal[True] = True, + ) -> AsyncIterator[Mapping[str, Any]]: ... + + async def pull( + self, + model: str, + insecure: bool = False, + stream: bool = False, + ) -> Union[Mapping[str, Any], AsyncIterator[Mapping[str, Any]]]: + """ + Raises `ResponseError` if the request could not be fulfilled. + + Returns `ProgressResponse` if `stream` is `False`, otherwise returns a `ProgressResponse` generator. + """ + return await self._request_stream( + 'POST', + '/api/pull', + json={ + 'name': model, + 'insecure': insecure, + 'stream': stream, + }, + stream=stream, + ) + + @overload + async def push( + self, + model: str, + insecure: bool = False, + stream: Literal[False] = False, + ) -> Mapping[str, Any]: ... + + @overload + async def push( + self, + model: str, + insecure: bool = False, + stream: Literal[True] = True, + ) -> AsyncIterator[Mapping[str, Any]]: ... + + async def push( + self, + model: str, + insecure: bool = False, + stream: bool = False, + ) -> Union[Mapping[str, Any], AsyncIterator[Mapping[str, Any]]]: + """ + Raises `ResponseError` if the request could not be fulfilled. + + Returns `ProgressResponse` if `stream` is `False`, otherwise returns a `ProgressResponse` generator. + """ + return await self._request_stream( + 'POST', + '/api/push', + json={ + 'name': model, + 'insecure': insecure, + 'stream': stream, + }, + stream=stream, + ) + + @overload + async def create( + self, + model: str, + path: Optional[Union[str, PathLike]] = None, + modelfile: Optional[str] = None, + quantize: Optional[str] = None, + stream: Literal[False] = False, + ) -> Mapping[str, Any]: ... + + @overload + async def create( + self, + model: str, + path: Optional[Union[str, PathLike]] = None, + modelfile: Optional[str] = None, + quantize: Optional[str] = None, + stream: Literal[True] = True, + ) -> AsyncIterator[Mapping[str, Any]]: ... + + async def create( + self, + model: str, + path: Optional[Union[str, PathLike]] = None, + modelfile: Optional[str] = None, + quantize: Optional[str] = None, + stream: bool = False, + ) -> Union[Mapping[str, Any], AsyncIterator[Mapping[str, Any]]]: + """ + Raises `ResponseError` if the request could not be fulfilled. + + Returns `ProgressResponse` if `stream` is `False`, otherwise returns a `ProgressResponse` generator. + """ + if (realpath := _as_path(path)) and realpath.exists(): + modelfile = await self._parse_modelfile(realpath.read_text(), base=realpath.parent) + elif modelfile: + modelfile = await self._parse_modelfile(modelfile) + else: + raise RequestError('must provide either path or modelfile') + + return await self._request_stream( + 'POST', + '/api/create', + json={ + 'name': model, + 'modelfile': modelfile, + 'stream': stream, + 'quantize': quantize, + }, + stream=stream, + ) + + async def _parse_modelfile(self, modelfile: str, base: Optional[Path] = None) -> str: + base = Path.cwd() if base is None else base + + out = io.StringIO() + for line in io.StringIO(modelfile): + command, _, args = line.partition(' ') + if command.upper() not in ['FROM', 'ADAPTER']: + print(line, end='', file=out) + continue + + path = Path(args.strip()).expanduser() + path = path if path.is_absolute() else base / path + if path.exists(): + args = f'@{await self._create_blob(path)}\n' + print(command, args, end='', file=out) + + return out.getvalue() + + async def _create_blob(self, path: Union[str, Path]) -> str: + sha256sum = sha256() + with open(path, 'rb') as r: + while True: + chunk = r.read(32 * 1024) + if not chunk: + break + sha256sum.update(chunk) + + digest = f'sha256:{sha256sum.hexdigest()}' + + try: + await self._request('HEAD', f'/api/blobs/{digest}') + except ResponseError as e: + if e.status_code != 404: + raise + + async def upload_bytes(): + with open(path, 'rb') as r: + while True: + chunk = r.read(32 * 1024) + if not chunk: + break + yield chunk + + await self._request('POST', f'/api/blobs/{digest}', content=upload_bytes()) + + return digest + + async def delete(self, model: str) -> Mapping[str, Any]: + response = await self._request('DELETE', '/api/delete', json={'name': model}) + return {'status': 'success' if response.status_code == 200 else 'error'} + + async def list(self) -> Mapping[str, Any]: + response = await self._request('GET', '/api/tags') + return response.json() + + async def copy(self, source: str, destination: str) -> Mapping[str, Any]: + response = await self._request('POST', '/api/copy', json={'source': source, 'destination': destination}) + return {'status': 'success' if response.status_code == 200 else 'error'} + + async def show(self, model: str) -> Mapping[str, Any]: + response = await self._request('POST', '/api/show', json={'name': model}) + return response.json() + + async def ps(self) -> Mapping[str, Any]: + response = await self._request('GET', '/api/ps') + return response.json() + + +def _encode_image(image) -> str: + """ + >>> _encode_image(b'ollama') + 'b2xsYW1h' + >>> _encode_image(io.BytesIO(b'ollama')) + 'b2xsYW1h' + >>> _encode_image('LICENSE') + 'TUlUIExpY2Vuc2UKCkNvcHlyaWdodCAoYykgT2xsYW1hCgpQZXJtaXNzaW9uIGlzIGhlcmVieSBncmFudGVkLCBmcmVlIG9mIGNoYXJnZSwgdG8gYW55IHBlcnNvbiBvYnRhaW5pbmcgYSBjb3B5Cm9mIHRoaXMgc29mdHdhcmUgYW5kIGFzc29jaWF0ZWQgZG9jdW1lbnRhdGlvbiBmaWxlcyAodGhlICJTb2Z0d2FyZSIpLCB0byBkZWFsCmluIHRoZSBTb2Z0d2FyZSB3aXRob3V0IHJlc3RyaWN0aW9uLCBpbmNsdWRpbmcgd2l0aG91dCBsaW1pdGF0aW9uIHRoZSByaWdodHMKdG8gdXNlLCBjb3B5LCBtb2RpZnksIG1lcmdlLCBwdWJsaXNoLCBkaXN0cmlidXRlLCBzdWJsaWNlbnNlLCBhbmQvb3Igc2VsbApjb3BpZXMgb2YgdGhlIFNvZnR3YXJlLCBhbmQgdG8gcGVybWl0IHBlcnNvbnMgdG8gd2hvbSB0aGUgU29mdHdhcmUgaXMKZnVybmlzaGVkIHRvIGRvIHNvLCBzdWJqZWN0IHRvIHRoZSBmb2xsb3dpbmcgY29uZGl0aW9uczoKClRoZSBhYm92ZSBjb3B5cmlnaHQgbm90aWNlIGFuZCB0aGlzIHBlcm1pc3Npb24gbm90aWNlIHNoYWxsIGJlIGluY2x1ZGVkIGluIGFsbApjb3BpZXMgb3Igc3Vic3RhbnRpYWwgcG9ydGlvbnMgb2YgdGhlIFNvZnR3YXJlLgoKVEhFIFNPRlRXQVJFIElTIFBST1ZJREVEICJBUyBJUyIsIFdJVEhPVVQgV0FSUkFOVFkgT0YgQU5ZIEtJTkQsIEVYUFJFU1MgT1IKSU1QTElFRCwgSU5DTFVESU5HIEJVVCBOT1QgTElNSVRFRCBUTyBUSEUgV0FSUkFOVElFUyBPRiBNRVJDSEFOVEFCSUxJVFksCkZJVE5FU1MgRk9SIEEgUEFSVElDVUxBUiBQVVJQT1NFIEFORCBOT05JTkZSSU5HRU1FTlQuIElOIE5PIEVWRU5UIFNIQUxMIFRIRQpBVVRIT1JTIE9SIENPUFlSSUdIVCBIT0xERVJTIEJFIExJQUJMRSBGT1IgQU5ZIENMQUlNLCBEQU1BR0VTIE9SIE9USEVSCkxJQUJJTElUWSwgV0hFVEhFUiBJTiBBTiBBQ1RJT04gT0YgQ09OVFJBQ1QsIFRPUlQgT1IgT1RIRVJXSVNFLCBBUklTSU5HIEZST00sCk9VVCBPRiBPUiBJTiBDT05ORUNUSU9OIFdJVEggVEhFIFNPRlRXQVJFIE9SIFRIRSBVU0UgT1IgT1RIRVIgREVBTElOR1MgSU4gVEhFClNPRlRXQVJFLgo=' + >>> _encode_image(Path('LICENSE')) + 'TUlUIExpY2Vuc2UKCkNvcHlyaWdodCAoYykgT2xsYW1hCgpQZXJtaXNzaW9uIGlzIGhlcmVieSBncmFudGVkLCBmcmVlIG9mIGNoYXJnZSwgdG8gYW55IHBlcnNvbiBvYnRhaW5pbmcgYSBjb3B5Cm9mIHRoaXMgc29mdHdhcmUgYW5kIGFzc29jaWF0ZWQgZG9jdW1lbnRhdGlvbiBmaWxlcyAodGhlICJTb2Z0d2FyZSIpLCB0byBkZWFsCmluIHRoZSBTb2Z0d2FyZSB3aXRob3V0IHJlc3RyaWN0aW9uLCBpbmNsdWRpbmcgd2l0aG91dCBsaW1pdGF0aW9uIHRoZSByaWdodHMKdG8gdXNlLCBjb3B5LCBtb2RpZnksIG1lcmdlLCBwdWJsaXNoLCBkaXN0cmlidXRlLCBzdWJsaWNlbnNlLCBhbmQvb3Igc2VsbApjb3BpZXMgb2YgdGhlIFNvZnR3YXJlLCBhbmQgdG8gcGVybWl0IHBlcnNvbnMgdG8gd2hvbSB0aGUgU29mdHdhcmUgaXMKZnVybmlzaGVkIHRvIGRvIHNvLCBzdWJqZWN0IHRvIHRoZSBmb2xsb3dpbmcgY29uZGl0aW9uczoKClRoZSBhYm92ZSBjb3B5cmlnaHQgbm90aWNlIGFuZCB0aGlzIHBlcm1pc3Npb24gbm90aWNlIHNoYWxsIGJlIGluY2x1ZGVkIGluIGFsbApjb3BpZXMgb3Igc3Vic3RhbnRpYWwgcG9ydGlvbnMgb2YgdGhlIFNvZnR3YXJlLgoKVEhFIFNPRlRXQVJFIElTIFBST1ZJREVEICJBUyBJUyIsIFdJVEhPVVQgV0FSUkFOVFkgT0YgQU5ZIEtJTkQsIEVYUFJFU1MgT1IKSU1QTElFRCwgSU5DTFVESU5HIEJVVCBOT1QgTElNSVRFRCBUTyBUSEUgV0FSUkFOVElFUyBPRiBNRVJDSEFOVEFCSUxJVFksCkZJVE5FU1MgRk9SIEEgUEFSVElDVUxBUiBQVVJQT1NFIEFORCBOT05JTkZSSU5HRU1FTlQuIElOIE5PIEVWRU5UIFNIQUxMIFRIRQpBVVRIT1JTIE9SIENPUFlSSUdIVCBIT0xERVJTIEJFIExJQUJMRSBGT1IgQU5ZIENMQUlNLCBEQU1BR0VTIE9SIE9USEVSCkxJQUJJTElUWSwgV0hFVEhFUiBJTiBBTiBBQ1RJT04gT0YgQ09OVFJBQ1QsIFRPUlQgT1IgT1RIRVJXSVNFLCBBUklTSU5HIEZST00sCk9VVCBPRiBPUiBJTiBDT05ORUNUSU9OIFdJVEggVEhFIFNPRlRXQVJFIE9SIFRIRSBVU0UgT1IgT1RIRVIgREVBTElOR1MgSU4gVEhFClNPRlRXQVJFLgo=' + >>> _encode_image('YWJj') + 'YWJj' + >>> _encode_image(b'YWJj') + 'YWJj' + """ + + if p := _as_path(image): + return b64encode(p.read_bytes()).decode('utf-8') + + try: + b64decode(image, validate=True) + return image if isinstance(image, str) else image.decode('utf-8') + except (binascii.Error, TypeError): + ... + + if b := _as_bytesio(image): + return b64encode(b.read()).decode('utf-8') + + raise RequestError('image must be bytes, path-like object, or file-like object') + + +def _as_path(s: Optional[Union[str, PathLike]]) -> Union[Path, None]: + if isinstance(s, str) or isinstance(s, Path): + try: + if (p := Path(s)).exists(): + return p + except Exception: + ... + return None + + +def _as_bytesio(s: Any) -> Union[io.BytesIO, None]: + if isinstance(s, io.BytesIO): + return s + elif isinstance(s, bytes): + return io.BytesIO(s) + return None + + +def _parse_host(host: Optional[str]) -> str: + """ + >>> _parse_host(None) + 'http://127.0.0.1:11434' + >>> _parse_host('') + 'http://127.0.0.1:11434' + >>> _parse_host('1.2.3.4') + 'http://1.2.3.4:11434' + >>> _parse_host(':56789') + 'http://127.0.0.1:56789' + >>> _parse_host('1.2.3.4:56789') + 'http://1.2.3.4:56789' + >>> _parse_host('http://1.2.3.4') + 'http://1.2.3.4:80' + >>> _parse_host('https://1.2.3.4') + 'https://1.2.3.4:443' + >>> _parse_host('https://1.2.3.4:56789') + 'https://1.2.3.4:56789' + >>> _parse_host('example.com') + 'http://example.com:11434' + >>> _parse_host('example.com:56789') + 'http://example.com:56789' + >>> _parse_host('http://example.com') + 'http://example.com:80' + >>> _parse_host('https://example.com') + 'https://example.com:443' + >>> _parse_host('https://example.com:56789') + 'https://example.com:56789' + >>> _parse_host('example.com/') + 'http://example.com:11434' + >>> _parse_host('example.com:56789/') + 'http://example.com:56789' + >>> _parse_host('example.com/path') + 'http://example.com:11434/path' + >>> _parse_host('example.com:56789/path') + 'http://example.com:56789/path' + >>> _parse_host('https://example.com:56789/path') + 'https://example.com:56789/path' + >>> _parse_host('example.com:56789/path/') + 'http://example.com:56789/path' + >>> _parse_host('[0001:002:003:0004::1]') + 'http://[0001:002:003:0004::1]:11434' + >>> _parse_host('[0001:002:003:0004::1]:56789') + 'http://[0001:002:003:0004::1]:56789' + >>> _parse_host('http://[0001:002:003:0004::1]') + 'http://[0001:002:003:0004::1]:80' + >>> _parse_host('https://[0001:002:003:0004::1]') + 'https://[0001:002:003:0004::1]:443' + >>> _parse_host('https://[0001:002:003:0004::1]:56789') + 'https://[0001:002:003:0004::1]:56789' + >>> _parse_host('[0001:002:003:0004::1]/') + 'http://[0001:002:003:0004::1]:11434' + >>> _parse_host('[0001:002:003:0004::1]:56789/') + 'http://[0001:002:003:0004::1]:56789' + >>> _parse_host('[0001:002:003:0004::1]/path') + 'http://[0001:002:003:0004::1]:11434/path' + >>> _parse_host('[0001:002:003:0004::1]:56789/path') + 'http://[0001:002:003:0004::1]:56789/path' + >>> _parse_host('https://[0001:002:003:0004::1]:56789/path') + 'https://[0001:002:003:0004::1]:56789/path' + >>> _parse_host('[0001:002:003:0004::1]:56789/path/') + 'http://[0001:002:003:0004::1]:56789/path' + """ + + host, port = host or '', 11434 + scheme, _, hostport = host.partition('://') + if not hostport: + scheme, hostport = 'http', host + elif scheme == 'http': + port = 80 + elif scheme == 'https': + port = 443 + + split = urllib.parse.urlsplit('://'.join([scheme, hostport])) + host = split.hostname or '127.0.0.1' + port = split.port or port + + # Fix missing square brackets for IPv6 from urlsplit + try: + if isinstance(ipaddress.ip_address(host), ipaddress.IPv6Address): + host = f'[{host}]' + except ValueError: + ... + + if path := split.path.strip('/'): + return f'{scheme}://{host}:{port}/{path}' + + return f'{scheme}://{host}:{port}' |