iam.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. # Copyright 2017 Google LLC
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Tools for using the Google `Cloud Identity and Access Management (IAM)
  15. API`_'s auth-related functionality.
  16. .. _Cloud Identity and Access Management (IAM) API:
  17. https://cloud.google.com/iam/docs/
  18. """
  19. import base64
  20. import http.client as http_client
  21. import json
  22. from google.auth import _exponential_backoff
  23. from google.auth import _helpers
  24. from google.auth import credentials
  25. from google.auth import crypt
  26. from google.auth import exceptions
  # HTTP statuses for which a call to the IAM API is attempted again instead
  # of being reported to the caller as a failure.
  IAM_RETRY_CODES = {
      http_client.INTERNAL_SERVER_ERROR,
      http_client.BAD_GATEWAY,
      http_client.SERVICE_UNAVAILABLE,
      http_client.GATEWAY_TIMEOUT,
  }

  # OAuth scope list for the IAM API.
  _IAM_SCOPE = ["https://www.googleapis.com/auth/iam"]

  # URL templates for the IAM Credentials API. The single ``{}`` placeholder in
  # each template is filled in with ``str.format``; the signBlob template is
  # formatted with a service account email by ``Signer``.
  _IAM_ENDPOINT = (
      "https://iamcredentials.googleapis.com/v1/projects/-"
      "/serviceAccounts/{}:generateAccessToken"
  )

  _IAM_SIGN_ENDPOINT = (
      "https://iamcredentials.googleapis.com/v1/projects/-"
      "/serviceAccounts/{}:signBlob"
  )

  _IAM_SIGNJWT_ENDPOINT = (
      "https://iamcredentials.googleapis.com/v1/projects/-"
      "/serviceAccounts/{}:signJwt"
  )

  _IAM_IDTOKEN_ENDPOINT = (
      "https://iamcredentials.googleapis.com/v1/"
      "projects/-/serviceAccounts/{}:generateIdToken"
  )
  class Signer(crypt.Signer):
      """Sign messages remotely through the IAM `signBlob API`_.

      Use this signer when bytes must be signed on behalf of a service account
      but the credential's private key file is not available locally.

      .. _signBlob API:
          https://cloud.google.com/iam/reference/rest/v1/projects.serviceAccounts
          /signBlob
      """

      def __init__(self, request, credentials, service_account_email):
          """
          Args:
              request (google.auth.transport.Request): The object used to make
                  HTTP requests.
              credentials (google.auth.credentials.Credentials): The credentials
                  that will be used to authenticate the request to the IAM API.
                  The credentials must have one of the following scopes:

                  - https://www.googleapis.com/auth/iam
                  - https://www.googleapis.com/auth/cloud-platform
              service_account_email (str): The service account email identifying
                  which service account to use to sign bytes. Often, this can
                  be the same as the service account email in the given
                  credentials.
          """
          self._service_account_email = service_account_email
          self._credentials = credentials
          self._request = request

      def _make_signing_request(self, message):
          """Call the IAM signBlob API and return its decoded JSON response.

          Args:
              message: The data to sign. It is passed through
                  ``_helpers.to_bytes`` before being base64-encoded.

          Returns:
              The JSON-decoded body of the HTTP 200 response.

          Raises:
              google.auth.exceptions.TransportError: If the API answers with a
                  status that is neither 200 nor one of ``IAM_RETRY_CODES``,
                  or if every retry attempt ended in a retryable status.
          """
          payload = _helpers.to_bytes(message)

          http_method = "POST"
          # Swap the default universe domain embedded in the template for the
          # one carried by the credentials, then insert the account email.
          endpoint_template = _IAM_SIGN_ENDPOINT.replace(
              credentials.DEFAULT_UNIVERSE_DOMAIN, self._credentials.universe_domain
          )
          endpoint = endpoint_template.format(self._service_account_email)
          request_headers = {"Content-Type": "application/json"}
          # The raw bytes are sent base64-encoded inside a JSON document.
          encoded_payload = base64.b64encode(payload).decode("utf-8")
          request_body = json.dumps({"payload": encoded_payload}).encode("utf-8")

          # The number and pacing of attempts is delegated to ExponentialBackoff.
          for _ in _exponential_backoff.ExponentialBackoff():
              # Let the credentials update the headers before every attempt.
              self._credentials.before_request(
                  self._request, http_method, endpoint, request_headers
              )
              response = self._request(
                  url=endpoint,
                  method=http_method,
                  body=request_body,
                  headers=request_headers,
              )

              if response.status in IAM_RETRY_CODES:
                  continue

              if response.status == http_client.OK:
                  return json.loads(response.data.decode("utf-8"))

              # Any other status is reported immediately, without retrying.
              raise exceptions.TransportError(
                  "Error calling the IAM signBlob API: {}".format(response.data)
              )

          # Reached only when every attempt returned a retryable status.
          raise exceptions.TransportError("exhausted signBlob endpoint retries")

      @property
      def key_id(self):
          """Optional[str]: The key ID used to identify this private key.

          .. warning::
              This is always ``None``. The key ID used by IAM can not
              be reliably determined ahead of time.
          """
          return None

      # No docstring here on purpose: the decorator supplies the one from
      # crypt.Signer.sign.
      @_helpers.copy_docstring(crypt.Signer)
      def sign(self, message):
          signing_response = self._make_signing_request(message)
          # The signature arrives base64-encoded under the "signedBlob" key.
          return base64.b64decode(signing_response["signedBlob"])