Diffstat (limited to '.venv/lib/python3.12/site-packages/ollama/_client.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/ollama/_client.py  1046
1 file changed, 1046 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/ollama/_client.py b/.venv/lib/python3.12/site-packages/ollama/_client.py
new file mode 100644
index 00000000..ec9acb90
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/ollama/_client.py
@@ -0,0 +1,1046 @@
+import ipaddress
+import os
+import io
+import json
+import httpx
+import binascii
+import platform
+import urllib.parse
+from os import PathLike
+from pathlib import Path
+from copy import deepcopy
+from hashlib import sha256
+from base64 import b64encode, b64decode
+
+from typing import Any, AnyStr, Union, Optional, Sequence, Mapping, Literal, overload
+
+import sys
+
+if sys.version_info < (3, 9):
+  from typing import Iterator, AsyncIterator
+else:
+  from collections.abc import Iterator, AsyncIterator
+
+from importlib import metadata
+
+try:
+  __version__ = metadata.version('ollama')
+except metadata.PackageNotFoundError:
+  __version__ = '0.0.0'
+
+from ollama._types import Message, Options, RequestError, ResponseError, Tool
+
+
+class BaseClient:
+  def __init__(
+    self,
+    client,
+    host: Optional[str] = None,
+    follow_redirects: bool = True,
+    timeout: Any = None,
+    **kwargs,
+  ) -> None:
+    """
+    Creates an httpx client. Default parameters are the same as those defined in httpx
+    except for the following:
+    - `follow_redirects`: True
+    - `timeout`: None
+    `kwargs` are passed to the httpx client.
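+
+    Example (an illustrative sketch, not part of this API; the host value and the
+    `timeout` override are assumptions chosen for demonstration):
+
+      client = Client(host='http://localhost:11434', timeout=30.0)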
+    """
+
+    headers = kwargs.pop('headers', {})
+    headers['Content-Type'] = 'application/json'
+    headers['Accept'] = 'application/json'
+    headers['User-Agent'] = f'ollama-python/{__version__} ({platform.machine()} {platform.system().lower()}) Python/{platform.python_version()}'
+
+    self._client = client(
+      base_url=_parse_host(host or os.getenv('OLLAMA_HOST')),
+      follow_redirects=follow_redirects,
+      timeout=timeout,
+      headers=headers,
+      **kwargs,
+    )
+
+
+class Client(BaseClient):
+  def __init__(self, host: Optional[str] = None, **kwargs) -> None:
+    super().__init__(httpx.Client, host, **kwargs)
+
+  def _request(self, method: str, url: str, **kwargs) -> httpx.Response:
+    response = self._client.request(method, url, **kwargs)
+
+    try:
+      response.raise_for_status()
+    except httpx.HTTPStatusError as e:
+      raise ResponseError(e.response.text, e.response.status_code) from None
+
+    return response
+
+  def _stream(self, method: str, url: str, **kwargs) -> Iterator[Mapping[str, Any]]:
+    with self._client.stream(method, url, **kwargs) as r:
+      try:
+        r.raise_for_status()
+      except httpx.HTTPStatusError as e:
+        e.response.read()
+        raise ResponseError(e.response.text, e.response.status_code) from None
+
+      for line in r.iter_lines():
+        partial = json.loads(line)
+        if e := partial.get('error'):
+          raise ResponseError(e)
+        yield partial
+
+  def _request_stream(
+    self,
+    *args,
+    stream: bool = False,
+    **kwargs,
+  ) -> Union[Mapping[str, Any], Iterator[Mapping[str, Any]]]:
+    return self._stream(*args, **kwargs) if stream else self._request(*args, **kwargs).json()
+
+  @overload
+  def generate(
+    self,
+    model: str = '',
+    prompt: str = '',
+    suffix: str = '',
+    system: str = '',
+    template: str = '',
+    context: Optional[Sequence[int]] = None,
+    stream: Literal[False] = False,
+    raw: bool = False,
+    format: Literal['', 'json'] = '',
+    images: Optional[Sequence[AnyStr]] = None,
+    options: Optional[Options] = None,
+    keep_alive: Optional[Union[float, str]] = None,
+  ) -> Mapping[str, Any]: ...
+
+  @overload
+  def generate(
+    self,
+    model: str = '',
+    prompt: str = '',
+    suffix: str = '',
+    system: str = '',
+    template: str = '',
+    context: Optional[Sequence[int]] = None,
+    stream: Literal[True] = True,
+    raw: bool = False,
+    format: Literal['', 'json'] = '',
+    images: Optional[Sequence[AnyStr]] = None,
+    options: Optional[Options] = None,
+    keep_alive: Optional[Union[float, str]] = None,
+  ) -> Iterator[Mapping[str, Any]]: ...
+
+  def generate(
+    self,
+    model: str = '',
+    prompt: str = '',
+    suffix: str = '',
+    system: str = '',
+    template: str = '',
+    context: Optional[Sequence[int]] = None,
+    stream: bool = False,
+    raw: bool = False,
+    format: Literal['', 'json'] = '',
+    images: Optional[Sequence[AnyStr]] = None,
+    options: Optional[Options] = None,
+    keep_alive: Optional[Union[float, str]] = None,
+  ) -> Union[Mapping[str, Any], Iterator[Mapping[str, Any]]]:
+    """
+    Create a response using the requested model.
+
+    Raises `RequestError` if a model is not provided.
+
+    Raises `ResponseError` if the request could not be fulfilled.
+
+    Returns `GenerateResponse` if `stream` is `False`, otherwise returns a `GenerateResponse` generator.
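+
+    Example (an illustrative sketch; it assumes a running Ollama server and an
+    already-pulled model named 'llama3', neither of which this client guarantees):
+
+      client = Client()
+
+      # Non-streaming: a single mapping holding the full completion.
+      response = client.generate(model='llama3', prompt='Why is the sky blue?')
+      print(response['response'])
+
+      # Streaming: iterate over partial responses as they arrive.
+      for part in client.generate(model='llama3', prompt='Why is the sky blue?', stream=True):
+        print(part.get('response', ''), end='', flush=True)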
+    """
+
+    if not model:
+      raise RequestError('must provide a model')
+
+    return self._request_stream(
+      'POST',
+      '/api/generate',
+      json={
+        'model': model,
+        'prompt': prompt,
+        'suffix': suffix,
+        'system': system,
+        'template': template,
+        'context': context or [],
+        'stream': stream,
+        'raw': raw,
+        'images': [_encode_image(image) for image in images or []],
+        'format': format,
+        'options': options or {},
+        'keep_alive': keep_alive,
+      },
+      stream=stream,
+    )
+
+  @overload
+  def chat(
+    self,
+    model: str = '',
+    messages: Optional[Sequence[Message]] = None,
+    tools: Optional[Sequence[Tool]] = None,
+    stream: Literal[False] = False,
+    format: Literal['', 'json'] = '',
+    options: Optional[Options] = None,
+    keep_alive: Optional[Union[float, str]] = None,
+  ) -> Mapping[str, Any]: ...
+
+  @overload
+  def chat(
+    self,
+    model: str = '',
+    messages: Optional[Sequence[Message]] = None,
+    tools: Optional[Sequence[Tool]] = None,
+    stream: Literal[True] = True,
+    format: Literal['', 'json'] = '',
+    options: Optional[Options] = None,
+    keep_alive: Optional[Union[float, str]] = None,
+  ) -> Iterator[Mapping[str, Any]]: ...
+
+  def chat(
+    self,
+    model: str = '',
+    messages: Optional[Sequence[Message]] = None,
+    tools: Optional[Sequence[Tool]] = None,
+    stream: bool = False,
+    format: Literal['', 'json'] = '',
+    options: Optional[Options] = None,
+    keep_alive: Optional[Union[float, str]] = None,
+  ) -> Union[Mapping[str, Any], Iterator[Mapping[str, Any]]]:
+    """
+    Create a chat response using the requested model.
+
+    Raises `RequestError` if a model is not provided.
+
+    Raises `ResponseError` if the request could not be fulfilled.
+
+    Returns `ChatResponse` if `stream` is `False`, otherwise returns a `ChatResponse` generator.
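+
+    Example (an illustrative sketch; the model name 'llama3' is an assumption):
+
+      client = Client()
+      messages = [{'role': 'user', 'content': 'Why is the sky blue?'}]
+
+      # Non-streaming: one mapping with the complete reply.
+      response = client.chat(model='llama3', messages=messages)
+      print(response['message']['content'])
+
+      # Streaming: partial messages are yielded as they arrive.
+      for part in client.chat(model='llama3', messages=messages, stream=True):
+        print(part['message']['content'], end='', flush=True)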
+    """
+
+    if not model:
+      raise RequestError('must provide a model')
+
+    messages = deepcopy(messages)
+
+    for message in messages or []:
+      if images := message.get('images'):
+        message['images'] = [_encode_image(image) for image in images]
+
+    return self._request_stream(
+      'POST',
+      '/api/chat',
+      json={
+        'model': model,
+        'messages': messages,
+        'tools': tools or [],
+        'stream': stream,
+        'format': format,
+        'options': options or {},
+        'keep_alive': keep_alive,
+      },
+      stream=stream,
+    )
+
+  def embed(
+    self,
+    model: str = '',
+    input: Union[str, Sequence[AnyStr]] = '',
+    truncate: bool = True,
+    options: Optional[Options] = None,
+    keep_alive: Optional[Union[float, str]] = None,
+  ) -> Mapping[str, Any]:
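+    """
+    Create embeddings for the given input using the requested model.
+
+    Raises `RequestError` if a model is not provided and `ResponseError` if the
+    request could not be fulfilled.
+
+    Example (an illustrative sketch; the model name 'all-minilm' and the
+    'embeddings' response field are assumptions based on the Ollama API):
+
+      client = Client()
+      result = client.embed(model='all-minilm', input=['sky is blue', 'grass is green'])
+      vectors = result['embeddings']
+    """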
+    if not model:
+      raise RequestError('must provide a model')
+
+    return self._request(
+      'POST',
+      '/api/embed',
+      json={
+        'model': model,
+        'input': input,
+        'truncate': truncate,
+        'options': options or {},
+        'keep_alive': keep_alive,
+      },
+    ).json()
+
+  def embeddings(
+    self,
+    model: str = '',
+    prompt: str = '',
+    options: Optional[Options] = None,
+    keep_alive: Optional[Union[float, str]] = None,
+  ) -> Mapping[str, Sequence[float]]:
+    return self._request(
+      'POST',
+      '/api/embeddings',
+      json={
+        'model': model,
+        'prompt': prompt,
+        'options': options or {},
+        'keep_alive': keep_alive,
+      },
+    ).json()
+
+  @overload
+  def pull(
+    self,
+    model: str,
+    insecure: bool = False,
+    stream: Literal[False] = False,
+  ) -> Mapping[str, Any]: ...
+
+  @overload
+  def pull(
+    self,
+    model: str,
+    insecure: bool = False,
+    stream: Literal[True] = True,
+  ) -> Iterator[Mapping[str, Any]]: ...
+
+  def pull(
+    self,
+    model: str,
+    insecure: bool = False,
+    stream: bool = False,
+  ) -> Union[Mapping[str, Any], Iterator[Mapping[str, Any]]]:
+    """
+    Raises `ResponseError` if the request could not be fulfilled.
+
+    Returns `ProgressResponse` if `stream` is `False`, otherwise returns a `ProgressResponse` generator.
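+
+    Example (an illustrative sketch; the 'status' field and the model name are
+    assumptions based on the Ollama API):
+
+      client = Client()
+      for progress in client.pull('llama3', stream=True):
+        print(progress.get('status'))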
+    """
+    return self._request_stream(
+      'POST',
+      '/api/pull',
+      json={
+        'name': model,
+        'insecure': insecure,
+        'stream': stream,
+      },
+      stream=stream,
+    )
+
+  @overload
+  def push(
+    self,
+    model: str,
+    insecure: bool = False,
+    stream: Literal[False] = False,
+  ) -> Mapping[str, Any]: ...
+
+  @overload
+  def push(
+    self,
+    model: str,
+    insecure: bool = False,
+    stream: Literal[True] = True,
+  ) -> Iterator[Mapping[str, Any]]: ...
+
+  def push(
+    self,
+    model: str,
+    insecure: bool = False,
+    stream: bool = False,
+  ) -> Union[Mapping[str, Any], Iterator[Mapping[str, Any]]]:
+    """
+    Raises `ResponseError` if the request could not be fulfilled.
+
+    Returns `ProgressResponse` if `stream` is `False`, otherwise returns a `ProgressResponse` generator.
+    """
+    return self._request_stream(
+      'POST',
+      '/api/push',
+      json={
+        'name': model,
+        'insecure': insecure,
+        'stream': stream,
+      },
+      stream=stream,
+    )
+
+  @overload
+  def create(
+    self,
+    model: str,
+    path: Optional[Union[str, PathLike]] = None,
+    modelfile: Optional[str] = None,
+    quantize: Optional[str] = None,
+    stream: Literal[False] = False,
+  ) -> Mapping[str, Any]: ...
+
+  @overload
+  def create(
+    self,
+    model: str,
+    path: Optional[Union[str, PathLike]] = None,
+    modelfile: Optional[str] = None,
+    quantize: Optional[str] = None,
+    stream: Literal[True] = True,
+  ) -> Iterator[Mapping[str, Any]]: ...
+
+  def create(
+    self,
+    model: str,
+    path: Optional[Union[str, PathLike]] = None,
+    modelfile: Optional[str] = None,
+    quantize: Optional[str] = None,
+    stream: bool = False,
+  ) -> Union[Mapping[str, Any], Iterator[Mapping[str, Any]]]:
+    """
+    Raises `ResponseError` if the request could not be fulfilled.
+
+    Returns `ProgressResponse` if `stream` is `False`, otherwise returns a `ProgressResponse` generator.
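+
+    Example (an illustrative sketch; the model names and the base model referenced
+    in the modelfile are assumptions):
+
+      client = Client()
+      for progress in client.create(model='my-llama3', modelfile='FROM llama3', stream=True):
+        print(progress.get('status'))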
+    """
+    if (realpath := _as_path(path)) and realpath.exists():
+      modelfile = self._parse_modelfile(realpath.read_text(), base=realpath.parent)
+    elif modelfile:
+      modelfile = self._parse_modelfile(modelfile)
+    else:
+      raise RequestError('must provide either path or modelfile')
+
+    return self._request_stream(
+      'POST',
+      '/api/create',
+      json={
+        'name': model,
+        'modelfile': modelfile,
+        'stream': stream,
+        'quantize': quantize,
+      },
+      stream=stream,
+    )
+
+  def _parse_modelfile(self, modelfile: str, base: Optional[Path] = None) -> str:
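+    """
+    Rewrite FROM and ADAPTER lines whose arguments resolve to local files, replacing
+    each path with an '@sha256:...' digest after uploading the file as a blob via
+    `_create_blob`. Relative paths are resolved against `base`; all other lines pass
+    through unchanged.
+    """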
+    base = Path.cwd() if base is None else base
+
+    out = io.StringIO()
+    for line in io.StringIO(modelfile):
+      command, _, args = line.partition(' ')
+      if command.upper() not in ['FROM', 'ADAPTER']:
+        print(line, end='', file=out)
+        continue
+
+      path = Path(args.strip()).expanduser()
+      path = path if path.is_absolute() else base / path
+      if path.exists():
+        args = f'@{self._create_blob(path)}\n'
+      print(command, args, end='', file=out)
+
+    return out.getvalue()
+
+  def _create_blob(self, path: Union[str, Path]) -> str:
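+    """
+    Ensure the server has a blob matching the SHA-256 digest of the file at `path`,
+    uploading the file only when the HEAD probe reports 404. Returns the
+    'sha256:...' digest string.
+    """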
+    sha256sum = sha256()
+    with open(path, 'rb') as r:
+      while True:
+        chunk = r.read(32 * 1024)
+        if not chunk:
+          break
+        sha256sum.update(chunk)
+
+    digest = f'sha256:{sha256sum.hexdigest()}'
+
+    try:
+      self._request('HEAD', f'/api/blobs/{digest}')
+    except ResponseError as e:
+      if e.status_code != 404:
+        raise
+
+      with open(path, 'rb') as r:
+        self._request('POST', f'/api/blobs/{digest}', content=r)
+
+    return digest
+
+  def delete(self, model: str) -> Mapping[str, Any]:
+    response = self._request('DELETE', '/api/delete', json={'name': model})
+    return {'status': 'success' if response.status_code == 200 else 'error'}
+
+  def list(self) -> Mapping[str, Any]:
+    return self._request('GET', '/api/tags').json()
+
+  def copy(self, source: str, destination: str) -> Mapping[str, Any]:
+    response = self._request('POST', '/api/copy', json={'source': source, 'destination': destination})
+    return {'status': 'success' if response.status_code == 200 else 'error'}
+
+  def show(self, model: str) -> Mapping[str, Any]:
+    return self._request('POST', '/api/show', json={'name': model}).json()
+
+  def ps(self) -> Mapping[str, Any]:
+    return self._request('GET', '/api/ps').json()
+
+
+class AsyncClient(BaseClient):
+  def __init__(self, host: Optional[str] = None, **kwargs) -> None:
+    super().__init__(httpx.AsyncClient, host, **kwargs)
+
+  async def _request(self, method: str, url: str, **kwargs) -> httpx.Response:
+    response = await self._client.request(method, url, **kwargs)
+
+    try:
+      response.raise_for_status()
+    except httpx.HTTPStatusError as e:
+      raise ResponseError(e.response.text, e.response.status_code) from None
+
+    return response
+
+  async def _stream(self, method: str, url: str, **kwargs) -> AsyncIterator[Mapping[str, Any]]:
+    async def inner():
+      async with self._client.stream(method, url, **kwargs) as r:
+        try:
+          r.raise_for_status()
+        except httpx.HTTPStatusError as e:
+          await e.response.aread()
+          raise ResponseError(e.response.text, e.response.status_code) from None
+
+        async for line in r.aiter_lines():
+          partial = json.loads(line)
+          if e := partial.get('error'):
+            raise ResponseError(e)
+          yield partial
+
+    return inner()
+
+  async def _request_stream(
+    self,
+    *args,
+    stream: bool = False,
+    **kwargs,
+  ) -> Union[Mapping[str, Any], AsyncIterator[Mapping[str, Any]]]:
+    if stream:
+      return await self._stream(*args, **kwargs)
+
+    response = await self._request(*args, **kwargs)
+    return response.json()
+
+  @overload
+  async def generate(
+    self,
+    model: str = '',
+    prompt: str = '',
+    suffix: str = '',
+    system: str = '',
+    template: str = '',
+    context: Optional[Sequence[int]] = None,
+    stream: Literal[False] = False,
+    raw: bool = False,
+    format: Literal['', 'json'] = '',
+    images: Optional[Sequence[AnyStr]] = None,
+    options: Optional[Options] = None,
+    keep_alive: Optional[Union[float, str]] = None,
+  ) -> Mapping[str, Any]: ...
+
+  @overload
+  async def generate(
+    self,
+    model: str = '',
+    prompt: str = '',
+    suffix: str = '',
+    system: str = '',
+    template: str = '',
+    context: Optional[Sequence[int]] = None,
+    stream: Literal[True] = True,
+    raw: bool = False,
+    format: Literal['', 'json'] = '',
+    images: Optional[Sequence[AnyStr]] = None,
+    options: Optional[Options] = None,
+    keep_alive: Optional[Union[float, str]] = None,
+  ) -> AsyncIterator[Mapping[str, Any]]: ...
+
+  async def generate(
+    self,
+    model: str = '',
+    prompt: str = '',
+    suffix: str = '',
+    system: str = '',
+    template: str = '',
+    context: Optional[Sequence[int]] = None,
+    stream: bool = False,
+    raw: bool = False,
+    format: Literal['', 'json'] = '',
+    images: Optional[Sequence[AnyStr]] = None,
+    options: Optional[Options] = None,
+    keep_alive: Optional[Union[float, str]] = None,
+  ) -> Union[Mapping[str, Any], AsyncIterator[Mapping[str, Any]]]:
+    """
+    Create a response using the requested model.
+
+    Raises `RequestError` if a model is not provided.
+
+    Raises `ResponseError` if the request could not be fulfilled.
+
+    Returns `GenerateResponse` if `stream` is `False`, otherwise returns an asynchronous `GenerateResponse` generator.
+    """
+    if not model:
+      raise RequestError('must provide a model')
+
+    return await self._request_stream(
+      'POST',
+      '/api/generate',
+      json={
+        'model': model,
+        'prompt': prompt,
+        'suffix': suffix,
+        'system': system,
+        'template': template,
+        'context': context or [],
+        'stream': stream,
+        'raw': raw,
+        'images': [_encode_image(image) for image in images or []],
+        'format': format,
+        'options': options or {},
+        'keep_alive': keep_alive,
+      },
+      stream=stream,
+    )
+
+  @overload
+  async def chat(
+    self,
+    model: str = '',
+    messages: Optional[Sequence[Message]] = None,
+    tools: Optional[Sequence[Tool]] = None,
+    stream: Literal[False] = False,
+    format: Literal['', 'json'] = '',
+    options: Optional[Options] = None,
+    keep_alive: Optional[Union[float, str]] = None,
+  ) -> Mapping[str, Any]: ...
+
+  @overload
+  async def chat(
+    self,
+    model: str = '',
+    messages: Optional[Sequence[Message]] = None,
+    tools: Optional[Sequence[Tool]] = None,
+    stream: Literal[True] = True,
+    format: Literal['', 'json'] = '',
+    options: Optional[Options] = None,
+    keep_alive: Optional[Union[float, str]] = None,
+  ) -> AsyncIterator[Mapping[str, Any]]: ...
+
+  async def chat(
+    self,
+    model: str = '',
+    messages: Optional[Sequence[Message]] = None,
+    tools: Optional[Sequence[Tool]] = None,
+    stream: bool = False,
+    format: Literal['', 'json'] = '',
+    options: Optional[Options] = None,
+    keep_alive: Optional[Union[float, str]] = None,
+  ) -> Union[Mapping[str, Any], AsyncIterator[Mapping[str, Any]]]:
+    """
+    Create a chat response using the requested model.
+
+    Raises `RequestError` if a model is not provided.
+
+    Raises `ResponseError` if the request could not be fulfilled.
+
+    Returns `ChatResponse` if `stream` is `False`, otherwise returns an asynchronous `ChatResponse` generator.
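+
+    Example (an illustrative sketch; it assumes a running server and a pulled
+    'llama3' model, and that the caller drives the coroutine with an event loop):
+
+      import asyncio
+
+      async def main():
+        client = AsyncClient()
+        messages = [{'role': 'user', 'content': 'Why is the sky blue?'}]
+        async for part in await client.chat(model='llama3', messages=messages, stream=True):
+          print(part['message']['content'], end='', flush=True)
+
+      asyncio.run(main())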
+    """
+    if not model:
+      raise RequestError('must provide a model')
+
+    messages = deepcopy(messages)
+
+    for message in messages or []:
+      if images := message.get('images'):
+        message['images'] = [_encode_image(image) for image in images]
+
+    return await self._request_stream(
+      'POST',
+      '/api/chat',
+      json={
+        'model': model,
+        'messages': messages,
+        'tools': tools or [],
+        'stream': stream,
+        'format': format,
+        'options': options or {},
+        'keep_alive': keep_alive,
+      },
+      stream=stream,
+    )
+
+  async def embed(
+    self,
+    model: str = '',
+    input: Union[str, Sequence[AnyStr]] = '',
+    truncate: bool = True,
+    options: Optional[Options] = None,
+    keep_alive: Optional[Union[float, str]] = None,
+  ) -> Mapping[str, Any]:
+    if not model:
+      raise RequestError('must provide a model')
+
+    response = await self._request(
+      'POST',
+      '/api/embed',
+      json={
+        'model': model,
+        'input': input,
+        'truncate': truncate,
+        'options': options or {},
+        'keep_alive': keep_alive,
+      },
+    )
+
+    return response.json()
+
+  async def embeddings(
+    self,
+    model: str = '',
+    prompt: str = '',
+    options: Optional[Options] = None,
+    keep_alive: Optional[Union[float, str]] = None,
+  ) -> Mapping[str, Sequence[float]]:
+    response = await self._request(
+      'POST',
+      '/api/embeddings',
+      json={
+        'model': model,
+        'prompt': prompt,
+        'options': options or {},
+        'keep_alive': keep_alive,
+      },
+    )
+
+    return response.json()
+
+  @overload
+  async def pull(
+    self,
+    model: str,
+    insecure: bool = False,
+    stream: Literal[False] = False,
+  ) -> Mapping[str, Any]: ...
+
+  @overload
+  async def pull(
+    self,
+    model: str,
+    insecure: bool = False,
+    stream: Literal[True] = True,
+  ) -> AsyncIterator[Mapping[str, Any]]: ...
+
+  async def pull(
+    self,
+    model: str,
+    insecure: bool = False,
+    stream: bool = False,
+  ) -> Union[Mapping[str, Any], AsyncIterator[Mapping[str, Any]]]:
+    """
+    Raises `ResponseError` if the request could not be fulfilled.
+
+    Returns `ProgressResponse` if `stream` is `False`, otherwise returns a `ProgressResponse` generator.
+    """
+    return await self._request_stream(
+      'POST',
+      '/api/pull',
+      json={
+        'name': model,
+        'insecure': insecure,
+        'stream': stream,
+      },
+      stream=stream,
+    )
+
+  @overload
+  async def push(
+    self,
+    model: str,
+    insecure: bool = False,
+    stream: Literal[False] = False,
+  ) -> Mapping[str, Any]: ...
+
+  @overload
+  async def push(
+    self,
+    model: str,
+    insecure: bool = False,
+    stream: Literal[True] = True,
+  ) -> AsyncIterator[Mapping[str, Any]]: ...
+
+  async def push(
+    self,
+    model: str,
+    insecure: bool = False,
+    stream: bool = False,
+  ) -> Union[Mapping[str, Any], AsyncIterator[Mapping[str, Any]]]:
+    """
+    Raises `ResponseError` if the request could not be fulfilled.
+
+    Returns `ProgressResponse` if `stream` is `False`, otherwise returns a `ProgressResponse` generator.
+    """
+    return await self._request_stream(
+      'POST',
+      '/api/push',
+      json={
+        'name': model,
+        'insecure': insecure,
+        'stream': stream,
+      },
+      stream=stream,
+    )
+
+  @overload
+  async def create(
+    self,
+    model: str,
+    path: Optional[Union[str, PathLike]] = None,
+    modelfile: Optional[str] = None,
+    quantize: Optional[str] = None,
+    stream: Literal[False] = False,
+  ) -> Mapping[str, Any]: ...
+
+  @overload
+  async def create(
+    self,
+    model: str,
+    path: Optional[Union[str, PathLike]] = None,
+    modelfile: Optional[str] = None,
+    quantize: Optional[str] = None,
+    stream: Literal[True] = True,
+  ) -> AsyncIterator[Mapping[str, Any]]: ...
+
+  async def create(
+    self,
+    model: str,
+    path: Optional[Union[str, PathLike]] = None,
+    modelfile: Optional[str] = None,
+    quantize: Optional[str] = None,
+    stream: bool = False,
+  ) -> Union[Mapping[str, Any], AsyncIterator[Mapping[str, Any]]]:
+    """
+    Raises `ResponseError` if the request could not be fulfilled.
+
+    Returns `ProgressResponse` if `stream` is `False`, otherwise returns a `ProgressResponse` generator.
+    """
+    if (realpath := _as_path(path)) and realpath.exists():
+      modelfile = await self._parse_modelfile(realpath.read_text(), base=realpath.parent)
+    elif modelfile:
+      modelfile = await self._parse_modelfile(modelfile)
+    else:
+      raise RequestError('must provide either path or modelfile')
+
+    return await self._request_stream(
+      'POST',
+      '/api/create',
+      json={
+        'name': model,
+        'modelfile': modelfile,
+        'stream': stream,
+        'quantize': quantize,
+      },
+      stream=stream,
+    )
+
+  async def _parse_modelfile(self, modelfile: str, base: Optional[Path] = None) -> str:
+    base = Path.cwd() if base is None else base
+
+    out = io.StringIO()
+    for line in io.StringIO(modelfile):
+      command, _, args = line.partition(' ')
+      if command.upper() not in ['FROM', 'ADAPTER']:
+        print(line, end='', file=out)
+        continue
+
+      path = Path(args.strip()).expanduser()
+      path = path if path.is_absolute() else base / path
+      if path.exists():
+        args = f'@{await self._create_blob(path)}\n'
+      print(command, args, end='', file=out)
+
+    return out.getvalue()
+
+  async def _create_blob(self, path: Union[str, Path]) -> str:
+    sha256sum = sha256()
+    with open(path, 'rb') as r:
+      while True:
+        chunk = r.read(32 * 1024)
+        if not chunk:
+          break
+        sha256sum.update(chunk)
+
+    digest = f'sha256:{sha256sum.hexdigest()}'
+
+    try:
+      await self._request('HEAD', f'/api/blobs/{digest}')
+    except ResponseError as e:
+      if e.status_code != 404:
+        raise
+
+      async def upload_bytes():
+        with open(path, 'rb') as r:
+          while True:
+            chunk = r.read(32 * 1024)
+            if not chunk:
+              break
+            yield chunk
+
+      await self._request('POST', f'/api/blobs/{digest}', content=upload_bytes())
+
+    return digest
+
+  async def delete(self, model: str) -> Mapping[str, Any]:
+    response = await self._request('DELETE', '/api/delete', json={'name': model})
+    return {'status': 'success' if response.status_code == 200 else 'error'}
+
+  async def list(self) -> Mapping[str, Any]:
+    response = await self._request('GET', '/api/tags')
+    return response.json()
+
+  async def copy(self, source: str, destination: str) -> Mapping[str, Any]:
+    response = await self._request('POST', '/api/copy', json={'source': source, 'destination': destination})
+    return {'status': 'success' if response.status_code == 200 else 'error'}
+
+  async def show(self, model: str) -> Mapping[str, Any]:
+    response = await self._request('POST', '/api/show', json={'name': model})
+    return response.json()
+
+  async def ps(self) -> Mapping[str, Any]:
+    response = await self._request('GET', '/api/ps')
+    return response.json()
+
+
+def _encode_image(image) -> str:
+  """
+  >>> _encode_image(b'ollama')
+  'b2xsYW1h'
+  >>> _encode_image(io.BytesIO(b'ollama'))
+  'b2xsYW1h'
+  >>> _encode_image('LICENSE')
+  'TUlUIExpY2Vuc2UKCkNvcHlyaWdodCAoYykgT2xsYW1hCgpQZXJtaXNzaW9uIGlzIGhlcmVieSBncmFudGVkLCBmcmVlIG9mIGNoYXJnZSwgdG8gYW55IHBlcnNvbiBvYnRhaW5pbmcgYSBjb3B5Cm9mIHRoaXMgc29mdHdhcmUgYW5kIGFzc29jaWF0ZWQgZG9jdW1lbnRhdGlvbiBmaWxlcyAodGhlICJTb2Z0d2FyZSIpLCB0byBkZWFsCmluIHRoZSBTb2Z0d2FyZSB3aXRob3V0IHJlc3RyaWN0aW9uLCBpbmNsdWRpbmcgd2l0aG91dCBsaW1pdGF0aW9uIHRoZSByaWdodHMKdG8gdXNlLCBjb3B5LCBtb2RpZnksIG1lcmdlLCBwdWJsaXNoLCBkaXN0cmlidXRlLCBzdWJsaWNlbnNlLCBhbmQvb3Igc2VsbApjb3BpZXMgb2YgdGhlIFNvZnR3YXJlLCBhbmQgdG8gcGVybWl0IHBlcnNvbnMgdG8gd2hvbSB0aGUgU29mdHdhcmUgaXMKZnVybmlzaGVkIHRvIGRvIHNvLCBzdWJqZWN0IHRvIHRoZSBmb2xsb3dpbmcgY29uZGl0aW9uczoKClRoZSBhYm92ZSBjb3B5cmlnaHQgbm90aWNlIGFuZCB0aGlzIHBlcm1pc3Npb24gbm90aWNlIHNoYWxsIGJlIGluY2x1ZGVkIGluIGFsbApjb3BpZXMgb3Igc3Vic3RhbnRpYWwgcG9ydGlvbnMgb2YgdGhlIFNvZnR3YXJlLgoKVEhFIFNPRlRXQVJFIElTIFBST1ZJREVEICJBUyBJUyIsIFdJVEhPVVQgV0FSUkFOVFkgT0YgQU5ZIEtJTkQsIEVYUFJFU1MgT1IKSU1QTElFRCwgSU5DTFVESU5HIEJVVCBOT1QgTElNSVRFRCBUTyBUSEUgV0FSUkFOVElFUyBPRiBNRVJDSEFOVEFCSUxJVFksCkZJVE5FU1MgRk9SIEEgUEFSVElDVUxBUiBQVVJQT1NFIEFORCBOT05JTkZSSU5HRU1FTlQuIElOIE5PIEVWRU5UIFNIQUxMIFRIRQpBVVRIT1JTIE9SIENPUFlSSUdIVCBIT0xERVJTIEJFIExJQUJMRSBGT1IgQU5ZIENMQUlNLCBEQU1BR0VTIE9SIE9USEVSCkxJQUJJTElUWSwgV0hFVEhFUiBJTiBBTiBBQ1RJT04gT0YgQ09OVFJBQ1QsIFRPUlQgT1IgT1RIRVJXSVNFLCBBUklTSU5HIEZST00sCk9VVCBPRiBPUiBJTiBDT05ORUNUSU9OIFdJVEggVEhFIFNPRlRXQVJFIE9SIFRIRSBVU0UgT1IgT1RIRVIgREVBTElOR1MgSU4gVEhFClNPRlRXQVJFLgo='
+  >>> _encode_image(Path('LICENSE'))
+  'TUlUIExpY2Vuc2UKCkNvcHlyaWdodCAoYykgT2xsYW1hCgpQZXJtaXNzaW9uIGlzIGhlcmVieSBncmFudGVkLCBmcmVlIG9mIGNoYXJnZSwgdG8gYW55IHBlcnNvbiBvYnRhaW5pbmcgYSBjb3B5Cm9mIHRoaXMgc29mdHdhcmUgYW5kIGFzc29jaWF0ZWQgZG9jdW1lbnRhdGlvbiBmaWxlcyAodGhlICJTb2Z0d2FyZSIpLCB0byBkZWFsCmluIHRoZSBTb2Z0d2FyZSB3aXRob3V0IHJlc3RyaWN0aW9uLCBpbmNsdWRpbmcgd2l0aG91dCBsaW1pdGF0aW9uIHRoZSByaWdodHMKdG8gdXNlLCBjb3B5LCBtb2RpZnksIG1lcmdlLCBwdWJsaXNoLCBkaXN0cmlidXRlLCBzdWJsaWNlbnNlLCBhbmQvb3Igc2VsbApjb3BpZXMgb2YgdGhlIFNvZnR3YXJlLCBhbmQgdG8gcGVybWl0IHBlcnNvbnMgdG8gd2hvbSB0aGUgU29mdHdhcmUgaXMKZnVybmlzaGVkIHRvIGRvIHNvLCBzdWJqZWN0IHRvIHRoZSBmb2xsb3dpbmcgY29uZGl0aW9uczoKClRoZSBhYm92ZSBjb3B5cmlnaHQgbm90aWNlIGFuZCB0aGlzIHBlcm1pc3Npb24gbm90aWNlIHNoYWxsIGJlIGluY2x1ZGVkIGluIGFsbApjb3BpZXMgb3Igc3Vic3RhbnRpYWwgcG9ydGlvbnMgb2YgdGhlIFNvZnR3YXJlLgoKVEhFIFNPRlRXQVJFIElTIFBST1ZJREVEICJBUyBJUyIsIFdJVEhPVVQgV0FSUkFOVFkgT0YgQU5ZIEtJTkQsIEVYUFJFU1MgT1IKSU1QTElFRCwgSU5DTFVESU5HIEJVVCBOT1QgTElNSVRFRCBUTyBUSEUgV0FSUkFOVElFUyBPRiBNRVJDSEFOVEFCSUxJVFksCkZJVE5FU1MgRk9SIEEgUEFSVElDVUxBUiBQVVJQT1NFIEFORCBOT05JTkZSSU5HRU1FTlQuIElOIE5PIEVWRU5UIFNIQUxMIFRIRQpBVVRIT1JTIE9SIENPUFlSSUdIVCBIT0xERVJTIEJFIExJQUJMRSBGT1IgQU5ZIENMQUlNLCBEQU1BR0VTIE9SIE9USEVSCkxJQUJJTElUWSwgV0hFVEhFUiBJTiBBTiBBQ1RJT04gT0YgQ09OVFJBQ1QsIFRPUlQgT1IgT1RIRVJXSVNFLCBBUklTSU5HIEZST00sCk9VVCBPRiBPUiBJTiBDT05ORUNUSU9OIFdJVEggVEhFIFNPRlRXQVJFIE9SIFRIRSBVU0UgT1IgT1RIRVIgREVBTElOR1MgSU4gVEhFClNPRlRXQVJFLgo='
+  >>> _encode_image('YWJj')
+  'YWJj'
+  >>> _encode_image(b'YWJj')
+  'YWJj'
+  """
+
+  if p := _as_path(image):
+    return b64encode(p.read_bytes()).decode('utf-8')
+
+  try:
+    b64decode(image, validate=True)
+    return image if isinstance(image, str) else image.decode('utf-8')
+  except (binascii.Error, TypeError):
+    ...
+
+  if b := _as_bytesio(image):
+    return b64encode(b.read()).decode('utf-8')
+
+  raise RequestError('image must be bytes, path-like object, or file-like object')
+
+
+def _as_path(s: Optional[Union[str, PathLike]]) -> Union[Path, None]:
+  if isinstance(s, str) or isinstance(s, Path):
+    try:
+      if (p := Path(s)).exists():
+        return p
+    except Exception:
+      ...
+  return None
+
+
+def _as_bytesio(s: Any) -> Union[io.BytesIO, None]:
+  if isinstance(s, io.BytesIO):
+    return s
+  elif isinstance(s, bytes):
+    return io.BytesIO(s)
+  return None
+
+
+def _parse_host(host: Optional[str]) -> str:
+  """
+  >>> _parse_host(None)
+  'http://127.0.0.1:11434'
+  >>> _parse_host('')
+  'http://127.0.0.1:11434'
+  >>> _parse_host('1.2.3.4')
+  'http://1.2.3.4:11434'
+  >>> _parse_host(':56789')
+  'http://127.0.0.1:56789'
+  >>> _parse_host('1.2.3.4:56789')
+  'http://1.2.3.4:56789'
+  >>> _parse_host('http://1.2.3.4')
+  'http://1.2.3.4:80'
+  >>> _parse_host('https://1.2.3.4')
+  'https://1.2.3.4:443'
+  >>> _parse_host('https://1.2.3.4:56789')
+  'https://1.2.3.4:56789'
+  >>> _parse_host('example.com')
+  'http://example.com:11434'
+  >>> _parse_host('example.com:56789')
+  'http://example.com:56789'
+  >>> _parse_host('http://example.com')
+  'http://example.com:80'
+  >>> _parse_host('https://example.com')
+  'https://example.com:443'
+  >>> _parse_host('https://example.com:56789')
+  'https://example.com:56789'
+  >>> _parse_host('example.com/')
+  'http://example.com:11434'
+  >>> _parse_host('example.com:56789/')
+  'http://example.com:56789'
+  >>> _parse_host('example.com/path')
+  'http://example.com:11434/path'
+  >>> _parse_host('example.com:56789/path')
+  'http://example.com:56789/path'
+  >>> _parse_host('https://example.com:56789/path')
+  'https://example.com:56789/path'
+  >>> _parse_host('example.com:56789/path/')
+  'http://example.com:56789/path'
+  >>> _parse_host('[0001:002:003:0004::1]')
+  'http://[0001:002:003:0004::1]:11434'
+  >>> _parse_host('[0001:002:003:0004::1]:56789')
+  'http://[0001:002:003:0004::1]:56789'
+  >>> _parse_host('http://[0001:002:003:0004::1]')
+  'http://[0001:002:003:0004::1]:80'
+  >>> _parse_host('https://[0001:002:003:0004::1]')
+  'https://[0001:002:003:0004::1]:443'
+  >>> _parse_host('https://[0001:002:003:0004::1]:56789')
+  'https://[0001:002:003:0004::1]:56789'
+  >>> _parse_host('[0001:002:003:0004::1]/')
+  'http://[0001:002:003:0004::1]:11434'
+  >>> _parse_host('[0001:002:003:0004::1]:56789/')
+  'http://[0001:002:003:0004::1]:56789'
+  >>> _parse_host('[0001:002:003:0004::1]/path')
+  'http://[0001:002:003:0004::1]:11434/path'
+  >>> _parse_host('[0001:002:003:0004::1]:56789/path')
+  'http://[0001:002:003:0004::1]:56789/path'
+  >>> _parse_host('https://[0001:002:003:0004::1]:56789/path')
+  'https://[0001:002:003:0004::1]:56789/path'
+  >>> _parse_host('[0001:002:003:0004::1]:56789/path/')
+  'http://[0001:002:003:0004::1]:56789/path'
+  """
+
+  host, port = host or '', 11434
+  scheme, _, hostport = host.partition('://')
+  if not hostport:
+    scheme, hostport = 'http', host
+  elif scheme == 'http':
+    port = 80
+  elif scheme == 'https':
+    port = 443
+
+  split = urllib.parse.urlsplit('://'.join([scheme, hostport]))
+  host = split.hostname or '127.0.0.1'
+  port = split.port or port
+
+  # Fix missing square brackets for IPv6 from urlsplit
+  try:
+    if isinstance(ipaddress.ip_address(host), ipaddress.IPv6Address):
+      host = f'[{host}]'
+  except ValueError:
+    ...
+
+  if path := split.path.strip('/'):
+    return f'{scheme}://{host}:{port}/{path}'
+
+  return f'{scheme}://{host}:{port}'