123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281 |
- # Copyright 2021 Google LLC
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """ Challenges for reauthentication.
- """
- import abc
- import base64
- import getpass
- import sys
- from google.auth import _helpers
- from google.auth import exceptions
- from google.oauth2 import webauthn_handler_factory
- from google.oauth2.webauthn_types import (
- AuthenticationExtensionsClientInputs,
- GetRequest,
- PublicKeyCredentialDescriptor,
- )
- REAUTH_ORIGIN = "https://accounts.google.com"
- SAML_CHALLENGE_MESSAGE = (
- "Please run `gcloud auth login` to complete reauthentication with SAML."
- )
- WEBAUTHN_TIMEOUT_MS = 120000 # Two minute timeout
- def get_user_password(text):
- """Get password from user.
- Override this function with a different logic if you are using this library
- outside a CLI.
- Args:
- text (str): message for the password prompt.
- Returns:
- str: password string.
- """
- return getpass.getpass(text)
- class ReauthChallenge(metaclass=abc.ABCMeta):
- """Base class for reauth challenges."""
- @property
- @abc.abstractmethod
- def name(self): # pragma: NO COVER
- """Returns the name of the challenge."""
- raise NotImplementedError("name property must be implemented")
- @property
- @abc.abstractmethod
- def is_locally_eligible(self): # pragma: NO COVER
- """Returns true if a challenge is supported locally on this machine."""
- raise NotImplementedError("is_locally_eligible property must be implemented")
- @abc.abstractmethod
- def obtain_challenge_input(self, metadata): # pragma: NO COVER
- """Performs logic required to obtain credentials and returns it.
- Args:
- metadata (Mapping): challenge metadata returned in the 'challenges' field in
- the initial reauth request. Includes the 'challengeType' field
- and other challenge-specific fields.
- Returns:
- response that will be send to the reauth service as the content of
- the 'proposalResponse' field in the request body. Usually a dict
- with the keys specific to the challenge. For example,
- ``{'credential': password}`` for password challenge.
- """
- raise NotImplementedError("obtain_challenge_input method must be implemented")
- class PasswordChallenge(ReauthChallenge):
- """Challenge that asks for user's password."""
- @property
- def name(self):
- return "PASSWORD"
- @property
- def is_locally_eligible(self):
- return True
- @_helpers.copy_docstring(ReauthChallenge)
- def obtain_challenge_input(self, unused_metadata):
- passwd = get_user_password("Please enter your password:")
- if not passwd:
- passwd = " " # avoid the server crashing in case of no password :D
- return {"credential": passwd}
- class SecurityKeyChallenge(ReauthChallenge):
- """Challenge that asks for user's security key touch."""
- @property
- def name(self):
- return "SECURITY_KEY"
- @property
- def is_locally_eligible(self):
- return True
- @_helpers.copy_docstring(ReauthChallenge)
- def obtain_challenge_input(self, metadata):
- # Check if there is an available Webauthn Handler, if not use pyu2f
- try:
- factory = webauthn_handler_factory.WebauthnHandlerFactory()
- webauthn_handler = factory.get_handler()
- if webauthn_handler is not None:
- sys.stderr.write("Please insert and touch your security key\n")
- return self._obtain_challenge_input_webauthn(metadata, webauthn_handler)
- except Exception:
- # Attempt pyu2f if exception in webauthn flow
- pass
- try:
- import pyu2f.convenience.authenticator # type: ignore
- import pyu2f.errors # type: ignore
- import pyu2f.model # type: ignore
- except ImportError:
- raise exceptions.ReauthFailError(
- "pyu2f dependency is required to use Security key reauth feature. "
- "It can be installed via `pip install pyu2f` or `pip install google-auth[reauth]`."
- )
- sk = metadata["securityKey"]
- challenges = sk["challenges"]
- # Read both 'applicationId' and 'relyingPartyId', if they are the same, use
- # applicationId, if they are different, use relyingPartyId first and retry
- # with applicationId
- application_id = sk["applicationId"]
- relying_party_id = sk["relyingPartyId"]
- if application_id != relying_party_id:
- application_parameters = [relying_party_id, application_id]
- else:
- application_parameters = [application_id]
- challenge_data = []
- for c in challenges:
- kh = c["keyHandle"].encode("ascii")
- key = pyu2f.model.RegisteredKey(bytearray(base64.urlsafe_b64decode(kh)))
- challenge = c["challenge"].encode("ascii")
- challenge = base64.urlsafe_b64decode(challenge)
- challenge_data.append({"key": key, "challenge": challenge})
- # Track number of tries to suppress error message until all application_parameters
- # are tried.
- tries = 0
- for app_id in application_parameters:
- try:
- tries += 1
- api = pyu2f.convenience.authenticator.CreateCompositeAuthenticator(
- REAUTH_ORIGIN
- )
- response = api.Authenticate(
- app_id, challenge_data, print_callback=sys.stderr.write
- )
- return {"securityKey": response}
- except pyu2f.errors.U2FError as e:
- if e.code == pyu2f.errors.U2FError.DEVICE_INELIGIBLE:
- # Only show error if all app_ids have been tried
- if tries == len(application_parameters):
- sys.stderr.write("Ineligible security key.\n")
- return None
- continue
- if e.code == pyu2f.errors.U2FError.TIMEOUT:
- sys.stderr.write(
- "Timed out while waiting for security key touch.\n"
- )
- else:
- raise e
- except pyu2f.errors.PluginError as e:
- sys.stderr.write("Plugin error: {}.\n".format(e))
- continue
- except pyu2f.errors.NoDeviceFoundError:
- sys.stderr.write("No security key found.\n")
- return None
- def _obtain_challenge_input_webauthn(self, metadata, webauthn_handler):
- sk = metadata.get("securityKey")
- if sk is None:
- raise exceptions.InvalidValue("securityKey is None")
- challenges = sk.get("challenges")
- application_id = sk.get("applicationId")
- relying_party_id = sk.get("relyingPartyId")
- if challenges is None or len(challenges) < 1:
- raise exceptions.InvalidValue("challenges is None or empty")
- if application_id is None:
- raise exceptions.InvalidValue("application_id is None")
- if relying_party_id is None:
- raise exceptions.InvalidValue("relying_party_id is None")
- allow_credentials = []
- for challenge in challenges:
- kh = challenge.get("keyHandle")
- if kh is None:
- raise exceptions.InvalidValue("keyHandle is None")
- key_handle = self._unpadded_urlsafe_b64recode(kh)
- allow_credentials.append(PublicKeyCredentialDescriptor(id=key_handle))
- extension = AuthenticationExtensionsClientInputs(appid=application_id)
- challenge = challenges[0].get("challenge")
- if challenge is None:
- raise exceptions.InvalidValue("challenge is None")
- get_request = GetRequest(
- origin=REAUTH_ORIGIN,
- rpid=relying_party_id,
- challenge=self._unpadded_urlsafe_b64recode(challenge),
- timeout_ms=WEBAUTHN_TIMEOUT_MS,
- allow_credentials=allow_credentials,
- user_verification="required",
- extensions=extension,
- )
- try:
- get_response = webauthn_handler.get(get_request)
- except Exception as e:
- sys.stderr.write("Webauthn Error: {}.\n".format(e))
- raise e
- response = {
- "clientData": get_response.response.client_data_json,
- "authenticatorData": get_response.response.authenticator_data,
- "signatureData": get_response.response.signature,
- "applicationId": application_id,
- "keyHandle": get_response.id,
- "securityKeyReplyType": 2,
- }
- return {"securityKey": response}
- def _unpadded_urlsafe_b64recode(self, s):
- """Converts standard b64 encoded string to url safe b64 encoded string
- with no padding."""
- b = base64.urlsafe_b64decode(s)
- return base64.urlsafe_b64encode(b).decode().rstrip("=")
- class SamlChallenge(ReauthChallenge):
- """Challenge that asks the users to browse to their ID Providers.
- Currently SAML challenge is not supported. When obtaining the challenge
- input, exception will be raised to instruct the users to run
- `gcloud auth login` for reauthentication.
- """
- @property
- def name(self):
- return "SAML"
- @property
- def is_locally_eligible(self):
- return True
- def obtain_challenge_input(self, metadata):
- # Magic Arch has not fully supported returning a proper dedirect URL
- # for programmatic SAML users today. So we error our here and request
- # users to use gcloud to complete a login.
- raise exceptions.ReauthSamlChallengeFailError(SAML_CHALLENGE_MESSAGE)
- AVAILABLE_CHALLENGES = {
- challenge.name: challenge
- for challenge in [SecurityKeyChallenge(), PasswordChallenge(), SamlChallenge()]
- }
|