about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/google/oauth2/webauthn_handler.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/google/oauth2/webauthn_handler.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/google/oauth2/webauthn_handler.py')
-rw-r--r--.venv/lib/python3.12/site-packages/google/oauth2/webauthn_handler.py82
1 files changed, 82 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/webauthn_handler.py b/.venv/lib/python3.12/site-packages/google/oauth2/webauthn_handler.py
new file mode 100644
index 00000000..e27c7e09
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/google/oauth2/webauthn_handler.py
@@ -0,0 +1,82 @@
+import abc
+import os
+import struct
+import subprocess
+
+from google.auth import exceptions
+from google.oauth2.webauthn_types import GetRequest, GetResponse
+
+
+class WebAuthnHandler(abc.ABC):
+    @abc.abstractmethod
+    def is_available(self) -> bool:
+        """Check whether this WebAuthn handler is available"""
+        raise NotImplementedError("is_available method must be implemented")
+
+    @abc.abstractmethod
+    def get(self, get_request: GetRequest) -> GetResponse:
+        """WebAuthn get (assertion)"""
+        raise NotImplementedError("get method must be implemented")
+
+
+class PluginHandler(WebAuthnHandler):
+    """Offloads WebAuthn get reqeust to a pluggable command-line tool.
+
+    Offloads WebAuthn get to a plugin which takes the form of a
+    command-line tool. The command-line tool is configurable via the
+    PluginHandler._ENV_VAR environment variable.
+
+    The WebAuthn plugin should implement the following interface:
+
+    Communication occurs over stdin/stdout, and messages are both sent and
+    received in the form:
+
+    [4 bytes - payload size (little-endian)][variable bytes - json payload]
+    """
+
+    _ENV_VAR = "GOOGLE_AUTH_WEBAUTHN_PLUGIN"
+
+    def is_available(self) -> bool:
+        try:
+            self._find_plugin()
+        except Exception:
+            return False
+        else:
+            return True
+
+    def get(self, get_request: GetRequest) -> GetResponse:
+        request_json = get_request.to_json()
+        cmd = self._find_plugin()
+        response_json = self._call_plugin(cmd, request_json)
+        return GetResponse.from_json(response_json)
+
+    def _call_plugin(self, cmd: str, input_json: str) -> str:
+        # Calculate length of input
+        input_length = len(input_json)
+        length_bytes_le = struct.pack("<I", input_length)
+        request = length_bytes_le + input_json.encode()
+
+        # Call plugin
+        process_result = subprocess.run(
+            [cmd], input=request, capture_output=True, check=True
+        )
+
+        # Check length of response
+        response_len_le = process_result.stdout[:4]
+        response_len = struct.unpack("<I", response_len_le)[0]
+        response = process_result.stdout[4:]
+        if response_len != len(response):
+            raise exceptions.MalformedError(
+                "Plugin response length {} does not match data {}".format(
+                    response_len, len(response)
+                )
+            )
+        return response.decode()
+
+    def _find_plugin(self) -> str:
+        plugin_cmd = os.environ.get(PluginHandler._ENV_VAR)
+        if plugin_cmd is None:
+            raise exceptions.InvalidResource(
+                "{} env var is not set".format(PluginHandler._ENV_VAR)
+            )
+        return plugin_cmd