diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/msrest/authentication.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/msrest/authentication.py | 306 |
1 files changed, 306 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/msrest/authentication.py b/.venv/lib/python3.12/site-packages/msrest/authentication.py new file mode 100644 index 00000000..d7d7cad2 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/msrest/authentication.py @@ -0,0 +1,306 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +from typing import Optional, Dict + +import requests +from requests.auth import HTTPBasicAuth +import requests_oauthlib as oauth + + +class Authentication(object): + """Default, simple auth object. + Doesn't actually add any auth headers. + """ + + header = "Authorization" + + def signed_session(self, session=None): + # type: (Optional[requests.Session]) -> requests.Session + """Create requests session with any required auth headers applied. + + If a session object is provided, configure it directly. Otherwise, + create a new session and return it. + + :param session: The session to configure for authentication + :type session: requests.Session + :rtype: requests.Session + """ + return session or requests.Session() + + +class BasicAuthentication(Authentication): + """Implementation of Basic Authentication. + + :param str username: Authentication username. + :param str password: Authentication password. + """ + + def __init__(self, username, password): + # type: (str, str) -> None + self.scheme = 'Basic' + self.username = username + self.password = password + + def signed_session(self, session=None): + # type: (Optional[requests.Session]) -> requests.Session + """Create requests session with any required auth headers + applied. + + If a session object is provided, configure it directly. Otherwise, + create a new session and return it. + + :param session: The session to configure for authentication + :type session: requests.Session + :rtype: requests.Session + """ + session = super(BasicAuthentication, self).signed_session(session) + session.auth = HTTPBasicAuth(self.username, self.password) + return session + + +class BasicTokenAuthentication(Authentication): + """Simple Token Authentication. + Does not adhere to OAuth, simply adds provided token as a header. + + :param dict[str,str] token: Authentication token, must have 'access_token' key. + """ + + def __init__(self, token): + # type: (Dict[str, str]) -> None + self.scheme = 'Bearer' + self.token = token + + def set_token(self): + # type: () -> None + """Should be used to define the self.token attribute. + + In this implementation, does nothing since the token is statically provided + at creation. + """ + pass + + def signed_session(self, session=None): + # type: (Optional[requests.Session]) -> requests.Session + """Create requests session with any required auth headers + applied. + + If a session object is provided, configure it directly. Otherwise, + create a new session and return it. + + :param session: The session to configure for authentication + :type session: requests.Session + :rtype: requests.Session + """ + session = super(BasicTokenAuthentication, self).signed_session(session) + header = "{} {}".format(self.scheme, self.token['access_token']) + session.headers['Authorization'] = header + return session + + +class OAuthTokenAuthentication(BasicTokenAuthentication): + """OAuth Token Authentication. + + Requires that supplied token contains an expires_in field. + + :param str client_id: Account Client ID. + :param dict[str,str] token: OAuth2 token. + """ + + def __init__(self, client_id, token): + # type: (str, Dict[str, str]) -> None + super(OAuthTokenAuthentication, self).__init__(token) + self.id = client_id + self.store_key = self.id + + def construct_auth(self): + # type: () -> str + """Format token header. + + :rtype: str + """ + return "{} {}".format(self.scheme, self.token) + + def refresh_session(self, session=None): + # type: (Optional[requests.Session]) -> requests.Session + """Return updated session if token has expired, attempts to + refresh using refresh token. + + If a session object is provided, configure it directly. Otherwise, + create a new session and return it. + + :param session: The session to configure for authentication + :type session: requests.Session + :rtype: requests.Session + """ + return self.signed_session(session) + + def signed_session(self, session=None): + # type: (Optional[requests.Session]) -> requests.Session + """Create requests session with any required auth headers applied. + + If a session object is provided, configure it directly. Otherwise, + create a new session and return it. + + :param session: The session to configure for authentication + :type session: requests.Session + :rtype: requests.Session + """ + session = session or requests.Session() # Don't call super on purpose, let's "auth" manage the headers. + session.auth = oauth.OAuth2(self.id, token=self.token) + return session + + +class KerberosAuthentication(Authentication): + """Kerberos Authentication + Kerberos Single Sign On (SSO); requires requests_kerberos is installed. + + :param mutual_authentication: whether to require mutual authentication. Use values from requests_kerberos import REQUIRED, OPTIONAL, or DISABLED + """ + def __init__(self, mutual_authentication=None): + super(KerberosAuthentication, self).__init__() + self.mutual_authentication = mutual_authentication + + def signed_session(self, session=None): + """Create requests session with Negotiate (SPNEGO) headers applied. + + If a session object is provided, configure it directly. Otherwise, + create a new session and return it. + + :param session: The session to configure for authentication + :type session: requests.Session + :rtype: requests.Session + """ + session = super(KerberosAuthentication, self).signed_session(session) + try: + from requests_kerberos import HTTPKerberosAuth + except ImportError: + raise ImportError("In order to use KerberosAuthentication please do 'pip install requests_kerberos' first") + if self.mutual_authentication: + session.auth = HTTPKerberosAuth(mutual_authentication=self.mutual_authentication) + else: + session.auth = HTTPKerberosAuth() + return session + + +class ApiKeyCredentials(Authentication): + """Represent the ApiKey feature of Swagger. + + Dict should be dict[str,str] to be accepted by requests. + + :param dict[str,str] in_headers: Headers part of the ApiKey + :param dict[str,str] in_query: ApiKey in the query as parameters + """ + def __init__(self, in_headers=None, in_query=None): + # type: (Optional[Dict[str, str]], Optional[Dict[str, str]]) -> None + super(ApiKeyCredentials, self).__init__() + if in_headers is None: + in_headers = {} + if in_query is None: + in_query = {} + + if not in_headers and not in_query: + raise ValueError("You need to define in_headers or in_query") + + self.in_headers = in_headers + self.in_query = in_query + + def signed_session(self, session=None): + # type: (Optional[requests.Session]) -> requests.Session + """Create requests session with ApiKey. + + If a session object is provided, configure it directly. Otherwise, + create a new session and return it. + + :param session: The session to configure for authentication + :type session: requests.Session + :rtype: requests.Session + """ + session = super(ApiKeyCredentials, self).signed_session(session) + session.headers.update(self.in_headers) + try: + # params is actually Union[bytes, MutableMapping[Text, Text]] + session.params.update(self.in_query) # type: ignore + except AttributeError: # requests.params can be bytes + raise ValueError("session.params must be a dict to be used in ApiKeyCredentials") + return session + + +class CognitiveServicesCredentials(ApiKeyCredentials): + """Cognitive Services authentication. + + :param str subscription_key: The CS subscription key + """ + + _subscription_key_header = 'Ocp-Apim-Subscription-Key' + + def __init__(self, subscription_key): + # type: (str) -> None + if not subscription_key: + raise ValueError("Subscription key cannot be None") + super(CognitiveServicesCredentials, self).__init__( + in_headers={ + self._subscription_key_header: subscription_key, + 'X-BingApis-SDK-Client': 'Python-SDK' + } + ) + + +class TopicCredentials(ApiKeyCredentials): + """Event Grid authentication. + + :param str topic_key: The Event Grid topic key + """ + + _topic_key_header = 'aeg-sas-key' + + def __init__(self, topic_key): + # type: (str) -> None + if not topic_key: + raise ValueError("Topic key cannot be None") + super(TopicCredentials, self).__init__( + in_headers={ + self._topic_key_header: topic_key, + } + ) + + +class DomainCredentials(ApiKeyCredentials): + """Event Grid domain authentication. + + :param str domain_key: The Event Grid domain key + """ + + _domain_key_header = 'aeg-sas-key' + + def __init__(self, domain_key): + # type: (str) -> None + if not domain_key: + raise ValueError("Domain key cannot be None") + super(DomainCredentials, self).__init__( + in_headers={ + self._domain_key_header: domain_key, + } + ) |