challenges.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. # Copyright 2021 Google LLC
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """ Challenges for reauthentication.
  15. """
  16. import abc
  17. import base64
  18. import getpass
  19. import sys
  20. from google.auth import _helpers
  21. from google.auth import exceptions
  22. REAUTH_ORIGIN = "https://accounts.google.com"
  23. SAML_CHALLENGE_MESSAGE = (
  24. "Please run `gcloud auth login` to complete reauthentication with SAML."
  25. )
  26. def get_user_password(text):
  27. """Get password from user.
  28. Override this function with a different logic if you are using this library
  29. outside a CLI.
  30. Args:
  31. text (str): message for the password prompt.
  32. Returns:
  33. str: password string.
  34. """
  35. return getpass.getpass(text)
  36. class ReauthChallenge(metaclass=abc.ABCMeta):
  37. """Base class for reauth challenges."""
  38. @property
  39. @abc.abstractmethod
  40. def name(self): # pragma: NO COVER
  41. """Returns the name of the challenge."""
  42. raise NotImplementedError("name property must be implemented")
  43. @property
  44. @abc.abstractmethod
  45. def is_locally_eligible(self): # pragma: NO COVER
  46. """Returns true if a challenge is supported locally on this machine."""
  47. raise NotImplementedError("is_locally_eligible property must be implemented")
  48. @abc.abstractmethod
  49. def obtain_challenge_input(self, metadata): # pragma: NO COVER
  50. """Performs logic required to obtain credentials and returns it.
  51. Args:
  52. metadata (Mapping): challenge metadata returned in the 'challenges' field in
  53. the initial reauth request. Includes the 'challengeType' field
  54. and other challenge-specific fields.
  55. Returns:
  56. response that will be send to the reauth service as the content of
  57. the 'proposalResponse' field in the request body. Usually a dict
  58. with the keys specific to the challenge. For example,
  59. ``{'credential': password}`` for password challenge.
  60. """
  61. raise NotImplementedError("obtain_challenge_input method must be implemented")
  62. class PasswordChallenge(ReauthChallenge):
  63. """Challenge that asks for user's password."""
  64. @property
  65. def name(self):
  66. return "PASSWORD"
  67. @property
  68. def is_locally_eligible(self):
  69. return True
  70. @_helpers.copy_docstring(ReauthChallenge)
  71. def obtain_challenge_input(self, unused_metadata):
  72. passwd = get_user_password("Please enter your password:")
  73. if not passwd:
  74. passwd = " " # avoid the server crashing in case of no password :D
  75. return {"credential": passwd}
  76. class SecurityKeyChallenge(ReauthChallenge):
  77. """Challenge that asks for user's security key touch."""
  78. @property
  79. def name(self):
  80. return "SECURITY_KEY"
  81. @property
  82. def is_locally_eligible(self):
  83. return True
  84. @_helpers.copy_docstring(ReauthChallenge)
  85. def obtain_challenge_input(self, metadata):
  86. try:
  87. import pyu2f.convenience.authenticator # type: ignore
  88. import pyu2f.errors # type: ignore
  89. import pyu2f.model # type: ignore
  90. except ImportError:
  91. raise exceptions.ReauthFailError(
  92. "pyu2f dependency is required to use Security key reauth feature. "
  93. "It can be installed via `pip install pyu2f` or `pip install google-auth[reauth]`."
  94. )
  95. sk = metadata["securityKey"]
  96. challenges = sk["challenges"]
  97. # Read both 'applicationId' and 'relyingPartyId', if they are the same, use
  98. # applicationId, if they are different, use relyingPartyId first and retry
  99. # with applicationId
  100. application_id = sk["applicationId"]
  101. relying_party_id = sk["relyingPartyId"]
  102. if application_id != relying_party_id:
  103. application_parameters = [relying_party_id, application_id]
  104. else:
  105. application_parameters = [application_id]
  106. challenge_data = []
  107. for c in challenges:
  108. kh = c["keyHandle"].encode("ascii")
  109. key = pyu2f.model.RegisteredKey(bytearray(base64.urlsafe_b64decode(kh)))
  110. challenge = c["challenge"].encode("ascii")
  111. challenge = base64.urlsafe_b64decode(challenge)
  112. challenge_data.append({"key": key, "challenge": challenge})
  113. # Track number of tries to suppress error message until all application_parameters
  114. # are tried.
  115. tries = 0
  116. for app_id in application_parameters:
  117. try:
  118. tries += 1
  119. api = pyu2f.convenience.authenticator.CreateCompositeAuthenticator(
  120. REAUTH_ORIGIN
  121. )
  122. response = api.Authenticate(
  123. app_id, challenge_data, print_callback=sys.stderr.write
  124. )
  125. return {"securityKey": response}
  126. except pyu2f.errors.U2FError as e:
  127. if e.code == pyu2f.errors.U2FError.DEVICE_INELIGIBLE:
  128. # Only show error if all app_ids have been tried
  129. if tries == len(application_parameters):
  130. sys.stderr.write("Ineligible security key.\n")
  131. return None
  132. continue
  133. if e.code == pyu2f.errors.U2FError.TIMEOUT:
  134. sys.stderr.write(
  135. "Timed out while waiting for security key touch.\n"
  136. )
  137. else:
  138. raise e
  139. except pyu2f.errors.PluginError as e:
  140. sys.stderr.write("Plugin error: {}.\n".format(e))
  141. continue
  142. except pyu2f.errors.NoDeviceFoundError:
  143. sys.stderr.write("No security key found.\n")
  144. return None
  145. class SamlChallenge(ReauthChallenge):
  146. """Challenge that asks the users to browse to their ID Providers.
  147. Currently SAML challenge is not supported. When obtaining the challenge
  148. input, exception will be raised to instruct the users to run
  149. `gcloud auth login` for reauthentication.
  150. """
  151. @property
  152. def name(self):
  153. return "SAML"
  154. @property
  155. def is_locally_eligible(self):
  156. return True
  157. def obtain_challenge_input(self, metadata):
  158. # Magic Arch has not fully supported returning a proper dedirect URL
  159. # for programmatic SAML users today. So we error our here and request
  160. # users to use gcloud to complete a login.
  161. raise exceptions.ReauthSamlChallengeFailError(SAML_CHALLENGE_MESSAGE)
  162. AVAILABLE_CHALLENGES = {
  163. challenge.name: challenge
  164. for challenge in [SecurityKeyChallenge(), PasswordChallenge(), SamlChallenge()]
  165. }