aboutsummaryrefslogtreecommitdiff
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Live client."""

import asyncio
import base64
import contextlib
import json
import logging
from typing import AsyncIterator, Optional, Sequence, Union

import google.auth
from websockets import ConnectionClosed

from . import _common
from . import _transformers as t
from . import client
from . import types
from ._api_client import ApiClient
from ._common import get_value_by_path as getv
from ._common import set_value_by_path as setv
from .models import _Content_from_mldev
from .models import _Content_from_vertex
from .models import _Content_to_mldev
from .models import _Content_to_vertex
from .models import _GenerateContentConfig_to_mldev
from .models import _GenerateContentConfig_to_vertex
from .models import _SafetySetting_to_mldev
from .models import _SafetySetting_to_vertex
from .models import _SpeechConfig_to_mldev
from .models import _SpeechConfig_to_vertex
from .models import _Tool_to_mldev
from .models import _Tool_to_vertex

try:
  from websockets.asyncio.client import ClientConnection
  from websockets.asyncio.client import connect
except ModuleNotFoundError:
  from websockets.client import ClientConnection
  from websockets.client import connect


_FUNCTION_RESPONSE_REQUIRES_ID = (
    'FunctionResponse request must have an `id` field from the'
    ' response of a ToolCall.FunctionalCalls in Google AI.'
)


class AsyncSession:
  """AsyncSession."""

  def __init__(self, api_client: client.ApiClient, websocket: ClientConnection):
    self._api_client = api_client
    self._ws = websocket

  async def send(
      self,
      *,
      input: Union[
          types.ContentListUnion,
          types.ContentListUnionDict,
          types.LiveClientContentOrDict,
          types.LiveClientRealtimeInputOrDict,
          types.LiveClientRealtimeInputOrDict,
          types.LiveClientToolResponseOrDict,
          types.FunctionResponseOrDict,
          Sequence[types.FunctionResponseOrDict],
      ],
      end_of_turn: Optional[bool] = False,
  ):
    """Send input to the model.

    The method will send the input request to the server.

    Args:
      input: The input request to the model.
      end_of_turn: Whether the input is the last message in a turn.

    Example usage:

    .. code-block:: python

      client = genai.Client(api_key=API_KEY)

      async with client.aio.live.connect(model='...') as session:
        await session.send(input='Hello world!', end_of_turn=True)
        async for message in session.receive():
          print(message)
    """
    client_message = self._parse_client_message(input, end_of_turn)
    await self._ws.send(json.dumps(client_message))

  async def receive(self) -> AsyncIterator[types.LiveServerMessage]:
    """Receive model responses from the server.

    The method will yield the model responses from the server. The returned
    responses will represent a complete model turn. When the returned message
    is function call, user must call `send` with the function response to
    continue the turn.

    Yields:
      The model responses from the server.

    Example usage:

    .. code-block:: python

      client = genai.Client(api_key=API_KEY)

      async with client.aio.live.connect(model='...') as session:
        await session.send(input='Hello world!', end_of_turn=True)
        async for message in session.receive():
          print(message)
    """
    # TODO(b/365983264) Handle intermittent issues for the user.
    while result := await self._receive():
      if result.server_content and result.server_content.turn_complete:
        yield result
        break
      yield result

  async def start_stream(
      self, *, stream: AsyncIterator[bytes], mime_type: str
  ) -> AsyncIterator[types.LiveServerMessage]:
    """start a live session from a data stream.

    The interaction terminates when the input stream is complete.
    This method will start two async tasks. One task will be used to send the
    input stream to the model and the other task will be used to receive the
    responses from the model.

    Args:
      stream: An iterator that yields the model response.
      mime_type: The MIME type of the data in the stream.

    Yields:
      The audio bytes received from the model and server response messages.

    Example usage:

    .. code-block:: python

      client = genai.Client(api_key=API_KEY)
      config = {'response_modalities': ['AUDIO']}
      async def audio_stream():
        stream = read_audio()
        for data in stream:
          yield data
      async with client.aio.live.connect(model='...') as session:
        for audio in session.start_stream(stream = audio_stream(),
        mime_type = 'audio/pcm'):
          play_audio_chunk(audio.data)
    """
    stop_event = asyncio.Event()
    # Start the send loop. When stream is complete stop_event is set.
    asyncio.create_task(self._send_loop(stream, mime_type, stop_event))
    recv_task = None
    while not stop_event.is_set():
      try:
        recv_task = asyncio.create_task(self._receive())
        await asyncio.wait(
            [
                recv_task,
                asyncio.create_task(stop_event.wait()),
            ],
            return_when=asyncio.FIRST_COMPLETED,
        )
        if recv_task.done():
          yield recv_task.result()
          # Give a chance for the send loop to process requests.
          await asyncio.sleep(10**-12)
      except ConnectionClosed:
        break
    if recv_task is not None and not recv_task.done():
      recv_task.cancel()
      # Wait for the task to finish (cancelled or not)
      try:
        await recv_task
      except asyncio.CancelledError:
        pass

  async def _receive(self) -> types.LiveServerMessage:
    parameter_model = types.LiveServerMessage()
    raw_response = await self._ws.recv(decode=False)
    if raw_response:
      try:
        response = json.loads(raw_response)
      except json.decoder.JSONDecodeError:
        raise ValueError(f'Failed to parse response: {raw_response}')
    else:
      response = {}
    if self._api_client.vertexai:
      response_dict = self._LiveServerMessage_from_vertex(response)
    else:
      response_dict = self._LiveServerMessage_from_mldev(response)

    return types.LiveServerMessage._from_response(
        response_dict, parameter_model
    )

  async def _send_loop(
      self,
      data_stream: AsyncIterator[bytes],
      mime_type: str,
      stop_event: asyncio.Event,
  ):
    async for data in data_stream:
      input = {'data': data, 'mimeType': mime_type}
      await self.send(input=input)
      # Give a chance for the receive loop to process responses.
      await asyncio.sleep(10**-12)
    # Give a chance for the receiver to process the last response.
    stop_event.set()

  def _LiveServerContent_from_mldev(
      self,
      from_object: Union[dict, object],
  ) -> dict:
    to_object = {}
    if getv(from_object, ['modelTurn']) is not None:
      setv(
          to_object,
          ['model_turn'],
          _Content_from_mldev(
              self._api_client,
              getv(from_object, ['modelTurn']),
          ),
      )
    if getv(from_object, ['turnComplete']) is not None:
      setv(to_object, ['turn_complete'], getv(from_object, ['turnComplete']))
    if getv(from_object, ['interrupted']) is not None:
      setv(to_object, ['interrupted'], getv(from_object, ['interrupted']))
    return to_object

  def _LiveToolCall_from_mldev(
      self,
      from_object: Union[dict, object],
  ) -> dict:
    to_object = {}
    if getv(from_object, ['functionCalls']) is not None:
      setv(
          to_object,
          ['function_calls'],
          getv(from_object, ['functionCalls']),
      )
    return to_object

  def _LiveToolCall_from_vertex(
      self,
      from_object: Union[dict, object],
  ) -> dict:
    to_object = {}
    if getv(from_object, ['functionCalls']) is not None:
      setv(
          to_object,
          ['function_calls'],
          getv(from_object, ['functionCalls']),
      )
    return to_object

  def _LiveServerMessage_from_mldev(
      self,
      from_object: Union[dict, object],
  ) -> dict:
    to_object = {}
    if getv(from_object, ['serverContent']) is not None:
      setv(
          to_object,
          ['server_content'],
          self._LiveServerContent_from_mldev(
              getv(from_object, ['serverContent'])
          ),
      )
    if getv(from_object, ['toolCall']) is not None:
      setv(
          to_object,
          ['tool_call'],
          self._LiveToolCall_from_mldev(getv(from_object, ['toolCall'])),
      )
    if getv(from_object, ['toolCallCancellation']) is not None:
      setv(
          to_object,
          ['tool_call_cancellation'],
          getv(from_object, ['toolCallCancellation']),
      )
    return to_object

  def _LiveServerContent_from_vertex(
      self,
      from_object: Union[dict, object],
  ) -> dict:
    to_object = {}
    if getv(from_object, ['modelTurn']) is not None:
      setv(
          to_object,
          ['model_turn'],
          _Content_from_vertex(
              self._api_client,
              getv(from_object, ['modelTurn']),
          ),
      )
    if getv(from_object, ['turnComplete']) is not None:
      setv(to_object, ['turn_complete'], getv(from_object, ['turnComplete']))
    if getv(from_object, ['interrupted']) is not None:
      setv(to_object, ['interrupted'], getv(from_object, ['interrupted']))
    return to_object

  def _LiveServerMessage_from_vertex(
      self,
      from_object: Union[dict, object],
  ) -> dict:
    to_object = {}
    if getv(from_object, ['serverContent']) is not None:
      setv(
          to_object,
          ['server_content'],
          self._LiveServerContent_from_vertex(
              getv(from_object, ['serverContent'])
          ),
      )

    if getv(from_object, ['toolCall']) is not None:
      setv(
          to_object,
          ['tool_call'],
          self._LiveToolCall_from_vertex(getv(from_object, ['toolCall'])),
      )
    if getv(from_object, ['toolCallCancellation']) is not None:
      setv(
          to_object,
          ['tool_call_cancellation'],
          getv(from_object, ['toolCallCancellation']),
      )
    return to_object

  def _parse_client_message(
      self,
      input: Union[
          types.ContentListUnion,
          types.ContentListUnionDict,
          types.LiveClientContentOrDict,
          types.LiveClientRealtimeInputOrDict,
          types.LiveClientRealtimeInputOrDict,
          types.LiveClientToolResponseOrDict,
          types.FunctionResponseOrDict,
          Sequence[types.FunctionResponseOrDict],
      ],
      end_of_turn: Optional[bool] = False,
  ) -> dict:
    if isinstance(input, str):
      input = [input]
    elif isinstance(input, dict) and 'data' in input:
      if isinstance(input['data'], bytes):
        decoded_data = base64.b64encode(input['data']).decode('utf-8')
        input['data'] = decoded_data
      input = [input]
    elif isinstance(input, types.Blob):
      input.data = base64.b64encode(input.data).decode('utf-8')
      input = [input]
    elif isinstance(input, dict) and 'name' in input and 'response' in input:
      # ToolResponse.FunctionResponse
      if not (self._api_client.vertexai) and 'id' not in input:
        raise ValueError(_FUNCTION_RESPONSE_REQUIRES_ID)
      input = [input]

    if isinstance(input, Sequence) and any(
        isinstance(c, dict) and 'name' in c and 'response' in c for c in input
    ):
      # ToolResponse.FunctionResponse
      if not (self._api_client.vertexai):
        for item in input:
          if 'id' not in item:
            raise ValueError(_FUNCTION_RESPONSE_REQUIRES_ID)
      client_message = {'tool_response': {'function_responses': input}}
    elif isinstance(input, Sequence) and any(isinstance(c, str) for c in input):
      to_object = {}
      if self._api_client.vertexai:
        contents = [
            _Content_to_vertex(self._api_client, item, to_object)
            for item in t.t_contents(self._api_client, input)
        ]
      else:
        contents = [
            _Content_to_mldev(self._api_client, item, to_object)
            for item in t.t_contents(self._api_client, input)
        ]

      client_message = {
          'client_content': {'turns': contents, 'turn_complete': end_of_turn}
      }
    elif isinstance(input, Sequence):
      if any((isinstance(b, dict) and 'data' in b) for b in input):
        pass
      elif any(isinstance(b, types.Blob) for b in input):
        input = [b.model_dump(exclude_none=True) for b in input]
      else:
        raise ValueError(
            f'Unsupported input type "{type(input)}" or input content "{input}"'
        )

      client_message = {'realtime_input': {'media_chunks': input}}

    elif isinstance(input, dict) and 'content' in input:
      # TODO(b/365983264) Add validation checks for content_update input_dict.
      client_message = {'client_content': input}
    elif isinstance(input, types.LiveClientRealtimeInput):
      client_message = {'realtime_input': input.model_dump(exclude_none=True)}
      if isinstance(
          client_message['realtime_input']['media_chunks'][0]['data'], bytes
      ):
        client_message['realtime_input']['media_chunks'] = [
            {
                'data': base64.b64encode(item['data']).decode('utf-8'),
                'mime_type': item['mime_type'],
            }
            for item in client_message['realtime_input']['media_chunks']
        ]

    elif isinstance(input, types.LiveClientContent):
      client_message = {'client_content': input.model_dump(exclude_none=True)}
    elif isinstance(input, types.LiveClientToolResponse):
      # ToolResponse.FunctionResponse
      if not (self._api_client.vertexai) and not (
          input.function_responses[0].id
      ):
        raise ValueError(_FUNCTION_RESPONSE_REQUIRES_ID)
      client_message = {'tool_response': input.model_dump(exclude_none=True)}
    elif isinstance(input, types.FunctionResponse):
      if not (self._api_client.vertexai) and not (input.id):
        raise ValueError(_FUNCTION_RESPONSE_REQUIRES_ID)
      client_message = {
          'tool_response': {
              'function_responses': [input.model_dump(exclude_none=True)]
          }
      }
    elif isinstance(input, Sequence) and isinstance(
        input[0], types.FunctionResponse
    ):
      if not (self._api_client.vertexai) and not (input[0].id):
        raise ValueError(_FUNCTION_RESPONSE_REQUIRES_ID)
      client_message = {
          'tool_response': {
              'function_responses': [
                  c.model_dump(exclude_none=True) for c in input
              ]
          }
      }
    else:
      raise ValueError(
          f'Unsupported input type "{type(input)}" or input content "{input}"'
      )

    return client_message

  async def close(self):
    # Close the websocket connection.
    await self._ws.close()


class AsyncLive(_common.BaseModule):
  """AsyncLive."""

  def _LiveSetup_to_mldev(
      self, model: str, config: Optional[types.LiveConnectConfigOrDict] = None
  ):
    if isinstance(config, types.LiveConnectConfig):
      from_object = config.model_dump(exclude_none=True)
    else:
      from_object = config

    to_object = {}
    if getv(from_object, ['generation_config']) is not None:
      setv(
          to_object,
          ['generationConfig'],
          _GenerateContentConfig_to_mldev(
              self._api_client,
              getv(from_object, ['generation_config']),
              to_object,
          ),
      )
    if getv(from_object, ['response_modalities']) is not None:
      if getv(to_object, ['generationConfig']) is not None:
        to_object['generationConfig']['responseModalities'] = from_object[
            'response_modalities'
        ]
      else:
        to_object['generationConfig'] = {
            'responseModalities': from_object['response_modalities']
        }
    if getv(from_object, ['speech_config']) is not None:
      if getv(to_object, ['generationConfig']) is not None:
        to_object['generationConfig']['speechConfig'] = _SpeechConfig_to_mldev(
            self._api_client,
            t.t_speech_config(
                self._api_client, getv(from_object, ['speech_config'])
            ),
            to_object,
        )
      else:
        to_object['generationConfig'] = {
            'speechConfig': _SpeechConfig_to_mldev(
                self._api_client,
                t.t_speech_config(
                    self._api_client, getv(from_object, ['speech_config'])
                ),
                to_object,
            )
        }

    if getv(from_object, ['system_instruction']) is not None:
      setv(
          to_object,
          ['systemInstruction'],
          _Content_to_mldev(
              self._api_client,
              t.t_content(
                  self._api_client, getv(from_object, ['system_instruction'])
              ),
              to_object,
          ),
      )
    if getv(from_object, ['tools']) is not None:
      setv(
          to_object,
          ['tools'],
          [
              _Tool_to_mldev(self._api_client, item, to_object)
              for item in getv(from_object, ['tools'])
          ],
      )

    return_value = {'setup': {'model': model}}
    return_value['setup'].update(to_object)
    return return_value

  def _LiveSetup_to_vertex(
      self, model: str, config: Optional[types.LiveConnectConfigOrDict] = None
  ):
    if isinstance(config, types.LiveConnectConfig):
      from_object = config.model_dump(exclude_none=True)
    else:
      from_object = config

    to_object = {}

    if getv(from_object, ['generation_config']) is not None:
      setv(
          to_object,
          ['generationConfig'],
          _GenerateContentConfig_to_vertex(
              self._api_client,
              getv(from_object, ['generation_config']),
              to_object,
          ),
      )
    if getv(from_object, ['response_modalities']) is not None:
      if getv(to_object, ['generationConfig']) is not None:
        to_object['generationConfig']['responseModalities'] = from_object[
            'response_modalities'
        ]
      else:
        to_object['generationConfig'] = {
            'responseModalities': from_object['response_modalities']
        }
    else:
      # Set default to AUDIO to align with MLDev API.
      if getv(to_object, ['generationConfig']) is not None:
        to_object['generationConfig'].update({'responseModalities': ['AUDIO']})
      else:
        to_object.update(
            {'generationConfig': {'responseModalities': ['AUDIO']}}
        )
    if getv(from_object, ['speech_config']) is not None:
      if getv(to_object, ['generationConfig']) is not None:
        to_object['generationConfig']['speechConfig'] = _SpeechConfig_to_vertex(
            self._api_client,
            t.t_speech_config(
                self._api_client, getv(from_object, ['speech_config'])
            ),
            to_object,
        )
      else:
        to_object['generationConfig'] = {
            'speechConfig': _SpeechConfig_to_vertex(
                self._api_client,
                t.t_speech_config(
                    self._api_client, getv(from_object, ['speech_config'])
                ),
                to_object,
            )
        }
    if getv(from_object, ['system_instruction']) is not None:
      setv(
          to_object,
          ['systemInstruction'],
          _Content_to_vertex(
              self._api_client,
              t.t_content(
                  self._api_client, getv(from_object, ['system_instruction'])
              ),
              to_object,
          ),
      )
    if getv(from_object, ['tools']) is not None:
      setv(
          to_object,
          ['tools'],
          [
              _Tool_to_vertex(self._api_client, item, to_object)
              for item in getv(from_object, ['tools'])
          ],
      )

    return_value = {'setup': {'model': model}}
    return_value['setup'].update(to_object)
    return return_value

  @contextlib.asynccontextmanager
  async def connect(
      self,
      *,
      model: str,
      config: Optional[types.LiveConnectConfigOrDict] = None,
  ) -> AsyncSession:
    """Connect to the live server.

    Usage:

    .. code-block:: python

      client = genai.Client(api_key=API_KEY)
      config = {}
      async with client.aio.live.connect(model='...', config=config) as session:
        await session.send(input='Hello world!', end_of_turn=True)
        async for message in session.receive():
          print(message)
    """
    base_url = self._api_client._websocket_base_url()
    if self._api_client.api_key:
      api_key = self._api_client.api_key
      version = self._api_client._http_options['api_version']
      uri = f'{base_url}/ws/google.ai.generativelanguage.{version}.GenerativeService.BidiGenerateContent?key={api_key}'
      headers = self._api_client._http_options['headers']

      transformed_model = t.t_model(self._api_client, model)
      request = json.dumps(
          self._LiveSetup_to_mldev(model=transformed_model, config=config)
      )
    else:
      # Get bearer token through Application Default Credentials.
      creds, _ = google.auth.default(
          scopes=['https://www.googleapis.com/auth/cloud-platform']
      )

      # creds.valid is False, and creds.token is None
      # Need to refresh credentials to populate those
      auth_req = google.auth.transport.requests.Request()
      creds.refresh(auth_req)
      bearer_token = creds.token
      headers = {
          'Content-Type': 'application/json',
          'Authorization': 'Bearer {}'.format(bearer_token),
      }
      version = self._api_client._http_options['api_version']
      uri = f'{base_url}/ws/google.cloud.aiplatform.{version}.LlmBidiService/BidiGenerateContent'
      location = self._api_client.location
      project = self._api_client.project
      transformed_model = t.t_model(self._api_client, model)
      if transformed_model.startswith('publishers/'):
        transformed_model = (
            f'projects/{project}/locations/{location}/' + transformed_model
        )

      request = json.dumps(
          self._LiveSetup_to_vertex(model=transformed_model, config=config)
      )

    async with connect(uri, additional_headers=headers) as ws:
      await ws.send(request)
      logging.info(await ws.recv(decode=False))

      yield AsyncSession(api_client=self._api_client, websocket=ws)