# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # Code generated by the Google Gen AI SDK generator DO NOT EDIT. import io import mimetypes import os import pathlib from typing import Optional, Union from urllib.parse import urlencode from . import _common from . import _transformers as t from . import types from ._api_client import ApiClient from ._common import get_value_by_path as getv from ._common import set_value_by_path as setv from .pagers import AsyncPager, Pager def _ListFilesConfig_to_mldev( api_client: ApiClient, from_object: Union[dict, object], parent_object: dict = None, ) -> dict: to_object = {} if getv(from_object, ['http_options']) is not None: setv(to_object, ['httpOptions'], getv(from_object, ['http_options'])) if getv(from_object, ['page_size']) is not None: setv( parent_object, ['_query', 'pageSize'], getv(from_object, ['page_size']) ) if getv(from_object, ['page_token']) is not None: setv( parent_object, ['_query', 'pageToken'], getv(from_object, ['page_token']), ) return to_object def _ListFilesConfig_to_vertex( api_client: ApiClient, from_object: Union[dict, object], parent_object: dict = None, ) -> dict: to_object = {} if getv(from_object, ['http_options']) is not None: setv(to_object, ['httpOptions'], getv(from_object, ['http_options'])) if getv(from_object, ['page_size']) is not None: setv( parent_object, ['_query', 'pageSize'], getv(from_object, ['page_size']) ) if getv(from_object, ['page_token']) is not None: setv( parent_object, ['_query', 'pageToken'], getv(from_object, ['page_token']), ) return to_object def _ListFilesParameters_to_mldev( api_client: ApiClient, from_object: Union[dict, object], parent_object: dict = None, ) -> dict: to_object = {} if getv(from_object, ['config']) is not None: setv( to_object, ['config'], _ListFilesConfig_to_mldev( api_client, getv(from_object, ['config']), to_object ), ) return to_object def _ListFilesParameters_to_vertex( api_client: ApiClient, from_object: Union[dict, object], parent_object: dict = None, ) -> dict: to_object = {} if getv(from_object, ['config']) is not None: raise ValueError('config parameter is not supported in Vertex AI.') return to_object def _FileStatus_to_mldev( api_client: ApiClient, from_object: Union[dict, object], parent_object: dict = None, ) -> dict: to_object = {} if getv(from_object, ['details']) is not None: setv(to_object, ['details'], getv(from_object, ['details'])) if getv(from_object, ['message']) is not None: setv(to_object, ['message'], getv(from_object, ['message'])) if getv(from_object, ['code']) is not None: setv(to_object, ['code'], getv(from_object, ['code'])) return to_object def _FileStatus_to_vertex( api_client: ApiClient, from_object: Union[dict, object], parent_object: dict = None, ) -> dict: to_object = {} if getv(from_object, ['details']) is not None: raise ValueError('details parameter is not supported in Vertex AI.') if getv(from_object, ['message']) is not None: raise ValueError('message parameter is not supported in Vertex AI.') if getv(from_object, ['code']) is not None: raise ValueError('code parameter is not supported in Vertex AI.') return to_object def _File_to_mldev( api_client: ApiClient, from_object: Union[dict, object], parent_object: dict = None, ) -> dict: to_object = {} if getv(from_object, ['name']) is not None: setv(to_object, ['name'], getv(from_object, ['name'])) if getv(from_object, ['display_name']) is not None: setv(to_object, ['displayName'], getv(from_object, ['display_name'])) if getv(from_object, ['mime_type']) is not None: setv(to_object, ['mimeType'], getv(from_object, ['mime_type'])) if getv(from_object, ['size_bytes']) is not None: setv(to_object, ['sizeBytes'], getv(from_object, ['size_bytes'])) if getv(from_object, ['create_time']) is not None: setv(to_object, ['createTime'], getv(from_object, ['create_time'])) if getv(from_object, ['expiration_time']) is not None: setv(to_object, ['expirationTime'], getv(from_object, ['expiration_time'])) if getv(from_object, ['update_time']) is not None: setv(to_object, ['updateTime'], getv(from_object, ['update_time'])) if getv(from_object, ['sha256_hash']) is not None: setv(to_object, ['sha256Hash'], getv(from_object, ['sha256_hash'])) if getv(from_object, ['uri']) is not None: setv(to_object, ['uri'], getv(from_object, ['uri'])) if getv(from_object, ['download_uri']) is not None: setv(to_object, ['downloadUri'], getv(from_object, ['download_uri'])) if getv(from_object, ['state']) is not None: setv(to_object, ['state'], getv(from_object, ['state'])) if getv(from_object, ['source']) is not None: setv(to_object, ['source'], getv(from_object, ['source'])) if getv(from_object, ['video_metadata']) is not None: setv(to_object, ['videoMetadata'], getv(from_object, ['video_metadata'])) if getv(from_object, ['error']) is not None: setv( to_object, ['error'], _FileStatus_to_mldev( api_client, getv(from_object, ['error']), to_object ), ) return to_object def _File_to_vertex( api_client: ApiClient, from_object: Union[dict, object], parent_object: dict = None, ) -> dict: to_object = {} if getv(from_object, ['name']) is not None: raise ValueError('name parameter is not supported in Vertex AI.') if getv(from_object, ['display_name']) is not None: raise ValueError('display_name parameter is not supported in Vertex AI.') if getv(from_object, ['mime_type']) is not None: raise ValueError('mime_type parameter is not supported in Vertex AI.') if getv(from_object, ['size_bytes']) is not None: raise ValueError('size_bytes parameter is not supported in Vertex AI.') if getv(from_object, ['create_time']) is not None: raise ValueError('create_time parameter is not supported in Vertex AI.') if getv(from_object, ['expiration_time']) is not None: raise ValueError('expiration_time parameter is not supported in Vertex AI.') if getv(from_object, ['update_time']) is not None: raise ValueError('update_time parameter is not supported in Vertex AI.') if getv(from_object, ['sha256_hash']) is not None: raise ValueError('sha256_hash parameter is not supported in Vertex AI.') if getv(from_object, ['uri']) is not None: raise ValueError('uri parameter is not supported in Vertex AI.') if getv(from_object, ['download_uri']) is not None: raise ValueError('download_uri parameter is not supported in Vertex AI.') if getv(from_object, ['state']) is not None: raise ValueError('state parameter is not supported in Vertex AI.') if getv(from_object, ['source']) is not None: raise ValueError('source parameter is not supported in Vertex AI.') if getv(from_object, ['video_metadata']) is not None: raise ValueError('video_metadata parameter is not supported in Vertex AI.') if getv(from_object, ['error']) is not None: raise ValueError('error parameter is not supported in Vertex AI.') return to_object def _CreateFileConfig_to_mldev( api_client: ApiClient, from_object: Union[dict, object], parent_object: dict = None, ) -> dict: to_object = {} if getv(from_object, ['http_options']) is not None: setv(to_object, ['httpOptions'], getv(from_object, ['http_options'])) return to_object def _CreateFileConfig_to_vertex( api_client: ApiClient, from_object: Union[dict, object], parent_object: dict = None, ) -> dict: to_object = {} if getv(from_object, ['http_options']) is not None: setv(to_object, ['httpOptions'], getv(from_object, ['http_options'])) return to_object def _CreateFileParameters_to_mldev( api_client: ApiClient, from_object: Union[dict, object], parent_object: dict = None, ) -> dict: to_object = {} if getv(from_object, ['file']) is not None: setv( to_object, ['file'], _File_to_mldev(api_client, getv(from_object, ['file']), to_object), ) if getv(from_object, ['config']) is not None: setv( to_object, ['config'], _CreateFileConfig_to_mldev( api_client, getv(from_object, ['config']), to_object ), ) return to_object def _CreateFileParameters_to_vertex( api_client: ApiClient, from_object: Union[dict, object], parent_object: dict = None, ) -> dict: to_object = {} if getv(from_object, ['file']) is not None: raise ValueError('file parameter is not supported in Vertex AI.') if getv(from_object, ['config']) is not None: raise ValueError('config parameter is not supported in Vertex AI.') return to_object def _GetFileConfig_to_mldev( api_client: ApiClient, from_object: Union[dict, object], parent_object: dict = None, ) -> dict: to_object = {} if getv(from_object, ['http_options']) is not None: setv(to_object, ['httpOptions'], getv(from_object, ['http_options'])) return to_object def _GetFileConfig_to_vertex( api_client: ApiClient, from_object: Union[dict, object], parent_object: dict = None, ) -> dict: to_object = {} if getv(from_object, ['http_options']) is not None: setv(to_object, ['httpOptions'], getv(from_object, ['http_options'])) return to_object def _GetFileParameters_to_mldev( api_client: ApiClient, from_object: Union[dict, object], parent_object: dict = None, ) -> dict: to_object = {} if getv(from_object, ['name']) is not None: setv( to_object, ['_url', 'file'], t.t_file_name(api_client, getv(from_object, ['name'])), ) if getv(from_object, ['config']) is not None: setv( to_object, ['config'], _GetFileConfig_to_mldev( api_client, getv(from_object, ['config']), to_object ), ) return to_object def _GetFileParameters_to_vertex( api_client: ApiClient, from_object: Union[dict, object], parent_object: dict = None, ) -> dict: to_object = {} if getv(from_object, ['name']) is not None: raise ValueError('name parameter is not supported in Vertex AI.') if getv(from_object, ['config']) is not None: raise ValueError('config parameter is not supported in Vertex AI.') return to_object def _DeleteFileConfig_to_mldev( api_client: ApiClient, from_object: Union[dict, object], parent_object: dict = None, ) -> dict: to_object = {} if getv(from_object, ['http_options']) is not None: setv(to_object, ['httpOptions'], getv(from_object, ['http_options'])) return to_object def _DeleteFileConfig_to_vertex( api_client: ApiClient, from_object: Union[dict, object], parent_object: dict = None, ) -> dict: to_object = {} if getv(from_object, ['http_options']) is not None: setv(to_object, ['httpOptions'], getv(from_object, ['http_options'])) return to_object def _DeleteFileParameters_to_mldev( api_client: ApiClient, from_object: Union[dict, object], parent_object: dict = None, ) -> dict: to_object = {} if getv(from_object, ['name']) is not None: setv( to_object, ['_url', 'file'], t.t_file_name(api_client, getv(from_object, ['name'])), ) if getv(from_object, ['config']) is not None: setv( to_object, ['config'], _DeleteFileConfig_to_mldev( api_client, getv(from_object, ['config']), to_object ), ) return to_object def _DeleteFileParameters_to_vertex( api_client: ApiClient, from_object: Union[dict, object], parent_object: dict = None, ) -> dict: to_object = {} if getv(from_object, ['name']) is not None: raise ValueError('name parameter is not supported in Vertex AI.') if getv(from_object, ['config']) is not None: raise ValueError('config parameter is not supported in Vertex AI.') return to_object def _FileStatus_from_mldev( api_client: ApiClient, from_object: Union[dict, object], parent_object: dict = None, ) -> dict: to_object = {} if getv(from_object, ['details']) is not None: setv(to_object, ['details'], getv(from_object, ['details'])) if getv(from_object, ['message']) is not None: setv(to_object, ['message'], getv(from_object, ['message'])) if getv(from_object, ['code']) is not None: setv(to_object, ['code'], getv(from_object, ['code'])) return to_object def _FileStatus_from_vertex( api_client: ApiClient, from_object: Union[dict, object], parent_object: dict = None, ) -> dict: to_object = {} return to_object def _File_from_mldev( api_client: ApiClient, from_object: Union[dict, object], parent_object: dict = None, ) -> dict: to_object = {} if getv(from_object, ['name']) is not None: setv(to_object, ['name'], getv(from_object, ['name'])) if getv(from_object, ['displayName']) is not None: setv(to_object, ['display_name'], getv(from_object, ['displayName'])) if getv(from_object, ['mimeType']) is not None: setv(to_object, ['mime_type'], getv(from_object, ['mimeType'])) if getv(from_object, ['sizeBytes']) is not None: setv(to_object, ['size_bytes'], getv(from_object, ['sizeBytes'])) if getv(from_object, ['createTime']) is not None: setv(to_object, ['create_time'], getv(from_object, ['createTime'])) if getv(from_object, ['expirationTime']) is not None: setv(to_object, ['expiration_time'], getv(from_object, ['expirationTime'])) if getv(from_object, ['updateTime']) is not None: setv(to_object, ['update_time'], getv(from_object, ['updateTime'])) if getv(from_object, ['sha256Hash']) is not None: setv(to_object, ['sha256_hash'], getv(from_object, ['sha256Hash'])) if getv(from_object, ['uri']) is not None: setv(to_object, ['uri'], getv(from_object, ['uri'])) if getv(from_object, ['downloadUri']) is not None: setv(to_object, ['download_uri'], getv(from_object, ['downloadUri'])) if getv(from_object, ['state']) is not None: setv(to_object, ['state'], getv(from_object, ['state'])) if getv(from_object, ['source']) is not None: setv(to_object, ['source'], getv(from_object, ['source'])) if getv(from_object, ['videoMetadata']) is not None: setv(to_object, ['video_metadata'], getv(from_object, ['videoMetadata'])) if getv(from_object, ['error']) is not None: setv( to_object, ['error'], _FileStatus_from_mldev( api_client, getv(from_object, ['error']), to_object ), ) return to_object def _File_from_vertex( api_client: ApiClient, from_object: Union[dict, object], parent_object: dict = None, ) -> dict: to_object = {} return to_object def _ListFilesResponse_from_mldev( api_client: ApiClient, from_object: Union[dict, object], parent_object: dict = None, ) -> dict: to_object = {} if getv(from_object, ['nextPageToken']) is not None: setv(to_object, ['next_page_token'], getv(from_object, ['nextPageToken'])) if getv(from_object, ['files']) is not None: setv( to_object, ['files'], [ _File_from_mldev(api_client, item, to_object) for item in getv(from_object, ['files']) ], ) return to_object def _ListFilesResponse_from_vertex( api_client: ApiClient, from_object: Union[dict, object], parent_object: dict = None, ) -> dict: to_object = {} return to_object def _CreateFileResponse_from_mldev( api_client: ApiClient, from_object: Union[dict, object], parent_object: dict = None, ) -> dict: to_object = {} return to_object def _CreateFileResponse_from_vertex( api_client: ApiClient, from_object: Union[dict, object], parent_object: dict = None, ) -> dict: to_object = {} return to_object def _DeleteFileResponse_from_mldev( api_client: ApiClient, from_object: Union[dict, object], parent_object: dict = None, ) -> dict: to_object = {} return to_object def _DeleteFileResponse_from_vertex( api_client: ApiClient, from_object: Union[dict, object], parent_object: dict = None, ) -> dict: to_object = {} return to_object class Files(_common.BaseModule): def _list( self, *, config: Optional[types.ListFilesConfigOrDict] = None ) -> types.ListFilesResponse: """Lists all files from the service. Args: config (ListFilesConfig): Optional, configuration for the list method. Returns: ListFilesResponse: The response for the list method. Usage: .. code-block:: python pager = client.files.list(config={'page_size': 10}) for file in pager.page: print(file.name) """ parameter_model = types._ListFilesParameters( config=config, ) if self._api_client.vertexai: raise ValueError('This method is only supported in the default client.') else: request_dict = _ListFilesParameters_to_mldev( self._api_client, parameter_model ) path = 'files'.format_map(request_dict.get('_url')) query_params = request_dict.get('_query') if query_params: path = f'{path}?{urlencode(query_params)}' # TODO: remove the hack that pops config. config = request_dict.pop('config', None) http_options = config.pop('httpOptions', None) if config else None request_dict = _common.convert_to_dict(request_dict) request_dict = _common.encode_unserializable_types(request_dict) response_dict = self._api_client.request( 'get', path, request_dict, http_options ) if self._api_client.vertexai: response_dict = _ListFilesResponse_from_vertex( self._api_client, response_dict ) else: response_dict = _ListFilesResponse_from_mldev( self._api_client, response_dict ) return_value = types.ListFilesResponse._from_response( response_dict, parameter_model ) self._api_client._verify_response(return_value) return return_value def _create( self, *, file: types.FileOrDict, config: Optional[types.CreateFileConfigOrDict] = None, ) -> types.CreateFileResponse: parameter_model = types._CreateFileParameters( file=file, config=config, ) if self._api_client.vertexai: raise ValueError('This method is only supported in the default client.') else: request_dict = _CreateFileParameters_to_mldev( self._api_client, parameter_model ) path = 'upload/v1beta/files'.format_map(request_dict.get('_url')) query_params = request_dict.get('_query') if query_params: path = f'{path}?{urlencode(query_params)}' # TODO: remove the hack that pops config. config = request_dict.pop('config', None) http_options = config.pop('httpOptions', None) if config else None request_dict = _common.convert_to_dict(request_dict) request_dict = _common.encode_unserializable_types(request_dict) response_dict = self._api_client.request( 'post', path, request_dict, http_options ) if self._api_client.vertexai: response_dict = _CreateFileResponse_from_vertex( self._api_client, response_dict ) else: response_dict = _CreateFileResponse_from_mldev( self._api_client, response_dict ) return_value = types.CreateFileResponse._from_response( response_dict, parameter_model ) self._api_client._verify_response(return_value) return return_value def get( self, *, name: str, config: Optional[types.GetFileConfigOrDict] = None ) -> types.File: """Retrieves the file information from the service. Args: name (str): The name identifier for the file to retrieve. config (GetFileConfig): Optional, configuration for the get method. Returns: File: The file information. Usage: .. code-block:: python file = client.files.get(name='files/...') print(file.uri) """ parameter_model = types._GetFileParameters( name=name, config=config, ) if self._api_client.vertexai: raise ValueError('This method is only supported in the default client.') else: request_dict = _GetFileParameters_to_mldev( self._api_client, parameter_model ) path = 'files/{file}'.format_map(request_dict.get('_url')) query_params = request_dict.get('_query') if query_params: path = f'{path}?{urlencode(query_params)}' # TODO: remove the hack that pops config. config = request_dict.pop('config', None) http_options = config.pop('httpOptions', None) if config else None request_dict = _common.convert_to_dict(request_dict) request_dict = _common.encode_unserializable_types(request_dict) response_dict = self._api_client.request( 'get', path, request_dict, http_options ) if self._api_client.vertexai: response_dict = _File_from_vertex(self._api_client, response_dict) else: response_dict = _File_from_mldev(self._api_client, response_dict) return_value = types.File._from_response(response_dict, parameter_model) self._api_client._verify_response(return_value) return return_value def delete( self, *, name: str, config: Optional[types.DeleteFileConfigOrDict] = None ) -> types.DeleteFileResponse: """Deletes an existing file from the service. Args: name (str): The name identifier for the file to delete. config (DeleteFileConfig): Optional, configuration for the delete method. Returns: DeleteFileResponse: The response for the delete method Usage: .. code-block:: python client.files.delete(name='files/...') """ parameter_model = types._DeleteFileParameters( name=name, config=config, ) if self._api_client.vertexai: raise ValueError('This method is only supported in the default client.') else: request_dict = _DeleteFileParameters_to_mldev( self._api_client, parameter_model ) path = 'files/{file}'.format_map(request_dict.get('_url')) query_params = request_dict.get('_query') if query_params: path = f'{path}?{urlencode(query_params)}' # TODO: remove the hack that pops config. config = request_dict.pop('config', None) http_options = config.pop('httpOptions', None) if config else None request_dict = _common.convert_to_dict(request_dict) request_dict = _common.encode_unserializable_types(request_dict) response_dict = self._api_client.request( 'delete', path, request_dict, http_options ) if self._api_client.vertexai: response_dict = _DeleteFileResponse_from_vertex( self._api_client, response_dict ) else: response_dict = _DeleteFileResponse_from_mldev( self._api_client, response_dict ) return_value = types.DeleteFileResponse._from_response( response_dict, parameter_model ) self._api_client._verify_response(return_value) return return_value def upload( self, *, path: Union[str, pathlib.Path, os.PathLike, io.IOBase], config: Optional[types.UploadFileConfigOrDict] = None, ) -> types.File: """Calls the API to upload a file using a supported file service. Args: path: The path to the file or an `IOBase` object to be uploaded. If it's an IOBase object, it must be opened in blocking mode and binary mode. In other words, do not use non-blocking mode or text mode. The given stream must be seekable, that is, it must be able to call seek() on 'path'. config: Optional parameters to set `diplay_name`, `mime_type`, and `name`. """ if self._api_client.vertexai: raise ValueError( 'Vertex AI does not support creating files. You can upload files to' ' GCS files instead.' ) config_model = None if config: if isinstance(config, dict): config_model = types.UploadFileConfig(**config) else: config_model = config file = types.File( mime_type=config_model.mime_type, name=config_model.name, display_name=config_model.display_name, ) else: # if not config file = types.File() if file.name is not None and not file.name.startswith('files/'): file.name = f'files/{file.name}' if isinstance(path, io.IOBase): if file.mime_type is None: raise ValueError( 'Unknown mime type: Could not determine the mimetype for your' ' file\n please set the `mime_type` argument' ) if hasattr(path, 'mode'): if 'b' not in path.mode: raise ValueError('The file must be opened in binary mode.') offset = path.tell() path.seek(0, os.SEEK_END) file.size_bytes = path.tell() - offset path.seek(offset, os.SEEK_SET) else: fs_path = os.fspath(path) if not fs_path or not os.path.isfile(fs_path): raise FileNotFoundError(f'{path} is not a valid file path.') file.size_bytes = os.path.getsize(fs_path) if file.mime_type is None: file.mime_type, _ = mimetypes.guess_type(fs_path) if file.mime_type is None: raise ValueError( 'Unknown mime type: Could not determine the mimetype for your' ' file\n please set the `mime_type` argument' ) response = {} if config_model and config_model.http_options: http_options = config_model.http_options else: http_options = { 'api_version': '', # api-version is set in the path. 'headers': { 'Content-Type': 'application/json', 'X-Goog-Upload-Protocol': 'resumable', 'X-Goog-Upload-Command': 'start', 'X-Goog-Upload-Header-Content-Length': f'{file.size_bytes}', 'X-Goog-Upload-Header-Content-Type': f'{file.mime_type}', }, 'response_payload': response, } self._create(file=file, config={'http_options': http_options}) if ( 'headers' not in response or 'X-Goog-Upload-URL' not in response['headers'] ): raise KeyError( 'Failed to create file. Upload URL did not returned from the create' ' file request.' ) upload_url = response['headers']['X-Goog-Upload-URL'] if isinstance(path, io.IOBase): return_file = self._api_client.upload_file( path, upload_url, file.size_bytes ) else: return_file = self._api_client.upload_file( fs_path, upload_url, file.size_bytes ) return types.File._from_response( _File_from_mldev(self._api_client, return_file['file']), None ) def list( self, *, config: Optional[types.ListFilesConfigOrDict] = None ) -> Pager[types.File]: return Pager( 'files', self._list, self._list(config=config), config, ) def download( self, *, file: Union[str, types.File], config: Optional[types.DownloadFileConfigOrDict] = None, ) -> bytes: """Downloads a file's data from storage. Files created by `upload` can't be downloaded. You can tell which files are downloadable by checking the `source` or `download_uri` property. Args: file (str): A file name, uri, or file object. Identifying which file to download. config (DownloadFileConfigOrDict): Optional, configuration for the get method. Returns: File: The file data as bytes. Usage: .. code-block:: python for file client.files.list(): if file.download_uri is not None: break else: raise ValueError('No files found with a `download_uri`.') data = client.files.download(file=file) # data = client.files.download(file=file.name) # data = client.files.download(file=file.download_uri) """ if self._api_client.vertexai: raise ValueError( 'Vertex AI does not support the Files API. Use GCS files instead.' ) config_model = None if config: if isinstance(config, dict): config_model = types.DownloadFileConfig(**config) else: config_model = config if isinstance(file, types.File) and file.download_uri is None: raise ValueError( "Only generated files can be downloaded, uploaded files can't be " 'downloaded. You can tell which files are downloadable by checking ' 'the `source` or `download_uri` property.' ) name = t.t_file_name(self, file) path = f'files/{name}:download' query_params = {'alt': 'media'} path = f'{path}?{urlencode(query_params)}' http_options = None if getv(config_model, ['http_options']) is not None: http_options = getv(config_model, ['http_options']) data = self._api_client.download_file( path, http_options, ) return data class AsyncFiles(_common.BaseModule): async def _list( self, *, config: Optional[types.ListFilesConfigOrDict] = None ) -> types.ListFilesResponse: """Lists all files from the service. Args: config (ListFilesConfig): Optional, configuration for the list method. Returns: ListFilesResponse: The response for the list method. Usage: .. code-block:: python pager = client.files.list(config={'page_size': 10}) for file in pager.page: print(file.name) """ parameter_model = types._ListFilesParameters( config=config, ) if self._api_client.vertexai: raise ValueError('This method is only supported in the default client.') else: request_dict = _ListFilesParameters_to_mldev( self._api_client, parameter_model ) path = 'files'.format_map(request_dict.get('_url')) query_params = request_dict.get('_query') if query_params: path = f'{path}?{urlencode(query_params)}' # TODO: remove the hack that pops config. config = request_dict.pop('config', None) http_options = config.pop('httpOptions', None) if config else None request_dict = _common.convert_to_dict(request_dict) request_dict = _common.encode_unserializable_types(request_dict) response_dict = await self._api_client.async_request( 'get', path, request_dict, http_options ) if self._api_client.vertexai: response_dict = _ListFilesResponse_from_vertex( self._api_client, response_dict ) else: response_dict = _ListFilesResponse_from_mldev( self._api_client, response_dict ) return_value = types.ListFilesResponse._from_response( response_dict, parameter_model ) self._api_client._verify_response(return_value) return return_value async def _create( self, *, file: types.FileOrDict, config: Optional[types.CreateFileConfigOrDict] = None, ) -> types.CreateFileResponse: parameter_model = types._CreateFileParameters( file=file, config=config, ) if self._api_client.vertexai: raise ValueError('This method is only supported in the default client.') else: request_dict = _CreateFileParameters_to_mldev( self._api_client, parameter_model ) path = 'upload/v1beta/files'.format_map(request_dict.get('_url')) query_params = request_dict.get('_query') if query_params: path = f'{path}?{urlencode(query_params)}' # TODO: remove the hack that pops config. config = request_dict.pop('config', None) http_options = config.pop('httpOptions', None) if config else None request_dict = _common.convert_to_dict(request_dict) request_dict = _common.encode_unserializable_types(request_dict) response_dict = await self._api_client.async_request( 'post', path, request_dict, http_options ) if self._api_client.vertexai: response_dict = _CreateFileResponse_from_vertex( self._api_client, response_dict ) else: response_dict = _CreateFileResponse_from_mldev( self._api_client, response_dict ) return_value = types.CreateFileResponse._from_response( response_dict, parameter_model ) self._api_client._verify_response(return_value) return return_value async def get( self, *, name: str, config: Optional[types.GetFileConfigOrDict] = None ) -> types.File: """Retrieves the file information from the service. Args: name (str): The name identifier for the file to retrieve. config (GetFileConfig): Optional, configuration for the get method. Returns: File: The file information. Usage: .. code-block:: python file = client.files.get(name='files/...') print(file.uri) """ parameter_model = types._GetFileParameters( name=name, config=config, ) if self._api_client.vertexai: raise ValueError('This method is only supported in the default client.') else: request_dict = _GetFileParameters_to_mldev( self._api_client, parameter_model ) path = 'files/{file}'.format_map(request_dict.get('_url')) query_params = request_dict.get('_query') if query_params: path = f'{path}?{urlencode(query_params)}' # TODO: remove the hack that pops config. config = request_dict.pop('config', None) http_options = config.pop('httpOptions', None) if config else None request_dict = _common.convert_to_dict(request_dict) request_dict = _common.encode_unserializable_types(request_dict) response_dict = await self._api_client.async_request( 'get', path, request_dict, http_options ) if self._api_client.vertexai: response_dict = _File_from_vertex(self._api_client, response_dict) else: response_dict = _File_from_mldev(self._api_client, response_dict) return_value = types.File._from_response(response_dict, parameter_model) self._api_client._verify_response(return_value) return return_value async def delete( self, *, name: str, config: Optional[types.DeleteFileConfigOrDict] = None ) -> types.DeleteFileResponse: """Deletes an existing file from the service. Args: name (str): The name identifier for the file to delete. config (DeleteFileConfig): Optional, configuration for the delete method. Returns: DeleteFileResponse: The response for the delete method Usage: .. code-block:: python client.files.delete(name='files/...') """ parameter_model = types._DeleteFileParameters( name=name, config=config, ) if self._api_client.vertexai: raise ValueError('This method is only supported in the default client.') else: request_dict = _DeleteFileParameters_to_mldev( self._api_client, parameter_model ) path = 'files/{file}'.format_map(request_dict.get('_url')) query_params = request_dict.get('_query') if query_params: path = f'{path}?{urlencode(query_params)}' # TODO: remove the hack that pops config. config = request_dict.pop('config', None) http_options = config.pop('httpOptions', None) if config else None request_dict = _common.convert_to_dict(request_dict) request_dict = _common.encode_unserializable_types(request_dict) response_dict = await self._api_client.async_request( 'delete', path, request_dict, http_options ) if self._api_client.vertexai: response_dict = _DeleteFileResponse_from_vertex( self._api_client, response_dict ) else: response_dict = _DeleteFileResponse_from_mldev( self._api_client, response_dict ) return_value = types.DeleteFileResponse._from_response( response_dict, parameter_model ) self._api_client._verify_response(return_value) return return_value async def upload( self, *, path: Union[str, pathlib.Path, os.PathLike, io.IOBase], config: Optional[types.UploadFileConfigOrDict] = None, ) -> types.File: """Calls the API to upload a file asynchronously using a supported file service. Args: path: The path to the file or an `IOBase` object to be uploaded. If it's an IOBase object, it must be opened in blocking mode and binary mode. In other words, do not use non-blocking mode or text mode. The given stream must be seekable, that is, it must be able to call seek() on 'path'. config: Optional parameters to set `diplay_name`, `mime_type`, and `name`. """ if self._api_client.vertexai: raise ValueError( 'Vertex AI does not support creating files. You can upload files to' ' GCS files instead.' ) config_model = None if config: if isinstance(config, dict): config_model = types.UploadFileConfig(**config) else: config_model = config file = types.File( mime_type=config_model.mime_type, name=config_model.name, display_name=config_model.display_name, ) else: # if not config file = types.File() if file.name is not None and not file.name.startswith('files/'): file.name = f'files/{file.name}' if isinstance(path, io.IOBase): if file.mime_type is None: raise ValueError( 'Unknown mime type: Could not determine the mimetype for your' ' file\n please set the `mime_type` argument' ) if hasattr(path, 'mode'): if 'b' not in path.mode: raise ValueError('The file must be opened in binary mode.') offset = path.tell() path.seek(0, os.SEEK_END) file.size_bytes = path.tell() - offset path.seek(offset, os.SEEK_SET) else: fs_path = os.fspath(path) if not fs_path or not os.path.isfile(fs_path): raise FileNotFoundError(f'{path} is not a valid file path.') file.size_bytes = os.path.getsize(fs_path) if file.mime_type is None: file.mime_type, _ = mimetypes.guess_type(fs_path) if file.mime_type is None: raise ValueError( 'Unknown mime type: Could not determine the mimetype for your' ' file\n please set the `mime_type` argument' ) response = {} if config_model and config_model.http_options: http_options = config_model.http_options else: http_options = { 'api_version': '', # api-version is set in the path. 'headers': { 'Content-Type': 'application/json', 'X-Goog-Upload-Protocol': 'resumable', 'X-Goog-Upload-Command': 'start', 'X-Goog-Upload-Header-Content-Length': f'{file.size_bytes}', 'X-Goog-Upload-Header-Content-Type': f'{file.mime_type}', }, 'response_payload': response, } await self._create(file=file, config={'http_options': http_options}) if ( 'headers' not in response or 'X-Goog-Upload-URL' not in response['headers'] ): raise KeyError( 'Failed to create file. Upload URL did not returned from the create' ' file request.' ) upload_url = response['headers']['X-Goog-Upload-URL'] if isinstance(path, io.IOBase): return_file = await self._api_client.async_upload_file( path, upload_url, file.size_bytes ) else: return_file = await self._api_client.async_upload_file( fs_path, upload_url, file.size_bytes ) return types.File._from_response( _File_from_mldev(self._api_client, return_file['file']), None ) async def list( self, *, config: Optional[types.ListFilesConfigOrDict] = None ) -> AsyncPager[types.File]: return AsyncPager( 'files', self._list, await self._list(config=config), config, ) async def download( self, *, file: Union[str, types.File], config: Optional[types.DownloadFileConfigOrDict] = None, ) -> bytes: """Downloads a file's data from the file service. The Vertex-AI implementation of the API foes not include the file service. Files created by `upload` can't be downloaded. You can tell which files are downloadable by checking the `download_uri` property. Args: File (str): A file name, uri, or file object. Identifying which file to download. config (DownloadFileConfigOrDict): Optional, configuration for the get method. Returns: File: The file data as bytes. Usage: .. code-block:: python for file client.files.list(): if file.download_uri is not None: break else: raise ValueError('No files found with a `download_uri`.') data = client.files.download(file=file) # data = client.files.download(file=file.name) # data = client.files.download(file=file.uri) """ if self._api_client.vertexai: raise ValueError( 'Vertex AI does not support the Files API. Use GCS files instead.' ) config_model = None if config: if isinstance(config, dict): config_model = types.DownloadFileConfig(**config) else: config_model = config name = t.t_file_name(self, file) path = f'files/{name}:download' http_options = None if getv(config_model, ['http_options']) is not None: http_options = getv(config_model, ['http_options']) query_params = {'alt': 'media'} if query_params: path = f'{path}?{urlencode(query_params)}' data = await self._api_client.async_download_file( path, http_options, ) return data