123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488 |
- # Copyright 2016 Google LLC
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """Google Compute Engine credentials.
- This module provides authentication for an application running on Google
- Compute Engine using the Compute Engine metadata server.
- """
- import datetime
- from google.auth import _helpers
- from google.auth import credentials
- from google.auth import exceptions
- from google.auth import iam
- from google.auth import jwt
- from google.auth import metrics
- from google.auth.compute_engine import _metadata
- from google.oauth2 import _client
- class Credentials(
- credentials.Scoped,
- credentials.CredentialsWithQuotaProject,
- credentials.CredentialsWithUniverseDomain,
- ):
- """Compute Engine Credentials.
- These credentials use the Google Compute Engine metadata server to obtain
- OAuth 2.0 access tokens associated with the instance's service account,
- and are also used for Cloud Run, Flex and App Engine (except for the Python
- 2.7 runtime, which is supported only on older versions of this library).
- For more information about Compute Engine authentication, including how
- to configure scopes, see the `Compute Engine authentication
- documentation`_.
- .. note:: On Compute Engine the metadata server ignores requested scopes.
- On Cloud Run, Flex and App Engine the server honours requested scopes.
- .. _Compute Engine authentication documentation:
- https://cloud.google.com/compute/docs/authentication#using
- """
- def __init__(
- self,
- service_account_email="default",
- quota_project_id=None,
- scopes=None,
- default_scopes=None,
- universe_domain=None,
- ):
- """
- Args:
- service_account_email (str): The service account email to use, or
- 'default'. A Compute Engine instance may have multiple service
- accounts.
- quota_project_id (Optional[str]): The project ID used for quota and
- billing.
- scopes (Optional[Sequence[str]]): The list of scopes for the credentials.
- default_scopes (Optional[Sequence[str]]): Default scopes passed by a
- Google client library. Use 'scopes' for user-defined scopes.
- universe_domain (Optional[str]): The universe domain. If not
- provided or None, credential will attempt to fetch the value
- from metadata server. If metadata server doesn't have universe
- domain endpoint, then the default googleapis.com will be used.
- """
- super(Credentials, self).__init__()
- self._service_account_email = service_account_email
- self._quota_project_id = quota_project_id
- self._scopes = scopes
- self._default_scopes = default_scopes
- self._universe_domain_cached = False
- if universe_domain:
- self._universe_domain = universe_domain
- self._universe_domain_cached = True
- def _retrieve_info(self, request):
- """Retrieve information about the service account.
- Updates the scopes and retrieves the full service account email.
- Args:
- request (google.auth.transport.Request): The object used to make
- HTTP requests.
- """
- info = _metadata.get_service_account_info(
- request, service_account=self._service_account_email
- )
- self._service_account_email = info["email"]
- # Don't override scopes requested by the user.
- if self._scopes is None:
- self._scopes = info["scopes"]
- def _metric_header_for_usage(self):
- return metrics.CRED_TYPE_SA_MDS
- def refresh(self, request):
- """Refresh the access token and scopes.
- Args:
- request (google.auth.transport.Request): The object used to make
- HTTP requests.
- Raises:
- google.auth.exceptions.RefreshError: If the Compute Engine metadata
- service can't be reached if if the instance has not
- credentials.
- """
- scopes = self._scopes if self._scopes is not None else self._default_scopes
- try:
- self._retrieve_info(request)
- self.token, self.expiry = _metadata.get_service_account_token(
- request, service_account=self._service_account_email, scopes=scopes
- )
- except exceptions.TransportError as caught_exc:
- new_exc = exceptions.RefreshError(caught_exc)
- raise new_exc from caught_exc
- @property
- def service_account_email(self):
- """The service account email.
- .. note:: This is not guaranteed to be set until :meth:`refresh` has been
- called.
- """
- return self._service_account_email
- @property
- def requires_scopes(self):
- return not self._scopes
- @property
- def universe_domain(self):
- if self._universe_domain_cached:
- return self._universe_domain
- from google.auth.transport import requests as google_auth_requests
- self._universe_domain = _metadata.get_universe_domain(
- google_auth_requests.Request()
- )
- self._universe_domain_cached = True
- return self._universe_domain
- @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
- def with_quota_project(self, quota_project_id):
- creds = self.__class__(
- service_account_email=self._service_account_email,
- quota_project_id=quota_project_id,
- scopes=self._scopes,
- default_scopes=self._default_scopes,
- )
- creds._universe_domain = self._universe_domain
- creds._universe_domain_cached = self._universe_domain_cached
- return creds
- @_helpers.copy_docstring(credentials.Scoped)
- def with_scopes(self, scopes, default_scopes=None):
- # Compute Engine credentials can not be scoped (the metadata service
- # ignores the scopes parameter). App Engine, Cloud Run and Flex support
- # requesting scopes.
- creds = self.__class__(
- scopes=scopes,
- default_scopes=default_scopes,
- service_account_email=self._service_account_email,
- quota_project_id=self._quota_project_id,
- )
- creds._universe_domain = self._universe_domain
- creds._universe_domain_cached = self._universe_domain_cached
- return creds
- @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain)
- def with_universe_domain(self, universe_domain):
- return self.__class__(
- scopes=self._scopes,
- default_scopes=self._default_scopes,
- service_account_email=self._service_account_email,
- quota_project_id=self._quota_project_id,
- universe_domain=universe_domain,
- )
- _DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
- _DEFAULT_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token"
- class IDTokenCredentials(
- credentials.CredentialsWithQuotaProject,
- credentials.Signing,
- credentials.CredentialsWithTokenUri,
- ):
- """Open ID Connect ID Token-based service account credentials.
- These credentials relies on the default service account of a GCE instance.
- ID token can be requested from `GCE metadata server identity endpoint`_, IAM
- token endpoint or other token endpoints you specify. If metadata server
- identity endpoint is not used, the GCE instance must have been started with
- a service account that has access to the IAM Cloud API.
- .. _GCE metadata server identity endpoint:
- https://cloud.google.com/compute/docs/instances/verifying-instance-identity
- """
- def __init__(
- self,
- request,
- target_audience,
- token_uri=None,
- additional_claims=None,
- service_account_email=None,
- signer=None,
- use_metadata_identity_endpoint=False,
- quota_project_id=None,
- ):
- """
- Args:
- request (google.auth.transport.Request): The object used to make
- HTTP requests.
- target_audience (str): The intended audience for these credentials,
- used when requesting the ID Token. The ID Token's ``aud`` claim
- will be set to this string.
- token_uri (str): The OAuth 2.0 Token URI.
- additional_claims (Mapping[str, str]): Any additional claims for
- the JWT assertion used in the authorization grant.
- service_account_email (str): Optional explicit service account to
- use to sign JWT tokens.
- By default, this is the default GCE service account.
- signer (google.auth.crypt.Signer): The signer used to sign JWTs.
- In case the signer is specified, the request argument will be
- ignored.
- use_metadata_identity_endpoint (bool): Whether to use GCE metadata
- identity endpoint. For backward compatibility the default value
- is False. If set to True, ``token_uri``, ``additional_claims``,
- ``service_account_email``, ``signer`` argument should not be set;
- otherwise ValueError will be raised.
- quota_project_id (Optional[str]): The project ID used for quota and
- billing.
- Raises:
- ValueError:
- If ``use_metadata_identity_endpoint`` is set to True, and one of
- ``token_uri``, ``additional_claims``, ``service_account_email``,
- ``signer`` arguments is set.
- """
- super(IDTokenCredentials, self).__init__()
- self._quota_project_id = quota_project_id
- self._use_metadata_identity_endpoint = use_metadata_identity_endpoint
- self._target_audience = target_audience
- if use_metadata_identity_endpoint:
- if token_uri or additional_claims or service_account_email or signer:
- raise exceptions.MalformedError(
- "If use_metadata_identity_endpoint is set, token_uri, "
- "additional_claims, service_account_email, signer arguments"
- " must not be set"
- )
- self._token_uri = None
- self._additional_claims = None
- self._signer = None
- if service_account_email is None:
- sa_info = _metadata.get_service_account_info(request)
- self._service_account_email = sa_info["email"]
- else:
- self._service_account_email = service_account_email
- if not use_metadata_identity_endpoint:
- if signer is None:
- signer = iam.Signer(
- request=request,
- credentials=Credentials(),
- service_account_email=self._service_account_email,
- )
- self._signer = signer
- self._token_uri = token_uri or _DEFAULT_TOKEN_URI
- if additional_claims is not None:
- self._additional_claims = additional_claims
- else:
- self._additional_claims = {}
- def with_target_audience(self, target_audience):
- """Create a copy of these credentials with the specified target
- audience.
- Args:
- target_audience (str): The intended audience for these credentials,
- used when requesting the ID Token.
- Returns:
- google.auth.service_account.IDTokenCredentials: A new credentials
- instance.
- """
- # since the signer is already instantiated,
- # the request is not needed
- if self._use_metadata_identity_endpoint:
- return self.__class__(
- None,
- target_audience=target_audience,
- use_metadata_identity_endpoint=True,
- quota_project_id=self._quota_project_id,
- )
- else:
- return self.__class__(
- None,
- service_account_email=self._service_account_email,
- token_uri=self._token_uri,
- target_audience=target_audience,
- additional_claims=self._additional_claims.copy(),
- signer=self.signer,
- use_metadata_identity_endpoint=False,
- quota_project_id=self._quota_project_id,
- )
- @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
- def with_quota_project(self, quota_project_id):
- # since the signer is already instantiated,
- # the request is not needed
- if self._use_metadata_identity_endpoint:
- return self.__class__(
- None,
- target_audience=self._target_audience,
- use_metadata_identity_endpoint=True,
- quota_project_id=quota_project_id,
- )
- else:
- return self.__class__(
- None,
- service_account_email=self._service_account_email,
- token_uri=self._token_uri,
- target_audience=self._target_audience,
- additional_claims=self._additional_claims.copy(),
- signer=self.signer,
- use_metadata_identity_endpoint=False,
- quota_project_id=quota_project_id,
- )
- @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
- def with_token_uri(self, token_uri):
- # since the signer is already instantiated,
- # the request is not needed
- if self._use_metadata_identity_endpoint:
- raise exceptions.MalformedError(
- "If use_metadata_identity_endpoint is set, token_uri" " must not be set"
- )
- else:
- return self.__class__(
- None,
- service_account_email=self._service_account_email,
- token_uri=token_uri,
- target_audience=self._target_audience,
- additional_claims=self._additional_claims.copy(),
- signer=self.signer,
- use_metadata_identity_endpoint=False,
- quota_project_id=self.quota_project_id,
- )
- def _make_authorization_grant_assertion(self):
- """Create the OAuth 2.0 assertion.
- This assertion is used during the OAuth 2.0 grant to acquire an
- ID token.
- Returns:
- bytes: The authorization grant assertion.
- """
- now = _helpers.utcnow()
- lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
- expiry = now + lifetime
- payload = {
- "iat": _helpers.datetime_to_secs(now),
- "exp": _helpers.datetime_to_secs(expiry),
- # The issuer must be the service account email.
- "iss": self.service_account_email,
- # The audience must be the auth token endpoint's URI
- "aud": self._token_uri,
- # The target audience specifies which service the ID token is
- # intended for.
- "target_audience": self._target_audience,
- }
- payload.update(self._additional_claims)
- token = jwt.encode(self._signer, payload)
- return token
- def _call_metadata_identity_endpoint(self, request):
- """Request ID token from metadata identity endpoint.
- Args:
- request (google.auth.transport.Request): The object used to make
- HTTP requests.
- Returns:
- Tuple[str, datetime.datetime]: The ID token and the expiry of the ID token.
- Raises:
- google.auth.exceptions.RefreshError: If the Compute Engine metadata
- service can't be reached or if the instance has no credentials.
- ValueError: If extracting expiry from the obtained ID token fails.
- """
- try:
- path = "instance/service-accounts/default/identity"
- params = {"audience": self._target_audience, "format": "full"}
- metrics_header = {
- metrics.API_CLIENT_HEADER: metrics.token_request_id_token_mds()
- }
- id_token = _metadata.get(
- request, path, params=params, headers=metrics_header
- )
- except exceptions.TransportError as caught_exc:
- new_exc = exceptions.RefreshError(caught_exc)
- raise new_exc from caught_exc
- _, payload, _, _ = jwt._unverified_decode(id_token)
- return id_token, datetime.datetime.utcfromtimestamp(payload["exp"])
- def refresh(self, request):
- """Refreshes the ID token.
- Args:
- request (google.auth.transport.Request): The object used to make
- HTTP requests.
- Raises:
- google.auth.exceptions.RefreshError: If the credentials could
- not be refreshed.
- ValueError: If extracting expiry from the obtained ID token fails.
- """
- if self._use_metadata_identity_endpoint:
- self.token, self.expiry = self._call_metadata_identity_endpoint(request)
- else:
- assertion = self._make_authorization_grant_assertion()
- access_token, expiry, _ = _client.id_token_jwt_grant(
- request, self._token_uri, assertion
- )
- self.token = access_token
- self.expiry = expiry
- @property # type: ignore
- @_helpers.copy_docstring(credentials.Signing)
- def signer(self):
- return self._signer
- def sign_bytes(self, message):
- """Signs the given message.
- Args:
- message (bytes): The message to sign.
- Returns:
- bytes: The message's cryptographic signature.
- Raises:
- ValueError:
- Signer is not available if metadata identity endpoint is used.
- """
- if self._use_metadata_identity_endpoint:
- raise exceptions.InvalidOperation(
- "Signer is not available if metadata identity endpoint is used"
- )
- return self._signer.sign(message)
- @property
- def service_account_email(self):
- """The service account email."""
- return self._service_account_email
- @property
- def signer_email(self):
- return self._service_account_email
|