about summary refs log tree commit diff
path: root/gn3/auth/authentication/oauth2/models/oauth2client.py
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2022-12-19 16:02:19 +0300
committerFrederick Muriuki Muriithi2022-12-22 09:05:53 +0300
commitb0641272491eb51d321b1b8a7d062e395e70800f (patch)
treec9b2065ea60399579c4c4d84c648b61ed67402ba /gn3/auth/authentication/oauth2/models/oauth2client.py
parente9031e28594fcd21371adb2b9b26e17a1df95599 (diff)
downloadgenenetwork3-b0641272491eb51d321b1b8a7d062e395e70800f.tar.gz
auth: implement OAuth2 flow. oauth2_auth_flow
Add code to implement the OAuth2 flow.

* Add test fixtures for setting up users and OAuth2 clients
* Add tests for token generation with the "Password Grant" flow
* Fix some issues with test due to changes in the database connection's
  row_factory
Diffstat (limited to 'gn3/auth/authentication/oauth2/models/oauth2client.py')
-rw-r--r--gn3/auth/authentication/oauth2/models/oauth2client.py141
1 files changed, 141 insertions, 0 deletions
diff --git a/gn3/auth/authentication/oauth2/models/oauth2client.py b/gn3/auth/authentication/oauth2/models/oauth2client.py
new file mode 100644
index 0000000..2ee7858
--- /dev/null
+++ b/gn3/auth/authentication/oauth2/models/oauth2client.py
@@ -0,0 +1,141 @@
+"""OAuth2 Client model."""
+import json
+import uuid
+import datetime
+from typing import NamedTuple, Sequence
+
+from pymonad.maybe import Just, Maybe, Nothing
+
+from gn3.auth import db
+from gn3.auth.authentication.users import User, user_by_id
+
+class OAuth2Client(NamedTuple):
+    """
+    Client to the OAuth2 Server.
+
+    This is defined according to the mixin at
+    https://docs.authlib.org/en/latest/specs/rfc6749.html#authlib.oauth2.rfc6749.ClientMixin
+    """
+    client_id: uuid.UUID
+    client_secret: str
+    client_id_issued_at: datetime.datetime
+    client_secret_expires_at: datetime.datetime
+    client_metadata: dict
+    user: User
+
+    def check_client_secret(self, client_secret: str) -> bool:
+        """Check whether the `client_secret` matches this client."""
+        return self.client_secret == client_secret
+
+    @property
+    def token_endpoint_auth_method(self) -> str:
+        """Return the token endpoint authorisation method."""
+        return self.client_metadata.get("token_endpoint_auth_method", ["none"])
+
+    @property
+    def client_type(self) -> str:
+        """Return the token endpoint authorisation method."""
+        return self.client_metadata.get("client_type", "public")
+
+    def check_endpoint_auth_method(self, method: str, endpoint: str) -> bool:
+        """
+        Check if the client supports the given method for the given endpoint.
+
+        Acceptable methods:
+        * none: Client is a public client and does not have a client secret
+        * client_secret_post: Client uses the HTTP POST parameters
+        * client_secret_basic: Client uses HTTP Basic
+        """
+        if endpoint == "token":
+            return (method in self.token_endpoint_auth_method
+                    and method == "client_secret_post")
+        if endpoint in ("introspection", "revoke"):
+            return (method in self.token_endpoint_auth_method
+                    and method == "client_secret_basic")
+        return False
+
+    @property
+    def id(self):# pylint: disable=[invalid-name]
+        """Return the client_id."""
+        return self.client_id
+
+    @property
+    def grant_types(self) -> Sequence[str]:
+        """
+        Return the grant types that this client supports.
+
+        Valid grant types:
+        * authorisation_code
+        * implicit
+        * client_credentials
+        * password
+        """
+        return self.client_metadata.get("grant_types", [])
+
+    def check_grant_type(self, grant_type: str) -> bool:
+        """
+        Validate that client can handle the given grant types
+        """
+        return grant_type in self.grant_types
+
+    @property
+    def redirect_uris(self) -> Sequence[str]:
+        """Return the redirect_uris that this client supports."""
+        return self.client_metadata.get('redirect_uris', [])
+
+    def check_redirect_uri(self, redirect_uri: str) -> bool:
+        """
+        Check whether the given `redirect_uri` is one of the expected ones.
+        """
+        return redirect_uri in self.redirect_uris
+
+    @property
+    def response_types(self) -> Sequence[str]:
+        """Return the response_types that this client supports."""
+        return self.client_metadata.get("response_types", [])
+
+    def check_response_type(self, response_type: str) -> bool:
+        """Check whether this client supports `response_type`."""
+        return response_type in self.response_types
+
+    @property
+    def scope(self) -> Sequence[str]:
+        """Return valid scopes for this client."""
+        return tuple(set(self.client_metadata.get("scope", [])))
+
+    def get_allowed_scope(self, scope: str) -> str:
+        """Return list of scopes in `scope` that are supported by this client."""
+        if not bool(scope):
+            return ""
+        requested = scope.split()
+        return " ".join(sorted(set(
+            scp for scp in requested if scp in self.scope)))
+
+    def get_client_id(self):
+        """Return this client's identifier."""
+        return self.client_id
+
+    def get_default_redirect_uri(self) -> str:
+        """Return the default redirect uri"""
+        return self.client_metadata.get("default_redirect_uri", "")
+
+def client(conn: db.DbConnection, client_id: uuid.UUID) -> Maybe:
+    """Retrieve a client by its ID"""
+    with db.cursor(conn) as cursor:
+        cursor.execute(
+            "SELECT * FROM oauth2_clients WHERE client_id=?", (str(client_id),))
+        result = cursor.fetchone()
+        if result:
+            return Just(
+                OAuth2Client(uuid.UUID(result["client_id"]),
+                             result["client_secret"],
+                             datetime.datetime.fromtimestamp(
+                                 result["client_id_issued_at"]),
+                             datetime.datetime.fromtimestamp(
+                                 result["client_secret_expires_at"]),
+                             json.loads(result["client_metadata"]),
+                             user_by_id( # type: ignore[misc]
+                                 conn, uuid.UUID(result["user_id"])).maybe(
+                                     None, lambda usr: usr)))
+
+    return Nothing