aboutsummaryrefslogtreecommitdiff
path: root/gn3/auth/authentication/oauth2/models/oauth2client.py
blob: da20200cc19673b262f8c29493273a3f644750cd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
"""OAuth2 Client model."""
import json
import uuid
import datetime
from typing import Sequence, Optional, NamedTuple

from pymonad.maybe import Just, Maybe, Nothing

from gn3.auth import db
from gn3.auth.authentication.users import User, user_by_id, same_password

from gn3.auth.authorisation.errors import NotFoundError

class OAuth2Client(NamedTuple):
    """
    Client to the OAuth2 Server.

    This is defined according to the mixin at
    https://docs.authlib.org/en/latest/specs/rfc6749.html#authlib.oauth2.rfc6749.ClientMixin
    """
    client_id: uuid.UUID
    client_secret: str
    client_id_issued_at: datetime.datetime
    client_secret_expires_at: datetime.datetime
    client_metadata: dict
    user: User

    def check_client_secret(self, client_secret: str) -> bool:
        """Check whether the `client_secret` matches this client."""
        return self.client_secret == client_secret

    @property
    def token_endpoint_auth_method(self) -> str:
        """Return the token endpoint authorisation method."""
        return self.client_metadata.get("token_endpoint_auth_method", ["none"])

    @property
    def client_type(self) -> str:
        """
        Return the token endpoint authorisation method.

        Acceptable client types:
        * public: Unable to use registered client secrets, e.g. browsers, apps
          on mobile devices.
        * confidential: able to securely authenticate with authorisation server
          e.g. being able to keep their registered client secret safe.
        """
        return self.client_metadata.get("client_type", "public")

    def check_endpoint_auth_method(self, method: str, endpoint: str) -> bool:
        """
        Check if the client supports the given method for the given endpoint.

        Acceptable methods:
        * none: Client is a public client and does not have a client secret
        * client_secret_post: Client uses the HTTP POST parameters
        * client_secret_basic: Client uses HTTP Basic
        """
        if endpoint == "token":
            return (method in self.token_endpoint_auth_method
                    and method == "client_secret_post")
        if endpoint in ("introspection", "revoke"):
            return (method in self.token_endpoint_auth_method
                    and method == "client_secret_basic")
        return False

    @property
    def id(self):# pylint: disable=[invalid-name]
        """Return the client_id."""
        return self.client_id

    @property
    def grant_types(self) -> Sequence[str]:
        """
        Return the grant types that this client supports.

        Valid grant types:
        * authorisation_code
        * implicit
        * client_credentials
        * password
        """
        return self.client_metadata.get("grant_types", [])

    def check_grant_type(self, grant_type: str) -> bool:
        """
        Validate that client can handle the given grant types
        """
        return grant_type in self.grant_types

    @property
    def redirect_uris(self) -> Sequence[str]:
        """Return the redirect_uris that this client supports."""
        return self.client_metadata.get('redirect_uris', [])

    def check_redirect_uri(self, redirect_uri: str) -> bool:
        """
        Check whether the given `redirect_uri` is one of the expected ones.
        """
        return redirect_uri in self.redirect_uris

    @property
    def response_types(self) -> Sequence[str]:
        """Return the response_types that this client supports."""
        return self.client_metadata.get("response_type", [])

    def check_response_type(self, response_type: str) -> bool:
        """Check whether this client supports `response_type`."""
        return response_type in self.response_types

    @property
    def scope(self) -> Sequence[str]:
        """Return valid scopes for this client."""
        return tuple(set(self.client_metadata.get("scope", [])))

    def get_allowed_scope(self, scope: str) -> str:
        """Return list of scopes in `scope` that are supported by this client."""
        if not bool(scope):
            return ""
        requested = scope.split()
        return " ".join(sorted(set(
            scp for scp in requested if scp in self.scope)))

    def get_client_id(self):
        """Return this client's identifier."""
        return self.client_id

    def get_default_redirect_uri(self) -> str:
        """Return the default redirect uri"""
        return self.client_metadata.get("default_redirect_uri", "")

def client(conn: db.DbConnection, client_id: uuid.UUID,
           user: Optional[User] = None) -> Maybe:
    """Retrieve a client by its ID"""
    with db.cursor(conn) as cursor:
        cursor.execute(
            "SELECT * FROM oauth2_clients WHERE client_id=?", (str(client_id),))
        result = cursor.fetchone()
        the_user = user
        if result:
            if not bool(the_user):
                try:
                    the_user = user_by_id(conn, result["user_id"])
                except NotFoundError as _nfe:
                    the_user = None

            return Just(
                OAuth2Client(uuid.UUID(result["client_id"]),
                             result["client_secret"],
                             datetime.datetime.fromtimestamp(
                                 result["client_id_issued_at"]),
                             datetime.datetime.fromtimestamp(
                                 result["client_secret_expires_at"]),
                             json.loads(result["client_metadata"]),
                             the_user))# type: ignore[arg-type]

    return Nothing

def client_by_id_and_secret(conn: db.DbConnection, client_id: uuid.UUID,
                            client_secret: str) -> OAuth2Client:
    """Retrieve a client by its ID and secret"""
    with db.cursor(conn) as cursor:
        cursor.execute(
            "SELECT * FROM oauth2_clients WHERE client_id=?",
            (str(client_id),))
        row = cursor.fetchone()
        if bool(row) and same_password(client_secret, row["client_secret"]):
            return OAuth2Client(
                client_id, client_secret,
                datetime.datetime.fromtimestamp(row["client_id_issued_at"]),
                datetime.datetime.fromtimestamp(
                    row["client_secret_expires_at"]),
                json.loads(row["client_metadata"]),
                user_by_id(conn, uuid.UUID(row["user_id"])))

        raise NotFoundError("Could not find client with the given credentials.")

def save_client(conn: db.DbConnection, the_client: OAuth2Client) -> OAuth2Client:
    """Persist the client details into the database."""
    with db.cursor(conn) as cursor:
        query = (
            "INSERT INTO oauth2_clients "
            "(client_id, client_secret, client_id_issued_at, "
            "client_secret_expires_at, client_metadata, user_id) "
            "VALUES "
            "(:client_id, :client_secret, :client_id_issued_at, "
            ":client_secret_expires_at, :client_metadata, :user_id) "
            "ON CONFLICT (client_id) DO UPDATE SET "
            "client_secret=:client_secret, "
            "client_id_issued_at=:client_id_issued_at, "
            "client_secret_expires_at=:client_secret_expires_at, "
            "client_metadata=:client_metadata, user_id=:user_id")
        cursor.execute(
            query,
            {
                "client_id": str(the_client.client_id),
                "client_secret": the_client.client_secret,
                "client_id_issued_at": (
                    the_client.client_id_issued_at.timestamp()),
                "client_secret_expires_at": (
                    the_client.client_secret_expires_at.timestamp()),
                "client_metadata": json.dumps(the_client.client_metadata),
                "user_id": str(the_client.user.user_id)
            })
        return the_client