aboutsummaryrefslogtreecommitdiff
# File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details.

from __future__ import annotations

import asyncio
from typing import Dict, List, Iterable, Optional
from typing_extensions import Union, Literal
from concurrent.futures import Future, ThreadPoolExecutor, as_completed

import httpx
import sniffio

from ... import _legacy_response
from ...types import FileChunkingStrategyParam
from ..._types import NOT_GIVEN, Body, Query, Headers, NotGiven, FileTypes
from ..._utils import (
    is_given,
    maybe_transform,
    async_maybe_transform,
)
from ..._compat import cached_property
from ..._resource import SyncAPIResource, AsyncAPIResource
from ..._response import to_streamed_response_wrapper, async_to_streamed_response_wrapper
from ...pagination import SyncCursorPage, AsyncCursorPage
from ..._base_client import AsyncPaginator, make_request_options
from ...types.file_object import FileObject
from ...types.vector_stores import file_batch_create_params, file_batch_list_files_params
from ...types.file_chunking_strategy_param import FileChunkingStrategyParam
from ...types.vector_stores.vector_store_file import VectorStoreFile
from ...types.vector_stores.vector_store_file_batch import VectorStoreFileBatch

__all__ = ["FileBatches", "AsyncFileBatches"]


class FileBatches(SyncAPIResource):
    @cached_property
    def with_raw_response(self) -> FileBatchesWithRawResponse:
        """
        This property can be used as a prefix for any HTTP method call to return
        the raw response object instead of the parsed content.

        For more information, see https://www.github.com/openai/openai-python#accessing-raw-response-data-eg-headers
        """
        return FileBatchesWithRawResponse(self)

    @cached_property
    def with_streaming_response(self) -> FileBatchesWithStreamingResponse:
        """
        An alternative to `.with_raw_response` that doesn't eagerly read the response body.

        For more information, see https://www.github.com/openai/openai-python#with_streaming_response
        """
        return FileBatchesWithStreamingResponse(self)

    def create(
        self,
        vector_store_id: str,
        *,
        file_ids: List[str],
        attributes: Optional[Dict[str, Union[str, float, bool]]] | NotGiven = NOT_GIVEN,
        chunking_strategy: FileChunkingStrategyParam | NotGiven = NOT_GIVEN,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
    ) -> VectorStoreFileBatch:
        """
        Create a vector store file batch.

        Args:
          file_ids: A list of [File](https://platform.openai.com/docs/api-reference/files) IDs that
              the vector store should use. Useful for tools like `file_search` that can access
              files.

          attributes: Set of 16 key-value pairs that can be attached to an object. This can be useful
              for storing additional information about the object in a structured format, and
              querying for objects via API or the dashboard. Keys are strings with a maximum
              length of 64 characters. Values are strings with a maximum length of 512
              characters, booleans, or numbers.

          chunking_strategy: The chunking strategy used to chunk the file(s). If not set, will use the `auto`
              strategy. Only applicable if `file_ids` is non-empty.

          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        if not vector_store_id:
            raise ValueError(f"Expected a non-empty value for `vector_store_id` but received {vector_store_id!r}")
        extra_headers = {"OpenAI-Beta": "assistants=v2", **(extra_headers or {})}
        return self._post(
            f"/vector_stores/{vector_store_id}/file_batches",
            body=maybe_transform(
                {
                    "file_ids": file_ids,
                    "attributes": attributes,
                    "chunking_strategy": chunking_strategy,
                },
                file_batch_create_params.FileBatchCreateParams,
            ),
            options=make_request_options(
                extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
            ),
            cast_to=VectorStoreFileBatch,
        )

    def retrieve(
        self,
        batch_id: str,
        *,
        vector_store_id: str,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
    ) -> VectorStoreFileBatch:
        """
        Retrieves a vector store file batch.

        Args:
          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        if not vector_store_id:
            raise ValueError(f"Expected a non-empty value for `vector_store_id` but received {vector_store_id!r}")
        if not batch_id:
            raise ValueError(f"Expected a non-empty value for `batch_id` but received {batch_id!r}")
        extra_headers = {"OpenAI-Beta": "assistants=v2", **(extra_headers or {})}
        return self._get(
            f"/vector_stores/{vector_store_id}/file_batches/{batch_id}",
            options=make_request_options(
                extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
            ),
            cast_to=VectorStoreFileBatch,
        )

    def cancel(
        self,
        batch_id: str,
        *,
        vector_store_id: str,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
    ) -> VectorStoreFileBatch:
        """Cancel a vector store file batch.

        This attempts to cancel the processing of
        files in this batch as soon as possible.

        Args:
          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        if not vector_store_id:
            raise ValueError(f"Expected a non-empty value for `vector_store_id` but received {vector_store_id!r}")
        if not batch_id:
            raise ValueError(f"Expected a non-empty value for `batch_id` but received {batch_id!r}")
        extra_headers = {"OpenAI-Beta": "assistants=v2", **(extra_headers or {})}
        return self._post(
            f"/vector_stores/{vector_store_id}/file_batches/{batch_id}/cancel",
            options=make_request_options(
                extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
            ),
            cast_to=VectorStoreFileBatch,
        )

    def create_and_poll(
        self,
        vector_store_id: str,
        *,
        file_ids: List[str],
        poll_interval_ms: int | NotGiven = NOT_GIVEN,
        chunking_strategy: FileChunkingStrategyParam | NotGiven = NOT_GIVEN,
    ) -> VectorStoreFileBatch:
        """Create a vector store batch and poll until all files have been processed."""
        batch = self.create(
            vector_store_id=vector_store_id,
            file_ids=file_ids,
            chunking_strategy=chunking_strategy,
        )
        # TODO: don't poll unless necessary??
        return self.poll(
            batch.id,
            vector_store_id=vector_store_id,
            poll_interval_ms=poll_interval_ms,
        )

    def list_files(
        self,
        batch_id: str,
        *,
        vector_store_id: str,
        after: str | NotGiven = NOT_GIVEN,
        before: str | NotGiven = NOT_GIVEN,
        filter: Literal["in_progress", "completed", "failed", "cancelled"] | NotGiven = NOT_GIVEN,
        limit: int | NotGiven = NOT_GIVEN,
        order: Literal["asc", "desc"] | NotGiven = NOT_GIVEN,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
    ) -> SyncCursorPage[VectorStoreFile]:
        """
        Returns a list of vector store files in a batch.

        Args:
          after: A cursor for use in pagination. `after` is an object ID that defines your place
              in the list. For instance, if you make a list request and receive 100 objects,
              ending with obj_foo, your subsequent call can include after=obj_foo in order to
              fetch the next page of the list.

          before: A cursor for use in pagination. `before` is an object ID that defines your place
              in the list. For instance, if you make a list request and receive 100 objects,
              starting with obj_foo, your subsequent call can include before=obj_foo in order
              to fetch the previous page of the list.

          filter: Filter by file status. One of `in_progress`, `completed`, `failed`, `cancelled`.

          limit: A limit on the number of objects to be returned. Limit can range between 1 and
              100, and the default is 20.

          order: Sort order by the `created_at` timestamp of the objects. `asc` for ascending
              order and `desc` for descending order.

          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        if not vector_store_id:
            raise ValueError(f"Expected a non-empty value for `vector_store_id` but received {vector_store_id!r}")
        if not batch_id:
            raise ValueError(f"Expected a non-empty value for `batch_id` but received {batch_id!r}")
        extra_headers = {"OpenAI-Beta": "assistants=v2", **(extra_headers or {})}
        return self._get_api_list(
            f"/vector_stores/{vector_store_id}/file_batches/{batch_id}/files",
            page=SyncCursorPage[VectorStoreFile],
            options=make_request_options(
                extra_headers=extra_headers,
                extra_query=extra_query,
                extra_body=extra_body,
                timeout=timeout,
                query=maybe_transform(
                    {
                        "after": after,
                        "before": before,
                        "filter": filter,
                        "limit": limit,
                        "order": order,
                    },
                    file_batch_list_files_params.FileBatchListFilesParams,
                ),
            ),
            model=VectorStoreFile,
        )

    def poll(
        self,
        batch_id: str,
        *,
        vector_store_id: str,
        poll_interval_ms: int | NotGiven = NOT_GIVEN,
    ) -> VectorStoreFileBatch:
        """Wait for the given file batch to be processed.

        Note: this will return even if one of the files failed to process, you need to
        check batch.file_counts.failed_count to handle this case.
        """
        headers: dict[str, str] = {"X-Stainless-Poll-Helper": "true"}
        if is_given(poll_interval_ms):
            headers["X-Stainless-Custom-Poll-Interval"] = str(poll_interval_ms)

        while True:
            response = self.with_raw_response.retrieve(
                batch_id,
                vector_store_id=vector_store_id,
                extra_headers=headers,
            )

            batch = response.parse()
            if batch.file_counts.in_progress > 0:
                if not is_given(poll_interval_ms):
                    from_header = response.headers.get("openai-poll-after-ms")
                    if from_header is not None:
                        poll_interval_ms = int(from_header)
                    else:
                        poll_interval_ms = 1000

                self._sleep(poll_interval_ms / 1000)
                continue

            return batch

    def upload_and_poll(
        self,
        vector_store_id: str,
        *,
        files: Iterable[FileTypes],
        max_concurrency: int = 5,
        file_ids: List[str] = [],
        poll_interval_ms: int | NotGiven = NOT_GIVEN,
        chunking_strategy: FileChunkingStrategyParam | NotGiven = NOT_GIVEN,
    ) -> VectorStoreFileBatch:
        """Uploads the given files concurrently and then creates a vector store file batch.

        If you've already uploaded certain files that you want to include in this batch
        then you can pass their IDs through the `file_ids` argument.

        By default, if any file upload fails then an exception will be eagerly raised.

        The number of concurrency uploads is configurable using the `max_concurrency`
        parameter.

        Note: this method only supports `asyncio` or `trio` as the backing async
        runtime.
        """
        results: list[FileObject] = []

        with ThreadPoolExecutor(max_workers=max_concurrency) as executor:
            futures: list[Future[FileObject]] = [
                executor.submit(
                    self._client.files.create,
                    file=file,
                    purpose="assistants",
                )
                for file in files
            ]

        for future in as_completed(futures):
            exc = future.exception()
            if exc:
                raise exc

            results.append(future.result())

        batch = self.create_and_poll(
            vector_store_id=vector_store_id,
            file_ids=[*file_ids, *(f.id for f in results)],
            poll_interval_ms=poll_interval_ms,
            chunking_strategy=chunking_strategy,
        )
        return batch


