gdch_credentials.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. # Copyright 2022 Google LLC
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Experimental GDCH credentials support.
  15. """
  16. import datetime
  17. from google.auth import _helpers
  18. from google.auth import _service_account_info
  19. from google.auth import credentials
  20. from google.auth import exceptions
  21. from google.auth import jwt
  22. from google.oauth2 import _client
  23. TOKEN_EXCHANGE_TYPE = "urn:ietf:params:oauth:token-type:token-exchange"
  24. ACCESS_TOKEN_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
  25. SERVICE_ACCOUNT_TOKEN_TYPE = "urn:k8s:params:oauth:token-type:serviceaccount"
  26. JWT_LIFETIME = datetime.timedelta(seconds=3600) # 1 hour
  27. class ServiceAccountCredentials(credentials.Credentials):
  28. """Credentials for GDCH (`Google Distributed Cloud Hosted`_) for service
  29. account users.
  30. .. _Google Distributed Cloud Hosted:
  31. https://cloud.google.com/blog/topics/hybrid-cloud/\
  32. announcing-google-distributed-cloud-edge-and-hosted
  33. To create a GDCH service account credential, first create a JSON file of
  34. the following format::
  35. {
  36. "type": "gdch_service_account",
  37. "format_version": "1",
  38. "project": "<project name>",
  39. "private_key_id": "<key id>",
  40. "private_key": "-----BEGIN EC PRIVATE KEY-----\n<key bytes>\n-----END EC PRIVATE KEY-----\n",
  41. "name": "<service identity name>",
  42. "ca_cert_path": "<CA cert path>",
  43. "token_uri": "https://service-identity.<Domain>/authenticate"
  44. }
  45. The "format_version" field stands for the format of the JSON file. For now
  46. it is always "1". The `private_key_id` and `private_key` is used for signing.
  47. The `ca_cert_path` is used for token server TLS certificate verification.
  48. After the JSON file is created, set `GOOGLE_APPLICATION_CREDENTIALS` environment
  49. variable to the JSON file path, then use the following code to create the
  50. credential::
  51. import google.auth
  52. credential, _ = google.auth.default()
  53. credential = credential.with_gdch_audience("<the audience>")
  54. We can also create the credential directly::
  55. from google.oauth import gdch_credentials
  56. credential = gdch_credentials.ServiceAccountCredentials.from_service_account_file("<the json file path>")
  57. credential = credential.with_gdch_audience("<the audience>")
  58. The token is obtained in the following way. This class first creates a
  59. self signed JWT. It uses the `name` value as the `iss` and `sub` claim, and
  60. the `token_uri` as the `aud` claim, and signs the JWT with the `private_key`.
  61. It then sends the JWT to the `token_uri` to exchange a final token for
  62. `audience`.
  63. """
  64. def __init__(
  65. self, signer, service_identity_name, project, audience, token_uri, ca_cert_path
  66. ):
  67. """
  68. Args:
  69. signer (google.auth.crypt.Signer): The signer used to sign JWTs.
  70. service_identity_name (str): The service identity name. It will be
  71. used as the `iss` and `sub` claim in the self signed JWT.
  72. project (str): The project.
  73. audience (str): The audience for the final token.
  74. token_uri (str): The token server uri.
  75. ca_cert_path (str): The CA cert path for token server side TLS
  76. certificate verification. If the token server uses well known
  77. CA, then this parameter can be `None`.
  78. """
  79. super(ServiceAccountCredentials, self).__init__()
  80. self._signer = signer
  81. self._service_identity_name = service_identity_name
  82. self._project = project
  83. self._audience = audience
  84. self._token_uri = token_uri
  85. self._ca_cert_path = ca_cert_path
  86. def _create_jwt(self):
  87. now = _helpers.utcnow()
  88. expiry = now + JWT_LIFETIME
  89. iss_sub_value = "system:serviceaccount:{}:{}".format(
  90. self._project, self._service_identity_name
  91. )
  92. payload = {
  93. "iss": iss_sub_value,
  94. "sub": iss_sub_value,
  95. "aud": self._token_uri,
  96. "iat": _helpers.datetime_to_secs(now),
  97. "exp": _helpers.datetime_to_secs(expiry),
  98. }
  99. return _helpers.from_bytes(jwt.encode(self._signer, payload))
  100. @_helpers.copy_docstring(credentials.Credentials)
  101. def refresh(self, request):
  102. import google.auth.transport.requests
  103. if not isinstance(request, google.auth.transport.requests.Request):
  104. raise exceptions.RefreshError(
  105. "For GDCH service account credentials, request must be a google.auth.transport.requests.Request object"
  106. )
  107. # Create a self signed JWT, and do token exchange.
  108. jwt_token = self._create_jwt()
  109. request_body = {
  110. "grant_type": TOKEN_EXCHANGE_TYPE,
  111. "audience": self._audience,
  112. "requested_token_type": ACCESS_TOKEN_TOKEN_TYPE,
  113. "subject_token": jwt_token,
  114. "subject_token_type": SERVICE_ACCOUNT_TOKEN_TYPE,
  115. }
  116. response_data = _client._token_endpoint_request(
  117. request,
  118. self._token_uri,
  119. request_body,
  120. access_token=None,
  121. use_json=True,
  122. verify=self._ca_cert_path,
  123. )
  124. self.token, _, self.expiry, _ = _client._handle_refresh_grant_response(
  125. response_data, None
  126. )
  127. def with_gdch_audience(self, audience):
  128. """Create a copy of GDCH credentials with the specified audience.
  129. Args:
  130. audience (str): The intended audience for GDCH credentials.
  131. """
  132. return self.__class__(
  133. self._signer,
  134. self._service_identity_name,
  135. self._project,
  136. audience,
  137. self._token_uri,
  138. self._ca_cert_path,
  139. )
  140. @classmethod
  141. def _from_signer_and_info(cls, signer, info):
  142. """Creates a Credentials instance from a signer and service account
  143. info.
  144. Args:
  145. signer (google.auth.crypt.Signer): The signer used to sign JWTs.
  146. info (Mapping[str, str]): The service account info.
  147. Returns:
  148. google.oauth2.gdch_credentials.ServiceAccountCredentials: The constructed
  149. credentials.
  150. Raises:
  151. ValueError: If the info is not in the expected format.
  152. """
  153. if info["format_version"] != "1":
  154. raise ValueError("Only format version 1 is supported")
  155. return cls(
  156. signer,
  157. info["name"], # service_identity_name
  158. info["project"],
  159. None, # audience
  160. info["token_uri"],
  161. info.get("ca_cert_path", None),
  162. )
  163. @classmethod
  164. def from_service_account_info(cls, info):
  165. """Creates a Credentials instance from parsed service account info.
  166. Args:
  167. info (Mapping[str, str]): The service account info in Google
  168. format.
  169. kwargs: Additional arguments to pass to the constructor.
  170. Returns:
  171. google.oauth2.gdch_credentials.ServiceAccountCredentials: The constructed
  172. credentials.
  173. Raises:
  174. ValueError: If the info is not in the expected format.
  175. """
  176. signer = _service_account_info.from_dict(
  177. info,
  178. require=[
  179. "format_version",
  180. "private_key_id",
  181. "private_key",
  182. "name",
  183. "project",
  184. "token_uri",
  185. ],
  186. use_rsa_signer=False,
  187. )
  188. return cls._from_signer_and_info(signer, info)
  189. @classmethod
  190. def from_service_account_file(cls, filename):
  191. """Creates a Credentials instance from a service account json file.
  192. Args:
  193. filename (str): The path to the service account json file.
  194. kwargs: Additional arguments to pass to the constructor.
  195. Returns:
  196. google.oauth2.gdch_credentials.ServiceAccountCredentials: The constructed
  197. credentials.
  198. """
  199. info, signer = _service_account_info.from_filename(
  200. filename,
  201. require=[
  202. "format_version",
  203. "private_key_id",
  204. "private_key",
  205. "name",
  206. "project",
  207. "token_uri",
  208. ],
  209. use_rsa_signer=False,
  210. )
  211. return cls._from_signer_and_info(signer, info)