about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/grpc/beta/_client_adaptations.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/grpc/beta/_client_adaptations.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/grpc/beta/_client_adaptations.py')
-rw-r--r--.venv/lib/python3.12/site-packages/grpc/beta/_client_adaptations.py1015
1 files changed, 1015 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/grpc/beta/_client_adaptations.py b/.venv/lib/python3.12/site-packages/grpc/beta/_client_adaptations.py
new file mode 100644
index 00000000..01214921
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/grpc/beta/_client_adaptations.py
@@ -0,0 +1,1015 @@
+# Copyright 2016 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Translates gRPC's client-side API into gRPC's client-side Beta API."""
+
+import grpc
+from grpc import _common
+from grpc.beta import _metadata
+from grpc.beta import interfaces
+from grpc.framework.common import cardinality
+from grpc.framework.foundation import future
+from grpc.framework.interfaces.face import face
+
+# pylint: disable=too-many-arguments,too-many-locals,unused-argument
+
+_STATUS_CODE_TO_ABORTION_KIND_AND_ABORTION_ERROR_CLASS = {
+    grpc.StatusCode.CANCELLED: (
+        face.Abortion.Kind.CANCELLED,
+        face.CancellationError,
+    ),
+    grpc.StatusCode.UNKNOWN: (
+        face.Abortion.Kind.REMOTE_FAILURE,
+        face.RemoteError,
+    ),
+    grpc.StatusCode.DEADLINE_EXCEEDED: (
+        face.Abortion.Kind.EXPIRED,
+        face.ExpirationError,
+    ),
+    grpc.StatusCode.UNIMPLEMENTED: (
+        face.Abortion.Kind.LOCAL_FAILURE,
+        face.LocalError,
+    ),
+}
+
+
+def _effective_metadata(metadata, metadata_transformer):
+    non_none_metadata = () if metadata is None else metadata
+    if metadata_transformer is None:
+        return non_none_metadata
+    else:
+        return metadata_transformer(non_none_metadata)
+
+
+def _credentials(grpc_call_options):
+    return None if grpc_call_options is None else grpc_call_options.credentials
+
+
+def _abortion(rpc_error_call):
+    code = rpc_error_call.code()
+    pair = _STATUS_CODE_TO_ABORTION_KIND_AND_ABORTION_ERROR_CLASS.get(code)
+    error_kind = face.Abortion.Kind.LOCAL_FAILURE if pair is None else pair[0]
+    return face.Abortion(
+        error_kind,
+        rpc_error_call.initial_metadata(),
+        rpc_error_call.trailing_metadata(),
+        code,
+        rpc_error_call.details(),
+    )
+
+
+def _abortion_error(rpc_error_call):
+    code = rpc_error_call.code()
+    pair = _STATUS_CODE_TO_ABORTION_KIND_AND_ABORTION_ERROR_CLASS.get(code)
+    exception_class = face.AbortionError if pair is None else pair[1]
+    return exception_class(
+        rpc_error_call.initial_metadata(),
+        rpc_error_call.trailing_metadata(),
+        code,
+        rpc_error_call.details(),
+    )
+
+
+class _InvocationProtocolContext(interfaces.GRPCInvocationContext):
+    def disable_next_request_compression(self):
+        pass  # TODO(https://github.com/grpc/grpc/issues/4078): design, implement.
+
+
+class _Rendezvous(future.Future, face.Call):
+    def __init__(self, response_future, response_iterator, call):
+        self._future = response_future
+        self._iterator = response_iterator
+        self._call = call
+
+    def cancel(self):
+        return self._call.cancel()
+
+    def cancelled(self):
+        return self._future.cancelled()
+
+    def running(self):
+        return self._future.running()
+
+    def done(self):
+        return self._future.done()
+
+    def result(self, timeout=None):
+        try:
+            return self._future.result(timeout=timeout)
+        except grpc.RpcError as rpc_error_call:
+            raise _abortion_error(rpc_error_call)
+        except grpc.FutureTimeoutError:
+            raise future.TimeoutError()
+        except grpc.FutureCancelledError:
+            raise future.CancelledError()
+
+    def exception(self, timeout=None):
+        try:
+            rpc_error_call = self._future.exception(timeout=timeout)
+            if rpc_error_call is None:
+                return None
+            else:
+                return _abortion_error(rpc_error_call)
+        except grpc.FutureTimeoutError:
+            raise future.TimeoutError()
+        except grpc.FutureCancelledError:
+            raise future.CancelledError()
+
+    def traceback(self, timeout=None):
+        try:
+            return self._future.traceback(timeout=timeout)
+        except grpc.FutureTimeoutError:
+            raise future.TimeoutError()
+        except grpc.FutureCancelledError:
+            raise future.CancelledError()
+
+    def add_done_callback(self, fn):
+        self._future.add_done_callback(lambda ignored_callback: fn(self))
+
+    def __iter__(self):
+        return self
+
+    def _next(self):
+        try:
+            return next(self._iterator)
+        except grpc.RpcError as rpc_error_call:
+            raise _abortion_error(rpc_error_call)
+
+    def __next__(self):
+        return self._next()
+
+    def next(self):
+        return self._next()
+
+    def is_active(self):
+        return self._call.is_active()
+
+    def time_remaining(self):
+        return self._call.time_remaining()
+
+    def add_abortion_callback(self, abortion_callback):
+        def done_callback():
+            if self.code() is not grpc.StatusCode.OK:
+                abortion_callback(_abortion(self._call))
+
+        registered = self._call.add_callback(done_callback)
+        return None if registered else done_callback()
+
+    def protocol_context(self):
+        return _InvocationProtocolContext()
+
+    def initial_metadata(self):
+        return _metadata.beta(self._call.initial_metadata())
+
+    def terminal_metadata(self):
+        return _metadata.beta(self._call.terminal_metadata())
+
+    def code(self):
+        return self._call.code()
+
+    def details(self):
+        return self._call.details()
+
+
+def _blocking_unary_unary(
+    channel,
+    group,
+    method,
+    timeout,
+    with_call,
+    protocol_options,
+    metadata,
+    metadata_transformer,
+    request,
+    request_serializer,
+    response_deserializer,
+):
+    try:
+        multi_callable = channel.unary_unary(
+            _common.fully_qualified_method(group, method),
+            request_serializer=request_serializer,
+            response_deserializer=response_deserializer,
+        )
+        effective_metadata = _effective_metadata(metadata, metadata_transformer)
+        if with_call:
+            response, call = multi_callable.with_call(
+                request,
+                timeout=timeout,
+                metadata=_metadata.unbeta(effective_metadata),
+                credentials=_credentials(protocol_options),
+            )
+            return response, _Rendezvous(None, None, call)
+        else:
+            return multi_callable(
+                request,
+                timeout=timeout,
+                metadata=_metadata.unbeta(effective_metadata),
+                credentials=_credentials(protocol_options),
+            )
+    except grpc.RpcError as rpc_error_call:
+        raise _abortion_error(rpc_error_call)
+
+
+def _future_unary_unary(
+    channel,
+    group,
+    method,
+    timeout,
+    protocol_options,
+    metadata,
+    metadata_transformer,
+    request,
+    request_serializer,
+    response_deserializer,
+):
+    multi_callable = channel.unary_unary(
+        _common.fully_qualified_method(group, method),
+        request_serializer=request_serializer,
+        response_deserializer=response_deserializer,
+    )
+    effective_metadata = _effective_metadata(metadata, metadata_transformer)
+    response_future = multi_callable.future(
+        request,
+        timeout=timeout,
+        metadata=_metadata.unbeta(effective_metadata),
+        credentials=_credentials(protocol_options),
+    )
+    return _Rendezvous(response_future, None, response_future)
+
+
+def _unary_stream(
+    channel,
+    group,
+    method,
+    timeout,
+    protocol_options,
+    metadata,
+    metadata_transformer,
+    request,
+    request_serializer,
+    response_deserializer,
+):
+    multi_callable = channel.unary_stream(
+        _common.fully_qualified_method(group, method),
+        request_serializer=request_serializer,
+        response_deserializer=response_deserializer,
+    )
+    effective_metadata = _effective_metadata(metadata, metadata_transformer)
+    response_iterator = multi_callable(
+        request,
+        timeout=timeout,
+        metadata=_metadata.unbeta(effective_metadata),
+        credentials=_credentials(protocol_options),
+    )
+    return _Rendezvous(None, response_iterator, response_iterator)
+
+
+def _blocking_stream_unary(
+    channel,
+    group,
+    method,
+    timeout,
+    with_call,
+    protocol_options,
+    metadata,
+    metadata_transformer,
+    request_iterator,
+    request_serializer,
+    response_deserializer,
+):
+    try:
+        multi_callable = channel.stream_unary(
+            _common.fully_qualified_method(group, method),
+            request_serializer=request_serializer,
+            response_deserializer=response_deserializer,
+        )
+        effective_metadata = _effective_metadata(metadata, metadata_transformer)
+        if with_call:
+            response, call = multi_callable.with_call(
+                request_iterator,
+                timeout=timeout,
+                metadata=_metadata.unbeta(effective_metadata),
+                credentials=_credentials(protocol_options),
+            )
+            return response, _Rendezvous(None, None, call)
+        else:
+            return multi_callable(
+                request_iterator,
+                timeout=timeout,
+                metadata=_metadata.unbeta(effective_metadata),
+                credentials=_credentials(protocol_options),
+            )
+    except grpc.RpcError as rpc_error_call:
+        raise _abortion_error(rpc_error_call)
+
+
+def _future_stream_unary(
+    channel,
+    group,
+    method,
+    timeout,
+    protocol_options,
+    metadata,
+    metadata_transformer,
+    request_iterator,
+    request_serializer,
+    response_deserializer,
+):
+    multi_callable = channel.stream_unary(
+        _common.fully_qualified_method(group, method),
+        request_serializer=request_serializer,
+        response_deserializer=response_deserializer,
+    )
+    effective_metadata = _effective_metadata(metadata, metadata_transformer)
+    response_future = multi_callable.future(
+        request_iterator,
+        timeout=timeout,
+        metadata=_metadata.unbeta(effective_metadata),
+        credentials=_credentials(protocol_options),
+    )
+    return _Rendezvous(response_future, None, response_future)
+
+
+def _stream_stream(
+    channel,
+    group,
+    method,
+    timeout,
+    protocol_options,
+    metadata,
+    metadata_transformer,
+    request_iterator,
+    request_serializer,
+    response_deserializer,
+):
+    multi_callable = channel.stream_stream(
+        _common.fully_qualified_method(group, method),
+        request_serializer=request_serializer,
+        response_deserializer=response_deserializer,
+    )
+    effective_metadata = _effective_metadata(metadata, metadata_transformer)
+    response_iterator = multi_callable(
+        request_iterator,
+        timeout=timeout,
+        metadata=_metadata.unbeta(effective_metadata),
+        credentials=_credentials(protocol_options),
+    )
+    return _Rendezvous(None, response_iterator, response_iterator)
+
+
+class _UnaryUnaryMultiCallable(face.UnaryUnaryMultiCallable):
+    def __init__(
+        self,
+        channel,
+        group,
+        method,
+        metadata_transformer,
+        request_serializer,
+        response_deserializer,
+    ):
+        self._channel = channel
+        self._group = group
+        self._method = method
+        self._metadata_transformer = metadata_transformer
+        self._request_serializer = request_serializer
+        self._response_deserializer = response_deserializer
+
+    def __call__(
+        self,
+        request,
+        timeout,
+        metadata=None,
+        with_call=False,
+        protocol_options=None,
+    ):
+        return _blocking_unary_unary(
+            self._channel,
+            self._group,
+            self._method,
+            timeout,
+            with_call,
+            protocol_options,
+            metadata,
+            self._metadata_transformer,
+            request,
+            self._request_serializer,
+            self._response_deserializer,
+        )
+
+    def future(self, request, timeout, metadata=None, protocol_options=None):
+        return _future_unary_unary(
+            self._channel,
+            self._group,
+            self._method,
+            timeout,
+            protocol_options,
+            metadata,
+            self._metadata_transformer,
+            request,
+            self._request_serializer,
+            self._response_deserializer,
+        )
+
+    def event(
+        self,
+        request,
+        receiver,
+        abortion_callback,
+        timeout,
+        metadata=None,
+        protocol_options=None,
+    ):
+        raise NotImplementedError()
+
+
+class _UnaryStreamMultiCallable(face.UnaryStreamMultiCallable):
+    def __init__(
+        self,
+        channel,
+        group,
+        method,
+        metadata_transformer,
+        request_serializer,
+        response_deserializer,
+    ):
+        self._channel = channel
+        self._group = group
+        self._method = method
+        self._metadata_transformer = metadata_transformer
+        self._request_serializer = request_serializer
+        self._response_deserializer = response_deserializer
+
+    def __call__(self, request, timeout, metadata=None, protocol_options=None):
+        return _unary_stream(
+            self._channel,
+            self._group,
+            self._method,
+            timeout,
+            protocol_options,
+            metadata,
+            self._metadata_transformer,
+            request,
+            self._request_serializer,
+            self._response_deserializer,
+        )
+
+    def event(
+        self,
+        request,
+        receiver,
+        abortion_callback,
+        timeout,
+        metadata=None,
+        protocol_options=None,
+    ):
+        raise NotImplementedError()
+
+
+class _StreamUnaryMultiCallable(face.StreamUnaryMultiCallable):
+    def __init__(
+        self,
+        channel,
+        group,
+        method,
+        metadata_transformer,
+        request_serializer,
+        response_deserializer,
+    ):
+        self._channel = channel
+        self._group = group
+        self._method = method
+        self._metadata_transformer = metadata_transformer
+        self._request_serializer = request_serializer
+        self._response_deserializer = response_deserializer
+
+    def __call__(
+        self,
+        request_iterator,
+        timeout,
+        metadata=None,
+        with_call=False,
+        protocol_options=None,
+    ):
+        return _blocking_stream_unary(
+            self._channel,
+            self._group,
+            self._method,
+            timeout,
+            with_call,
+            protocol_options,
+            metadata,
+            self._metadata_transformer,
+            request_iterator,
+            self._request_serializer,
+            self._response_deserializer,
+        )
+
+    def future(
+        self, request_iterator, timeout, metadata=None, protocol_options=None
+    ):
+        return _future_stream_unary(
+            self._channel,
+            self._group,
+            self._method,
+            timeout,
+            protocol_options,
+            metadata,
+            self._metadata_transformer,
+            request_iterator,
+            self._request_serializer,
+            self._response_deserializer,
+        )
+
+    def event(
+        self,
+        receiver,
+        abortion_callback,
+        timeout,
+        metadata=None,
+        protocol_options=None,
+    ):
+        raise NotImplementedError()
+
+
+class _StreamStreamMultiCallable(face.StreamStreamMultiCallable):
+    def __init__(
+        self,
+        channel,
+        group,
+        method,
+        metadata_transformer,
+        request_serializer,
+        response_deserializer,
+    ):
+        self._channel = channel
+        self._group = group
+        self._method = method
+        self._metadata_transformer = metadata_transformer
+        self._request_serializer = request_serializer
+        self._response_deserializer = response_deserializer
+
+    def __call__(
+        self, request_iterator, timeout, metadata=None, protocol_options=None
+    ):
+        return _stream_stream(
+            self._channel,
+            self._group,
+            self._method,
+            timeout,
+            protocol_options,
+            metadata,
+            self._metadata_transformer,
+            request_iterator,
+            self._request_serializer,
+            self._response_deserializer,
+        )
+
+    def event(
+        self,
+        receiver,
+        abortion_callback,
+        timeout,
+        metadata=None,
+        protocol_options=None,
+    ):
+        raise NotImplementedError()
+
+
+class _GenericStub(face.GenericStub):
+    def __init__(
+        self,
+        channel,
+        metadata_transformer,
+        request_serializers,
+        response_deserializers,
+    ):
+        self._channel = channel
+        self._metadata_transformer = metadata_transformer
+        self._request_serializers = request_serializers or {}
+        self._response_deserializers = response_deserializers or {}
+
+    def blocking_unary_unary(
+        self,
+        group,
+        method,
+        request,
+        timeout,
+        metadata=None,
+        with_call=None,
+        protocol_options=None,
+    ):
+        request_serializer = self._request_serializers.get(
+            (
+                group,
+                method,
+            )
+        )
+        response_deserializer = self._response_deserializers.get(
+            (
+                group,
+                method,
+            )
+        )
+        return _blocking_unary_unary(
+            self._channel,
+            group,
+            method,
+            timeout,
+            with_call,
+            protocol_options,
+            metadata,
+            self._metadata_transformer,
+            request,
+            request_serializer,
+            response_deserializer,
+        )
+
+    def future_unary_unary(
+        self,
+        group,
+        method,
+        request,
+        timeout,
+        metadata=None,
+        protocol_options=None,
+    ):
+        request_serializer = self._request_serializers.get(
+            (
+                group,
+                method,
+            )
+        )
+        response_deserializer = self._response_deserializers.get(
+            (
+                group,
+                method,
+            )
+        )
+        return _future_unary_unary(
+            self._channel,
+            group,
+            method,
+            timeout,
+            protocol_options,
+            metadata,
+            self._metadata_transformer,
+            request,
+            request_serializer,
+            response_deserializer,
+        )
+
+    def inline_unary_stream(
+        self,
+        group,
+        method,
+        request,
+        timeout,
+        metadata=None,
+        protocol_options=None,
+    ):
+        request_serializer = self._request_serializers.get(
+            (
+                group,
+                method,
+            )
+        )
+        response_deserializer = self._response_deserializers.get(
+            (
+                group,
+                method,
+            )
+        )
+        return _unary_stream(
+            self._channel,
+            group,
+            method,
+            timeout,
+            protocol_options,
+            metadata,
+            self._metadata_transformer,
+            request,
+            request_serializer,
+            response_deserializer,
+        )
+
+    def blocking_stream_unary(
+        self,
+        group,
+        method,
+        request_iterator,
+        timeout,
+        metadata=None,
+        with_call=None,
+        protocol_options=None,
+    ):
+        request_serializer = self._request_serializers.get(
+            (
+                group,
+                method,
+            )
+        )
+        response_deserializer = self._response_deserializers.get(
+            (
+                group,
+                method,
+            )
+        )
+        return _blocking_stream_unary(
+            self._channel,
+            group,
+            method,
+            timeout,
+            with_call,
+            protocol_options,
+            metadata,
+            self._metadata_transformer,
+            request_iterator,
+            request_serializer,
+            response_deserializer,
+        )
+
+    def future_stream_unary(
+        self,
+        group,
+        method,
+        request_iterator,
+        timeout,
+        metadata=None,
+        protocol_options=None,
+    ):
+        request_serializer = self._request_serializers.get(
+            (
+                group,
+                method,
+            )
+        )
+        response_deserializer = self._response_deserializers.get(
+            (
+                group,
+                method,
+            )
+        )
+        return _future_stream_unary(
+            self._channel,
+            group,
+            method,
+            timeout,
+            protocol_options,
+            metadata,
+            self._metadata_transformer,
+            request_iterator,
+            request_serializer,
+            response_deserializer,
+        )
+
+    def inline_stream_stream(
+        self,
+        group,
+        method,
+        request_iterator,
+        timeout,
+        metadata=None,
+        protocol_options=None,
+    ):
+        request_serializer = self._request_serializers.get(
+            (
+                group,
+                method,
+            )
+        )
+        response_deserializer = self._response_deserializers.get(
+            (
+                group,
+                method,
+            )
+        )
+        return _stream_stream(
+            self._channel,
+            group,
+            method,
+            timeout,
+            protocol_options,
+            metadata,
+            self._metadata_transformer,
+            request_iterator,
+            request_serializer,
+            response_deserializer,
+        )
+
+    def event_unary_unary(
+        self,
+        group,
+        method,
+        request,
+        receiver,
+        abortion_callback,
+        timeout,
+        metadata=None,
+        protocol_options=None,
+    ):
+        raise NotImplementedError()
+
+    def event_unary_stream(
+        self,
+        group,
+        method,
+        request,
+        receiver,
+        abortion_callback,
+        timeout,
+        metadata=None,
+        protocol_options=None,
+    ):
+        raise NotImplementedError()
+
+    def event_stream_unary(
+        self,
+        group,
+        method,
+        receiver,
+        abortion_callback,
+        timeout,
+        metadata=None,
+        protocol_options=None,
+    ):
+        raise NotImplementedError()
+
+    def event_stream_stream(
+        self,
+        group,
+        method,
+        receiver,
+        abortion_callback,
+        timeout,
+        metadata=None,
+        protocol_options=None,
+    ):
+        raise NotImplementedError()
+
+    def unary_unary(self, group, method):
+        request_serializer = self._request_serializers.get(
+            (
+                group,
+                method,
+            )
+        )
+        response_deserializer = self._response_deserializers.get(
+            (
+                group,
+                method,
+            )
+        )
+        return _UnaryUnaryMultiCallable(
+            self._channel,
+            group,
+            method,
+            self._metadata_transformer,
+            request_serializer,
+            response_deserializer,
+        )
+
+    def unary_stream(self, group, method):
+        request_serializer = self._request_serializers.get(
+            (
+                group,
+                method,
+            )
+        )
+        response_deserializer = self._response_deserializers.get(
+            (
+                group,
+                method,
+            )
+        )
+        return _UnaryStreamMultiCallable(
+            self._channel,
+            group,
+            method,
+            self._metadata_transformer,
+            request_serializer,
+            response_deserializer,
+        )
+
+    def stream_unary(self, group, method):
+        request_serializer = self._request_serializers.get(
+            (
+                group,
+                method,
+            )
+        )
+        response_deserializer = self._response_deserializers.get(
+            (
+                group,
+                method,
+            )
+        )
+        return _StreamUnaryMultiCallable(
+            self._channel,
+            group,
+            method,
+            self._metadata_transformer,
+            request_serializer,
+            response_deserializer,
+        )
+
+    def stream_stream(self, group, method):
+        request_serializer = self._request_serializers.get(
+            (
+                group,
+                method,
+            )
+        )
+        response_deserializer = self._response_deserializers.get(
+            (
+                group,
+                method,
+            )
+        )
+        return _StreamStreamMultiCallable(
+            self._channel,
+            group,
+            method,
+            self._metadata_transformer,
+            request_serializer,
+            response_deserializer,
+        )
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        return False
+
+
+class _DynamicStub(face.DynamicStub):
+    def __init__(self, backing_generic_stub, group, cardinalities):
+        self._generic_stub = backing_generic_stub
+        self._group = group
+        self._cardinalities = cardinalities
+
+    def __getattr__(self, attr):
+        method_cardinality = self._cardinalities.get(attr)
+        if method_cardinality is cardinality.Cardinality.UNARY_UNARY:
+            return self._generic_stub.unary_unary(self._group, attr)
+        elif method_cardinality is cardinality.Cardinality.UNARY_STREAM:
+            return self._generic_stub.unary_stream(self._group, attr)
+        elif method_cardinality is cardinality.Cardinality.STREAM_UNARY:
+            return self._generic_stub.stream_unary(self._group, attr)
+        elif method_cardinality is cardinality.Cardinality.STREAM_STREAM:
+            return self._generic_stub.stream_stream(self._group, attr)
+        else:
+            raise AttributeError(
+                '_DynamicStub object has no attribute "%s"!' % attr
+            )
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        return False
+
+
+def generic_stub(
+    channel,
+    host,
+    metadata_transformer,
+    request_serializers,
+    response_deserializers,
+):
+    return _GenericStub(
+        channel,
+        metadata_transformer,
+        request_serializers,
+        response_deserializers,
+    )
+
+
+def dynamic_stub(
+    channel,
+    service,
+    cardinalities,
+    host,
+    metadata_transformer,
+    request_serializers,
+    response_deserializers,
+):
+    return _DynamicStub(
+        _GenericStub(
+            channel,
+            metadata_transformer,
+            request_serializers,
+            response_deserializers,
+        ),
+        service,
+        cardinalities,
+    )