class AsyncFileBatches(AsyncAPIResource):
    @cached_property
    def with_raw_response(self) -> AsyncFileBatchesWithRawResponse:
        """
        This property can be used as a prefix for any HTTP method call to return
        the raw response object instead of the parsed content.

        For more information, see https://www.github.com/openai/openai-python#accessing-raw-response-data-eg-headers
        """
        return AsyncFileBatchesWithRawResponse(self)

    @cached_property
    def with_streaming_response(self) -> AsyncFileBatchesWithStreamingResponse:
        """
        An alternative to `.with_raw_response` that doesn't eagerly read the response body.

        For more information, see https://www.github.com/openai/openai-python#with_streaming_response
        """
        return AsyncFileBatchesWithStreamingResponse(self)

    async def create(
        self,
        vector_store_id: str,
        *,
        file_ids: List[str],
        attributes: Optional[Dict[str, Union[str, float, bool]]] | NotGiven = NOT_GIVEN,
        chunking_strategy: FileChunkingStrategyParam | NotGiven = NOT_GIVEN,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
    ) -> VectorStoreFileBatch:
        """
        Create a vector store file batch.

        Args:
          file_ids: A list of [File](https://platform.openai.com/docs/api-reference/files) IDs that
              the vector store should use. Useful for tools like `file_search` that can access
              files.

          attributes: Set of 16 key-value pairs that can be attached to an object. This can be useful
              for storing additional information about the object in a structured format, and
              querying for objects via API or the dashboard. Keys are strings with a maximum
              length of 64 characters. Values are strings with a maximum length of 512
              characters, booleans, or numbers.

          chunking_strategy: The chunking strategy used to chunk the file(s). If not set, will use the `auto`
              strategy. Only applicable if `file_ids` is non-empty.

          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        if not vector_store_id:
            raise ValueError(f"Expected a non-empty value for `vector_store_id` but received {vector_store_id!r}")
        extra_headers = {"OpenAI-Beta": "assistants=v2", **(extra_headers or {})}
        return await self._post(
            f"/vector_stores/{vector_store_id}/file_batches",
            body=await async_maybe_transform(
                {
                    "file_ids": file_ids,
                    "attributes": attributes,
                    "chunking_strategy": chunking_strategy,
                },
                file_batch_create_params.FileBatchCreateParams,
            ),
            options=make_request_options(
                extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
            ),
            cast_to=VectorStoreFileBatch,
        )

    async def retrieve(
        self,
        batch_id: str,
        *,
        vector_store_id: str,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
    ) -> VectorStoreFileBatch:
        """
        Retrieves a vector store file batch.

        Args:
          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        if not vector_store_id:
            raise ValueError(f"Expected a non-empty value for `vector_store_id` but received {vector_store_id!r}")
        if not batch_id:
            raise ValueError(f"Expected a non-empty value for `batch_id` but received {batch_id!r}")
        extra_headers = {"OpenAI-Beta": "assistants=v2", **(extra_headers or {})}
        return await self._get(
            f"/vector_stores/{vector_store_id}/file_batches/{batch_id}",
            options=make_request_options(
                extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
            ),
            cast_to=VectorStoreFileBatch,
        )

    async def cancel(
        self,
        batch_id: str,
        *,
        vector_store_id: str,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
    ) -> VectorStoreFileBatch:
        """Cancel a vector store file batch.

        This attempts to cancel the processing of
        files in this batch as soon as possible.

        Args:
          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        if not vector_store_id:
            raise ValueError(f"Expected a non-empty value for `vector_store_id` but received {vector_store_id!r}")
        if not batch_id:
            raise ValueError(f"Expected a non-empty value for `batch_id` but received {batch_id!r}")
        extra_headers = {"OpenAI-Beta": "assistants=v2", **(extra_headers or {})}
        return await self._post(
            f"/vector_stores/{vector_store_id}/file_batches/{batch_id}/cancel",
            options=make_request_options(
                extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
            ),
            cast_to=VectorStoreFileBatch,
        )

    async def create_and_poll(
        self,
        vector_store_id: str,
        *,
        file_ids: List[str],
        poll_interval_ms: int | NotGiven = NOT_GIVEN,
        chunking_strategy: FileChunkingStrategyParam | NotGiven = NOT_GIVEN,
    ) -> VectorStoreFileBatch:
        """Create a vector store batch and poll until all files have been processed."""
        batch = await self.create(
            vector_store_id=vector_store_id,
            file_ids=file_ids,
            chunking_strategy=chunking_strategy,
        )
        # TODO: don't poll unless necessary??
        return await self.poll(
            batch.id,
            vector_store_id=vector_store_id,
            poll_interval_ms=poll_interval_ms,
        )

    def list_files(
        self,
        batch_id: str,
        *,
        vector_store_id: str,
        after: str | NotGiven = NOT_GIVEN,
        before: str | NotGiven = NOT_GIVEN,
        filter: Literal["in_progress", "completed", "failed", "cancelled"] | NotGiven = NOT_GIVEN,
        limit: int | NotGiven = NOT_GIVEN,
        order: Literal["asc", "desc"] | NotGiven = NOT_GIVEN,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
    ) -> AsyncPaginator[VectorStoreFile, AsyncCursorPage[VectorStoreFile]]:
        """
        Returns a list of vector store files in a batch.

        Args:
          after: A cursor for use in pagination. `after` is an object ID that defines your place
              in the list. For instance, if you make a list request and receive 100 objects,
              ending with obj_foo, your subsequent call can include after=obj_foo in order to
              fetch the next page of the list.

          before: A cursor for use in pagination. `before` is an object ID that defines your place
              in the list. For instance, if you make a list request and receive 100 objects,
              starting with obj_foo, your subsequent call can include before=obj_foo in order
              to fetch the previous page of the list.

          filter: Filter by file status. One of `in_progress`, `completed`, `failed`, `cancelled`.

          limit: A limit on the number of objects to be returned. Limit can range between 1 and
              100, and the default is 20.

          order: Sort order by the `created_at` timestamp of the objects. `asc` for ascending
              order and `desc` for descending order.

          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        if not vector_store_id:
            raise ValueError(f"Expected a non-empty value for `vector_store_id` but received {vector_store_id!r}")
        if not batch_id:
            raise ValueError(f"Expected a non-empty value for `batch_id` but received {batch_id!r}")
        extra_headers = {"OpenAI-Beta": "assistants=v2", **(extra_headers or {})}
        return self._get_api_list(
            f"/vector_stores/{vector_store_id}/file_batches/{batch_id}/files",
            page=AsyncCursorPage[VectorStoreFile],
            options=make_request_options(
                extra_headers=extra_headers,
                extra_query=extra_query,
                extra_body=extra_body,
                timeout=timeout,
                query=maybe_transform(
                    {
                        "after": after,
                        "before": before,
                        "filter": filter,
                        "limit": limit,
                        "order": order,
                    },
                    file_batch_list_files_params.FileBatchListFilesParams,
                ),
            ),
            model=VectorStoreFile,
        )

    async def poll(
        self,
        batch_id: str,
        *,
        vector_store_id: str,
        poll_interval_ms: int | NotGiven = NOT_GIVEN,
    ) -> VectorStoreFileBatch:
        """Wait for the given file batch to be processed.

        Note: this will return even if one of the files failed to process, you need to
        check batch.file_counts.failed_count to handle this case.
        """
        headers: dict[str, str] = {"X-Stainless-Poll-Helper": "true"}
        if is_given(poll_interval_ms):
            headers["X-Stainless-Custom-Poll-Interval"] = str(poll_interval_ms)

        while True:
            response = await self.with_raw_response.retrieve(
                batch_id,
                vector_store_id=vector_store_id,
                extra_headers=headers,
            )

            batch = response.parse()
            if batch.file_counts.in_progress > 0:
                if not is_given(poll_interval_ms):
                    from_header = response.headers.get("openai-poll-after-ms")
                    if from_header is not None:
                        poll_interval_ms = int(from_header)
                    else:
                        poll_interval_ms = 1000

                await self._sleep(poll_interval_ms / 1000)
                continue

            return batch

    async def upload_and_poll(
        self,
        vector_store_id: str,
        *,
        files: Iterable[FileTypes],
        max_concurrency: int = 5,
        file_ids: List[str] = [],
        poll_interval_ms: int | NotGiven = NOT_GIVEN,
        chunking_strategy: FileChunkingStrategyParam | NotGiven = NOT_GIVEN,
    ) -> VectorStoreFileBatch:
        """Uploads the given files concurrently and then creates a vector store file batch.

        If you've already uploaded certain files that you want to include in this batch
        then you can pass their IDs through the `file_ids` argument.

        By default, if any file upload fails then an exception will be eagerly raised.

        The number of concurrency uploads is configurable using the `max_concurrency`
        parameter.

        Note: this method only supports `asyncio` or `trio` as the backing async
        runtime.
        """
        uploaded_files: list[FileObject] = []

        async_library = sniffio.current_async_library()

        if async_library == "asyncio":

            async def asyncio_upload_file(semaphore: asyncio.Semaphore, file: FileTypes) -> None:
                async with semaphore:
                    file_obj = await self._client.files.create(
                        file=file,
                        purpose="assistants",
                    )
                    uploaded_files.append(file_obj)

            semaphore = asyncio.Semaphore(max_concurrency)

            tasks = [asyncio_upload_file(semaphore, file) for file in files]

            await asyncio.gather(*tasks)
        elif async_library == "trio":
            # We only import if the library is being used.
            # We support Python 3.7 so are using an older version of trio that does not have type information
            import trio  # type: ignore # pyright: ignore[reportMissingTypeStubs]

            async def trio_upload_file(limiter: trio.CapacityLimiter, file: FileTypes) -> None:
                async with limiter:
                    file_obj = await self._client.files.create(
                        file=file,
                        purpose="assistants",
                    )
                    uploaded_files.append(file_obj)

            limiter = trio.CapacityLimiter(max_concurrency)

            async with trio.open_nursery() as nursery:
                for file in files:
                    nursery.start_soon(trio_upload_file, limiter, file)  # pyright: ignore [reportUnknownMemberType]
        else:
            raise RuntimeError(
                f"Async runtime {async_library} is not supported yet. Only asyncio or trio is supported",
            )

        batch = await self.create_and_poll(
            vector_store_id=vector_store_id,
            file_ids=[*file_ids, *(f.id for f in uploaded_files)],
            poll_interval_ms=poll_interval_ms,
            chunking_strategy=chunking_strategy,
        )
        return batch


