about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/grpc/beta/_server_adaptations.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/grpc/beta/_server_adaptations.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/grpc/beta/_server_adaptations.py')
-rw-r--r--.venv/lib/python3.12/site-packages/grpc/beta/_server_adaptations.py465
1 files changed, 465 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/grpc/beta/_server_adaptations.py b/.venv/lib/python3.12/site-packages/grpc/beta/_server_adaptations.py
new file mode 100644
index 00000000..a6f730bb
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/grpc/beta/_server_adaptations.py
@@ -0,0 +1,465 @@
+# Copyright 2016 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Translates gRPC's server-side API into gRPC's server-side Beta API."""
+
+import collections
+import threading
+
+import grpc
+from grpc import _common
+from grpc.beta import _metadata
+from grpc.beta import interfaces
+from grpc.framework.common import cardinality
+from grpc.framework.common import style
+from grpc.framework.foundation import abandonment
+from grpc.framework.foundation import logging_pool
+from grpc.framework.foundation import stream
+from grpc.framework.interfaces.face import face
+
+# pylint: disable=too-many-return-statements
+
+_DEFAULT_POOL_SIZE = 8
+
+
+class _ServerProtocolContext(interfaces.GRPCServicerContext):
+    def __init__(self, servicer_context):
+        self._servicer_context = servicer_context
+
+    def peer(self):
+        return self._servicer_context.peer()
+
+    def disable_next_response_compression(self):
+        pass  # TODO(https://github.com/grpc/grpc/issues/4078): design, implement.
+
+
+class _FaceServicerContext(face.ServicerContext):
+    def __init__(self, servicer_context):
+        self._servicer_context = servicer_context
+
+    def is_active(self):
+        return self._servicer_context.is_active()
+
+    def time_remaining(self):
+        return self._servicer_context.time_remaining()
+
+    def add_abortion_callback(self, abortion_callback):
+        raise NotImplementedError(
+            "add_abortion_callback no longer supported server-side!"
+        )
+
+    def cancel(self):
+        self._servicer_context.cancel()
+
+    def protocol_context(self):
+        return _ServerProtocolContext(self._servicer_context)
+
+    def invocation_metadata(self):
+        return _metadata.beta(self._servicer_context.invocation_metadata())
+
+    def initial_metadata(self, initial_metadata):
+        self._servicer_context.send_initial_metadata(
+            _metadata.unbeta(initial_metadata)
+        )
+
+    def terminal_metadata(self, terminal_metadata):
+        self._servicer_context.set_terminal_metadata(
+            _metadata.unbeta(terminal_metadata)
+        )
+
+    def code(self, code):
+        self._servicer_context.set_code(code)
+
+    def details(self, details):
+        self._servicer_context.set_details(details)
+
+
+def _adapt_unary_request_inline(unary_request_inline):
+    def adaptation(request, servicer_context):
+        return unary_request_inline(
+            request, _FaceServicerContext(servicer_context)
+        )
+
+    return adaptation
+
+
+def _adapt_stream_request_inline(stream_request_inline):
+    def adaptation(request_iterator, servicer_context):
+        return stream_request_inline(
+            request_iterator, _FaceServicerContext(servicer_context)
+        )
+
+    return adaptation
+
+
+class _Callback(stream.Consumer):
+    def __init__(self):
+        self._condition = threading.Condition()
+        self._values = []
+        self._terminated = False
+        self._cancelled = False
+
+    def consume(self, value):
+        with self._condition:
+            self._values.append(value)
+            self._condition.notify_all()
+
+    def terminate(self):
+        with self._condition:
+            self._terminated = True
+            self._condition.notify_all()
+
+    def consume_and_terminate(self, value):
+        with self._condition:
+            self._values.append(value)
+            self._terminated = True
+            self._condition.notify_all()
+
+    def cancel(self):
+        with self._condition:
+            self._cancelled = True
+            self._condition.notify_all()
+
+    def draw_one_value(self):
+        with self._condition:
+            while True:
+                if self._cancelled:
+                    raise abandonment.Abandoned()
+                elif self._values:
+                    return self._values.pop(0)
+                elif self._terminated:
+                    return None
+                else:
+                    self._condition.wait()
+
+    def draw_all_values(self):
+        with self._condition:
+            while True:
+                if self._cancelled:
+                    raise abandonment.Abandoned()
+                elif self._terminated:
+                    all_values = tuple(self._values)
+                    self._values = None
+                    return all_values
+                else:
+                    self._condition.wait()
+
+
+def _run_request_pipe_thread(
+    request_iterator, request_consumer, servicer_context
+):
+    thread_joined = threading.Event()
+
+    def pipe_requests():
+        for request in request_iterator:
+            if not servicer_context.is_active() or thread_joined.is_set():
+                return
+            request_consumer.consume(request)
+            if not servicer_context.is_active() or thread_joined.is_set():
+                return
+        request_consumer.terminate()
+
+    request_pipe_thread = threading.Thread(target=pipe_requests)
+    request_pipe_thread.daemon = True
+    request_pipe_thread.start()
+
+
+def _adapt_unary_unary_event(unary_unary_event):
+    def adaptation(request, servicer_context):
+        callback = _Callback()
+        if not servicer_context.add_callback(callback.cancel):
+            raise abandonment.Abandoned()
+        unary_unary_event(
+            request,
+            callback.consume_and_terminate,
+            _FaceServicerContext(servicer_context),
+        )
+        return callback.draw_all_values()[0]
+
+    return adaptation
+
+
+def _adapt_unary_stream_event(unary_stream_event):
+    def adaptation(request, servicer_context):
+        callback = _Callback()
+        if not servicer_context.add_callback(callback.cancel):
+            raise abandonment.Abandoned()
+        unary_stream_event(
+            request, callback, _FaceServicerContext(servicer_context)
+        )
+        while True:
+            response = callback.draw_one_value()
+            if response is None:
+                return
+            else:
+                yield response
+
+    return adaptation
+
+
+def _adapt_stream_unary_event(stream_unary_event):
+    def adaptation(request_iterator, servicer_context):
+        callback = _Callback()
+        if not servicer_context.add_callback(callback.cancel):
+            raise abandonment.Abandoned()
+        request_consumer = stream_unary_event(
+            callback.consume_and_terminate,
+            _FaceServicerContext(servicer_context),
+        )
+        _run_request_pipe_thread(
+            request_iterator, request_consumer, servicer_context
+        )
+        return callback.draw_all_values()[0]
+
+    return adaptation
+
+
+def _adapt_stream_stream_event(stream_stream_event):
+    def adaptation(request_iterator, servicer_context):
+        callback = _Callback()
+        if not servicer_context.add_callback(callback.cancel):
+            raise abandonment.Abandoned()
+        request_consumer = stream_stream_event(
+            callback, _FaceServicerContext(servicer_context)
+        )
+        _run_request_pipe_thread(
+            request_iterator, request_consumer, servicer_context
+        )
+        while True:
+            response = callback.draw_one_value()
+            if response is None:
+                return
+            else:
+                yield response
+
+    return adaptation
+
+
+class _SimpleMethodHandler(
+    collections.namedtuple(
+        "_MethodHandler",
+        (
+            "request_streaming",
+            "response_streaming",
+            "request_deserializer",
+            "response_serializer",
+            "unary_unary",
+            "unary_stream",
+            "stream_unary",
+            "stream_stream",
+        ),
+    ),
+    grpc.RpcMethodHandler,
+):
+    pass
+
+
+def _simple_method_handler(
+    implementation, request_deserializer, response_serializer
+):
+    if implementation.style is style.Service.INLINE:
+        if implementation.cardinality is cardinality.Cardinality.UNARY_UNARY:
+            return _SimpleMethodHandler(
+                False,
+                False,
+                request_deserializer,
+                response_serializer,
+                _adapt_unary_request_inline(implementation.unary_unary_inline),
+                None,
+                None,
+                None,
+            )
+        elif implementation.cardinality is cardinality.Cardinality.UNARY_STREAM:
+            return _SimpleMethodHandler(
+                False,
+                True,
+                request_deserializer,
+                response_serializer,
+                None,
+                _adapt_unary_request_inline(implementation.unary_stream_inline),
+                None,
+                None,
+            )
+        elif implementation.cardinality is cardinality.Cardinality.STREAM_UNARY:
+            return _SimpleMethodHandler(
+                True,
+                False,
+                request_deserializer,
+                response_serializer,
+                None,
+                None,
+                _adapt_stream_request_inline(
+                    implementation.stream_unary_inline
+                ),
+                None,
+            )
+        elif (
+            implementation.cardinality is cardinality.Cardinality.STREAM_STREAM
+        ):
+            return _SimpleMethodHandler(
+                True,
+                True,
+                request_deserializer,
+                response_serializer,
+                None,
+                None,
+                None,
+                _adapt_stream_request_inline(
+                    implementation.stream_stream_inline
+                ),
+            )
+    elif implementation.style is style.Service.EVENT:
+        if implementation.cardinality is cardinality.Cardinality.UNARY_UNARY:
+            return _SimpleMethodHandler(
+                False,
+                False,
+                request_deserializer,
+                response_serializer,
+                _adapt_unary_unary_event(implementation.unary_unary_event),
+                None,
+                None,
+                None,
+            )
+        elif implementation.cardinality is cardinality.Cardinality.UNARY_STREAM:
+            return _SimpleMethodHandler(
+                False,
+                True,
+                request_deserializer,
+                response_serializer,
+                None,
+                _adapt_unary_stream_event(implementation.unary_stream_event),
+                None,
+                None,
+            )
+        elif implementation.cardinality is cardinality.Cardinality.STREAM_UNARY:
+            return _SimpleMethodHandler(
+                True,
+                False,
+                request_deserializer,
+                response_serializer,
+                None,
+                None,
+                _adapt_stream_unary_event(implementation.stream_unary_event),
+                None,
+            )
+        elif (
+            implementation.cardinality is cardinality.Cardinality.STREAM_STREAM
+        ):
+            return _SimpleMethodHandler(
+                True,
+                True,
+                request_deserializer,
+                response_serializer,
+                None,
+                None,
+                None,
+                _adapt_stream_stream_event(implementation.stream_stream_event),
+            )
+    raise ValueError()
+
+
+def _flatten_method_pair_map(method_pair_map):
+    method_pair_map = method_pair_map or {}
+    flat_map = {}
+    for method_pair in method_pair_map:
+        method = _common.fully_qualified_method(method_pair[0], method_pair[1])
+        flat_map[method] = method_pair_map[method_pair]
+    return flat_map
+
+
+class _GenericRpcHandler(grpc.GenericRpcHandler):
+    def __init__(
+        self,
+        method_implementations,
+        multi_method_implementation,
+        request_deserializers,
+        response_serializers,
+    ):
+        self._method_implementations = _flatten_method_pair_map(
+            method_implementations
+        )
+        self._request_deserializers = _flatten_method_pair_map(
+            request_deserializers
+        )
+        self._response_serializers = _flatten_method_pair_map(
+            response_serializers
+        )
+        self._multi_method_implementation = multi_method_implementation
+
+    def service(self, handler_call_details):
+        method_implementation = self._method_implementations.get(
+            handler_call_details.method
+        )
+        if method_implementation is not None:
+            return _simple_method_handler(
+                method_implementation,
+                self._request_deserializers.get(handler_call_details.method),
+                self._response_serializers.get(handler_call_details.method),
+            )
+        elif self._multi_method_implementation is None:
+            return None
+        else:
+            try:
+                return None  # TODO(nathaniel): call the multimethod.
+            except face.NoSuchMethodError:
+                return None
+
+
+class _Server(interfaces.Server):
+    def __init__(self, grpc_server):
+        self._grpc_server = grpc_server
+
+    def add_insecure_port(self, address):
+        return self._grpc_server.add_insecure_port(address)
+
+    def add_secure_port(self, address, server_credentials):
+        return self._grpc_server.add_secure_port(address, server_credentials)
+
+    def start(self):
+        self._grpc_server.start()
+
+    def stop(self, grace):
+        return self._grpc_server.stop(grace)
+
+    def __enter__(self):
+        self._grpc_server.start()
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self._grpc_server.stop(None)
+        return False
+
+
+def server(
+    service_implementations,
+    multi_method_implementation,
+    request_deserializers,
+    response_serializers,
+    thread_pool,
+    thread_pool_size,
+):
+    generic_rpc_handler = _GenericRpcHandler(
+        service_implementations,
+        multi_method_implementation,
+        request_deserializers,
+        response_serializers,
+    )
+    if thread_pool is None:
+        effective_thread_pool = logging_pool.pool(
+            _DEFAULT_POOL_SIZE if thread_pool_size is None else thread_pool_size
+        )
+    else:
+        effective_thread_pool = thread_pool
+    return _Server(
+        grpc.server(effective_thread_pool, handlers=(generic_rpc_handler,))
+    )