aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/google/oauth2/webauthn_handler.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/google/oauth2/webauthn_handler.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/google/oauth2/webauthn_handler.py')
-rw-r--r--.venv/lib/python3.12/site-packages/google/oauth2/webauthn_handler.py82
1 files changed, 82 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/webauthn_handler.py b/.venv/lib/python3.12/site-packages/google/oauth2/webauthn_handler.py
new file mode 100644
index 00000000..e27c7e09
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/google/oauth2/webauthn_handler.py
@@ -0,0 +1,82 @@
+import abc
+import os
+import struct
+import subprocess
+
+from google.auth import exceptions
+from google.oauth2.webauthn_types import GetRequest, GetResponse
+
+
+class WebAuthnHandler(abc.ABC):
+ @abc.abstractmethod
+ def is_available(self) -> bool:
+ """Check whether this WebAuthn handler is available"""
+ raise NotImplementedError("is_available method must be implemented")
+
+ @abc.abstractmethod
+ def get(self, get_request: GetRequest) -> GetResponse:
+ """WebAuthn get (assertion)"""
+ raise NotImplementedError("get method must be implemented")
+
+
+class PluginHandler(WebAuthnHandler):
+ """Offloads WebAuthn get reqeust to a pluggable command-line tool.
+
+ Offloads WebAuthn get to a plugin which takes the form of a
+ command-line tool. The command-line tool is configurable via the
+ PluginHandler._ENV_VAR environment variable.
+
+ The WebAuthn plugin should implement the following interface:
+
+ Communication occurs over stdin/stdout, and messages are both sent and
+ received in the form:
+
+ [4 bytes - payload size (little-endian)][variable bytes - json payload]
+ """
+
+ _ENV_VAR = "GOOGLE_AUTH_WEBAUTHN_PLUGIN"
+
+ def is_available(self) -> bool:
+ try:
+ self._find_plugin()
+ except Exception:
+ return False
+ else:
+ return True
+
+ def get(self, get_request: GetRequest) -> GetResponse:
+ request_json = get_request.to_json()
+ cmd = self._find_plugin()
+ response_json = self._call_plugin(cmd, request_json)
+ return GetResponse.from_json(response_json)
+
+ def _call_plugin(self, cmd: str, input_json: str) -> str:
+ # Calculate length of input
+ input_length = len(input_json)
+ length_bytes_le = struct.pack("<I", input_length)
+ request = length_bytes_le + input_json.encode()
+
+ # Call plugin
+ process_result = subprocess.run(
+ [cmd], input=request, capture_output=True, check=True
+ )
+
+ # Check length of response
+ response_len_le = process_result.stdout[:4]
+ response_len = struct.unpack("<I", response_len_le)[0]
+ response = process_result.stdout[4:]
+ if response_len != len(response):
+ raise exceptions.MalformedError(
+ "Plugin response length {} does not match data {}".format(
+ response_len, len(response)
+ )
+ )
+ return response.decode()
+
+ def _find_plugin(self) -> str:
+ plugin_cmd = os.environ.get(PluginHandler._ENV_VAR)
+ if plugin_cmd is None:
+ raise exceptions.InvalidResource(
+ "{} env var is not set".format(PluginHandler._ENV_VAR)
+ )
+ return plugin_cmd