app_engine.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. # Copyright 2016 Google LLC
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Google App Engine standard environment support.
  15. This module provides authentication and signing for applications running on App
  16. Engine in the standard environment using the `App Identity API`_.
  17. .. _App Identity API:
  18. https://cloud.google.com/appengine/docs/python/appidentity/
  19. """
  20. import datetime
  21. from google.auth import _helpers
  22. from google.auth import credentials
  23. from google.auth import crypt
  24. # pytype: disable=import-error
  25. try:
  26. from google.appengine.api import app_identity
  27. except ImportError:
  28. app_identity = None
  29. # pytype: enable=import-error
  30. class Signer(crypt.Signer):
  31. """Signs messages using the App Engine App Identity service.
  32. This can be used in place of :class:`google.auth.crypt.Signer` when
  33. running in the App Engine standard environment.
  34. """
  35. @property
  36. def key_id(self):
  37. """Optional[str]: The key ID used to identify this private key.
  38. .. warning::
  39. This is always ``None``. The key ID used by App Engine can not
  40. be reliably determined ahead of time.
  41. """
  42. return None
  43. @_helpers.copy_docstring(crypt.Signer)
  44. def sign(self, message):
  45. message = _helpers.to_bytes(message)
  46. _, signature = app_identity.sign_blob(message)
  47. return signature
  48. def get_project_id():
  49. """Gets the project ID for the current App Engine application.
  50. Returns:
  51. str: The project ID
  52. Raises:
  53. EnvironmentError: If the App Engine APIs are unavailable.
  54. """
  55. # pylint: disable=missing-raises-doc
  56. # Pylint rightfully thinks EnvironmentError is OSError, but doesn't
  57. # realize it's a valid alias.
  58. if app_identity is None:
  59. raise EnvironmentError("The App Engine APIs are not available.")
  60. return app_identity.get_application_id()
  61. class Credentials(
  62. credentials.Scoped, credentials.Signing, credentials.CredentialsWithQuotaProject
  63. ):
  64. """App Engine standard environment credentials.
  65. These credentials use the App Engine App Identity API to obtain access
  66. tokens.
  67. """
  68. def __init__(
  69. self,
  70. scopes=None,
  71. default_scopes=None,
  72. service_account_id=None,
  73. quota_project_id=None,
  74. ):
  75. """
  76. Args:
  77. scopes (Sequence[str]): Scopes to request from the App Identity
  78. API.
  79. default_scopes (Sequence[str]): Default scopes passed by a
  80. Google client library. Use 'scopes' for user-defined scopes.
  81. service_account_id (str): The service account ID passed into
  82. :func:`google.appengine.api.app_identity.get_access_token`.
  83. If not specified, the default application service account
  84. ID will be used.
  85. quota_project_id (Optional[str]): The project ID used for quota
  86. and billing.
  87. Raises:
  88. EnvironmentError: If the App Engine APIs are unavailable.
  89. """
  90. # pylint: disable=missing-raises-doc
  91. # Pylint rightfully thinks EnvironmentError is OSError, but doesn't
  92. # realize it's a valid alias.
  93. if app_identity is None:
  94. raise EnvironmentError("The App Engine APIs are not available.")
  95. super(Credentials, self).__init__()
  96. self._scopes = scopes
  97. self._default_scopes = default_scopes
  98. self._service_account_id = service_account_id
  99. self._signer = Signer()
  100. self._quota_project_id = quota_project_id
  101. @_helpers.copy_docstring(credentials.Credentials)
  102. def refresh(self, request):
  103. scopes = self._scopes if self._scopes is not None else self._default_scopes
  104. # pylint: disable=unused-argument
  105. token, ttl = app_identity.get_access_token(scopes, self._service_account_id)
  106. expiry = datetime.datetime.utcfromtimestamp(ttl)
  107. self.token, self.expiry = token, expiry
  108. @property
  109. def service_account_email(self):
  110. """The service account email."""
  111. if self._service_account_id is None:
  112. self._service_account_id = app_identity.get_service_account_name()
  113. return self._service_account_id
  114. @property
  115. def requires_scopes(self):
  116. """Checks if the credentials requires scopes.
  117. Returns:
  118. bool: True if there are no scopes set otherwise False.
  119. """
  120. return not self._scopes and not self._default_scopes
  121. @_helpers.copy_docstring(credentials.Scoped)
  122. def with_scopes(self, scopes, default_scopes=None):
  123. return self.__class__(
  124. scopes=scopes,
  125. default_scopes=default_scopes,
  126. service_account_id=self._service_account_id,
  127. quota_project_id=self.quota_project_id,
  128. )
  129. @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
  130. def with_quota_project(self, quota_project_id):
  131. return self.__class__(
  132. scopes=self._scopes,
  133. service_account_id=self._service_account_id,
  134. quota_project_id=quota_project_id,
  135. )
  136. @_helpers.copy_docstring(credentials.Signing)
  137. def sign_bytes(self, message):
  138. return self._signer.sign(message)
  139. @property
  140. @_helpers.copy_docstring(credentials.Signing)
  141. def signer_email(self):
  142. return self.service_account_email
  143. @property
  144. @_helpers.copy_docstring(credentials.Signing)
  145. def signer(self):
  146. return self._signer