class FileBatchesWithRawResponse:
    def __init__(self, file_batches: FileBatches) -> None:
        self._file_batches = file_batches

        self.create = _legacy_response.to_raw_response_wrapper(
            file_batches.create,
        )
        self.retrieve = _legacy_response.to_raw_response_wrapper(
            file_batches.retrieve,
        )
        self.cancel = _legacy_response.to_raw_response_wrapper(
            file_batches.cancel,
        )
        self.list_files = _legacy_response.to_raw_response_wrapper(
            file_batches.list_files,
        )


class AsyncFileBatchesWithRawResponse:
    def __init__(self, file_batches: AsyncFileBatches) -> None:
        self._file_batches = file_batches

        self.create = _legacy_response.async_to_raw_response_wrapper(
            file_batches.create,
        )
        self.retrieve = _legacy_response.async_to_raw_response_wrapper(
            file_batches.retrieve,
        )
        self.cancel = _legacy_response.async_to_raw_response_wrapper(
            file_batches.cancel,
        )
        self.list_files = _legacy_response.async_to_raw_response_wrapper(
            file_batches.list_files,
        )


class FileBatchesWithStreamingResponse:
    def __init__(self, file_batches: FileBatches) -> None:
        self._file_batches = file_batches

        self.create = to_streamed_response_wrapper(
            file_batches.create,
        )
        self.retrieve = to_streamed_response_wrapper(
            file_batches.retrieve,
        )
        self.cancel = to_streamed_response_wrapper(
            file_batches.cancel,
        )
        self.list_files = to_streamed_response_wrapper(
            file_batches.list_files,
        )


class AsyncFileBatchesWithStreamingResponse:
    def __init__(self, file_batches: AsyncFileBatches) -> None:
        self._file_batches = file_batches

        self.create = async_to_streamed_response_wrapper(
            file_batches.create,
        )
        self.retrieve = async_to_streamed_response_wrapper(
            file_batches.retrieve,
        )
        self.cancel = async_to_streamed_response_wrapper(
            file_batches.cancel,
        )
        self.list_files = async_to_streamed_response_wrapper(
            file_batches.list_files,
        )