123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439 |
- # Copyright 2020 Google LLC
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """Identity Pool Credentials.
- This module provides credentials to access Google Cloud resources from on-prem
- or non-Google Cloud platforms which support external credentials (e.g. OIDC ID
- tokens) retrieved from local file locations or local servers. This includes
- Microsoft Azure and OIDC identity providers (e.g. K8s workloads registered with
- Hub with Hub workload identity enabled).
- These credentials are recommended over the use of service account credentials
- in on-prem/non-Google Cloud platforms as they do not involve the management of
- long-lived service account private keys.
- Identity Pool Credentials are initialized using external_account
- arguments which are typically loaded from an external credentials file or
- an external credentials URL.
- This module also provides a definition for an abstract subject token supplier.
- This supplier can be implemented to return a valid OIDC or SAML2.0 subject token
- and used to create Identity Pool credentials. The credentials will then call the
- supplier instead of using pre-defined methods such as reading a local file or
- calling a URL.
- """
- try:
- from collections.abc import Mapping
- # Python 2.7 compatibility
- except ImportError: # pragma: NO COVER
- from collections import Mapping # type: ignore
- import abc
- import json
- import os
- from typing import NamedTuple
- from google.auth import _helpers
- from google.auth import exceptions
- from google.auth import external_account
- from google.auth.transport import _mtls_helper
class SubjectTokenSupplier(metaclass=abc.ABCMeta):
    """Abstract interface for objects that supply subject tokens.

    Implement this class to plug custom retrieval logic into Workload or
    Workforce Identity Federation: the subject token it returns is exchanged
    for a Google Cloud access token. The identity pool credential does not
    cache the subject token, so caching logic should be added in the
    implementation.
    """

    @abc.abstractmethod
    def get_subject_token(self, context, request):
        """Returns the requested subject token. The subject token must be valid.

        .. warning::
            The result is not cached by the calling Google credential, so
            caching logic should be implemented in the supplier.

        Args:
            context (google.auth.external_account.SupplierContext): The context
                object containing information about the requested audience and
                subject token type.
            request (google.auth.transport.Request): The object used to make
                HTTP requests.

        Returns:
            str: The requested subject token string.

        Raises:
            google.auth.exceptions.RefreshError: If an error is encountered
                during subject token retrieval logic.
        """
        # Concrete suppliers must override this method; the base body only
        # signals that no implementation exists.
        raise NotImplementedError("")
class _TokenContent(NamedTuple):
    """Raw token payload produced by the internal file and URL suppliers.

    Attributes:
        content (str): The text read from the file or returned by the URL.
        location (str): Where the content came from, either a file location
            or a URL.
    """

    # Unparsed text exactly as retrieved from the source.
    content: str
    # File path or URL the content was retrieved from; used in error messages.
    location: str
class _FileSupplier(SubjectTokenSupplier):
    """Internal subject token supplier which reads the subject token from a file."""

    def __init__(self, path, format_type, subject_token_field_name):
        """Stores where and how the subject token should be read.

        Args:
            path: Location of the file holding the subject token.
            format_type: Format of the file content, forwarded unchanged to
                ``_parse_token_data`` ("text" is returned verbatim, anything
                else is parsed as JSON there).
            subject_token_field_name: Key holding the token when the content
                is JSON; forwarded unchanged to ``_parse_token_data``.
        """
        self._path = path
        self._format_type = format_type
        self._subject_token_field_name = subject_token_field_name

    # Intentionally no docstring on this method: the decorator is expected to
    # supply it from SubjectTokenSupplier.get_subject_token.
    @_helpers.copy_docstring(SubjectTokenSupplier)
    def get_subject_token(self, context, request):
        if not os.path.exists(self._path):
            raise exceptions.RefreshError("File '{}' was not found.".format(self._path))

        try:
            with open(self._path, "r", encoding="utf-8") as file_obj:
                token_content = _TokenContent(file_obj.read(), self._path)
        except FileNotFoundError as caught_exc:
            # The file can disappear between the existence check above and the
            # open() call (for example while it is being replaced). Report it
            # with the same RefreshError instead of leaking FileNotFoundError.
            raise exceptions.RefreshError(
                "File '{}' was not found.".format(self._path)
            ) from caught_exc

        return _parse_token_data(
            token_content, self._format_type, self._subject_token_field_name
        )
class _UrlSupplier(SubjectTokenSupplier):
    """Internal subject token supplier which retrieves the subject token by calling a URL endpoint."""

    def __init__(self, url, format_type, subject_token_field_name, headers):
        """Stores the endpoint and the parsing options for its response.

        Args:
            url: Endpoint queried with a GET request for the subject token.
            format_type: Format of the response body, forwarded unchanged to
                ``_parse_token_data``.
            subject_token_field_name: Key holding the token when the body is
                JSON; forwarded unchanged to ``_parse_token_data``.
            headers: Headers passed along with the GET request.
        """
        self._url = url
        self._headers = headers
        self._format_type = format_type
        self._subject_token_field_name = subject_token_field_name

    # Intentionally no docstring on this method: the decorator is expected to
    # supply it from SubjectTokenSupplier.get_subject_token.
    @_helpers.copy_docstring(SubjectTokenSupplier)
    def get_subject_token(self, context, request):
        response = request(url=self._url, method="GET", headers=self._headers)

        # The transport may hand back bytes or an already-decoded string;
        # normalise to text before looking at the status so the error below
        # can carry a readable body.
        if hasattr(response.data, "decode"):
            body_text = response.data.decode("utf-8")
        else:
            body_text = response.data

        if response.status != 200:
            raise exceptions.RefreshError(
                "Unable to retrieve Identity Pool subject token", body_text
            )

        return _parse_token_data(
            _TokenContent(body_text, self._url),
            self._format_type,
            self._subject_token_field_name,
        )
class _X509Supplier(SubjectTokenSupplier):
    """Internal supplier for X509 workload credentials.

    This class is used internally and always returns an empty string as the
    subject token.
    """

    # Intentionally no docstring on this method: the decorator is expected to
    # supply it from SubjectTokenSupplier.get_subject_token.
    @_helpers.copy_docstring(SubjectTokenSupplier)
    def get_subject_token(self, context, request):
        # Neither the context nor the request is consulted; the token is
        # always the empty string.
        return ""
def _parse_token_data(token_content, format_type="text", subject_token_field_name=None):
    """Extracts the subject token from retrieved file or URL content.

    Args:
        token_content (_TokenContent): The raw content together with the
            location it was retrieved from (the location is only used in
            error messages).
        format_type (str): "text" returns the content verbatim; any other
            value makes the content be parsed as JSON.
        subject_token_field_name (Optional[str]): Key of the subject token in
            the parsed JSON document. Unused for the "text" format.

    Returns:
        The subject token: the raw content for "text", otherwise the value
        stored under ``subject_token_field_name``.

    Raises:
        google.auth.exceptions.RefreshError: If the content cannot be parsed
            as JSON, the parsed document does not provide the requested key,
            or the resulting token is empty.
    """
    if format_type == "text":
        token = token_content.content
    else:
        try:
            # Parse file content as JSON.
            response_data = json.loads(token_content.content)
            # Get the subject_token.
            token = response_data[subject_token_field_name]
        except (KeyError, ValueError, TypeError) as caught_exc:
            # TypeError covers valid JSON that is not an object (a list,
            # string, number or null), which cannot be indexed by the field
            # name; report it like any other unparseable content.
            raise exceptions.RefreshError(
                "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
                    token_content.location, subject_token_field_name
                )
            ) from caught_exc
    if not token:
        raise exceptions.RefreshError(
            "Missing subject_token in the credential_source file"
        )
    return token
class Credentials(external_account.Credentials):
    """External account credentials sourced from files and URLs.

    The subject token comes either from a ``credential_source`` mapping
    (a "file", a "url" or a "certificate" entry) or from a caller-provided
    subject token supplier.
    """

    def __init__(
        self,
        audience,
        subject_token_type,
        token_url=external_account._DEFAULT_TOKEN_URL,
        credential_source=None,
        subject_token_supplier=None,
        *args,
        **kwargs
    ):
        """Instantiates an external account credentials object from a file/URL.

        Args:
            audience (str): The STS audience field.
            subject_token_type (str): The subject token type based on the
                Oauth2.0 token exchange spec. Expected values include::

                    "urn:ietf:params:oauth:token-type:jwt"
                    "urn:ietf:params:oauth:token-type:id-token"
                    "urn:ietf:params:oauth:token-type:saml2"

            token_url (Optional [str]): The STS endpoint URL. Defaults to
                ``external_account._DEFAULT_TOKEN_URL``.
            credential_source (Optional [Mapping]): The credential source
                dictionary used to provide instructions on how to retrieve
                external credential to be exchanged for Google access tokens.
                Either a credential source or a subject token supplier must
                be provided.

                Example credential_source for url-sourced credential::

                    {
                        "url": "http://www.example.com",
                        "format": {
                            "type": "json",
                            "subject_token_field_name": "access_token",
                        },
                        "headers": {"foo": "bar"},
                    }

                Example credential_source for file-sourced credential::

                    {
                        "file": "/path/to/token/file.txt"
                    }

            subject_token_supplier (Optional [SubjectTokenSupplier]): Optional
                subject token supplier. This will be called to supply a valid
                subject token which will then be exchanged for Google access
                tokens. Either a subject token supplier or a credential
                source must be provided.
            args (List): Optional positional arguments passed into the
                underlying :meth:`~external_account.Credentials.__init__`
                method.
            kwargs (Mapping): Optional keyword arguments passed into the
                underlying :meth:`~external_account.Credentials.__init__`
                method.

        Raises:
            google.auth.exceptions.RefreshError: If an error is encountered
                during access token retrieval logic.
            ValueError: For invalid parameters.

        .. note:: Typically one of the helper constructors
            :meth:`from_file` or
            :meth:`from_info` are used instead of calling the constructor
            directly.
        """
        super().__init__(
            audience=audience,
            subject_token_type=subject_token_type,
            token_url=token_url,
            credential_source=credential_source,
            *args,
            **kwargs
        )

        # Exactly one of credential_source / subject_token_supplier is allowed.
        if credential_source is None and subject_token_supplier is None:
            raise exceptions.InvalidValue(
                "A valid credential source or a subject token supplier must be provided."
            )
        if credential_source is not None and subject_token_supplier is not None:
            raise exceptions.InvalidValue(
                "Identity pool credential cannot have both a credential source and a subject token supplier."
            )

        if subject_token_supplier is not None:
            # Custom supplier: there is no credential source to inspect.
            self._subject_token_supplier = subject_token_supplier
            self._credential_source_file = None
            self._credential_source_url = None
            self._credential_source_certificate = None
        else:
            if not isinstance(credential_source, Mapping):
                raise exceptions.MalformedError(
                    "Invalid credential_source. The credential_source is not a dict."
                )
            self._credential_source_file = credential_source.get("file")
            self._credential_source_url = credential_source.get("url")
            self._credential_source_certificate = credential_source.get("certificate")

            # environment_id is only supported in AWS or dedicated future external
            # account credentials.
            if "environment_id" in credential_source:
                raise exceptions.MalformedError(
                    "Invalid Identity Pool credential_source field 'environment_id'"
                )

            # check that only one of file, url, or certificate are provided.
            self._validate_single_source()

            # From here on exactly one source is not None. The checks below
            # use the same "is not None" test as _validate_single_source so
            # that a falsy value (e.g. "" or {}) is handled as the source it
            # was declared to be rather than falling through to another kind.
            if self._credential_source_certificate is not None:
                self._validate_certificate_config()
            else:
                self._validate_file_or_url_config(credential_source)

            if self._credential_source_file is not None:
                self._subject_token_supplier = _FileSupplier(
                    self._credential_source_file,
                    self._credential_source_format_type,
                    self._credential_source_field_name,
                )
            elif self._credential_source_url is not None:
                self._subject_token_supplier = _UrlSupplier(
                    self._credential_source_url,
                    self._credential_source_format_type,
                    self._credential_source_field_name,
                    self._credential_source_headers,
                )
            else:  # self._credential_source_certificate
                self._subject_token_supplier = _X509Supplier()

    # Intentionally no docstring on this method: the decorator is expected to
    # supply it from external_account.Credentials.retrieve_subject_token.
    @_helpers.copy_docstring(external_account.Credentials)
    def retrieve_subject_token(self, request):
        return self._subject_token_supplier.get_subject_token(
            self._supplier_context, request
        )

    def _get_mtls_cert_and_key_paths(self):
        """Returns the workload certificate and key paths for mTLS requests.

        The lookup is delegated to
        ``_mtls_helper._get_workload_cert_and_key_paths`` using the configured
        certificate config location.

        Raises:
            google.auth.exceptions.RefreshError: If the credential source has
                no "certificate" section.
        """
        if self._credential_source_certificate is None:
            raise exceptions.RefreshError(
                'The credential is not configured to use mtls requests. The credential should include a "certificate" section in the credential source.'
            )
        return _mtls_helper._get_workload_cert_and_key_paths(
            self._certificate_config_location
        )

    def _mtls_required(self):
        """Returns True when the credential source has a "certificate" section."""
        return self._credential_source_certificate is not None

    def _create_default_metrics_options(self):
        """Extends the base metrics options with the kind of token source.

        Returns:
            The options returned by the parent class with "source" set to
            "file", "url", "x509" or "programmatic".
        """
        metrics_options = super()._create_default_metrics_options()
        # Check that credential source is a dict before checking for credential type. This check needs to be done
        # here because the external_account credential constructor needs to pass the metrics options to the
        # impersonated credential object before the identity_pool credentials are validated.
        if isinstance(self._credential_source, Mapping):
            if self._credential_source.get("file"):
                metrics_options["source"] = "file"
            elif self._credential_source.get("url"):
                metrics_options["source"] = "url"
            else:
                metrics_options["source"] = "x509"
        else:
            metrics_options["source"] = "programmatic"
        return metrics_options

    def _has_custom_supplier(self):
        """Returns True when no credential source is set on the credential."""
        return self._credential_source is None

    def _constructor_args(self):
        """Returns the parent constructor arguments, plus the custom supplier if any."""
        args = super()._constructor_args()
        # If a custom supplier was used, append it to the args dict.
        if self._has_custom_supplier():
            args.update({"subject_token_supplier": self._subject_token_supplier})
        return args

    def _validate_certificate_config(self):
        """Validates the "certificate" section and records the config location.

        Exactly one of ``certificate_config_location`` and
        ``use_default_certificate_config`` must be set.

        Raises:
            google.auth.exceptions.MalformedError: If the section is not a
                mapping, or if both or neither of the two options are set.
        """
        if not isinstance(self._credential_source_certificate, Mapping):
            # Reject e.g. "" or a list up front instead of failing on .get().
            raise exceptions.MalformedError(
                "Invalid credential_source. The certificate is not a dict."
            )
        self._certificate_config_location = self._credential_source_certificate.get(
            "certificate_config_location"
        )
        use_default = self._credential_source_certificate.get(
            "use_default_certificate_config"
        )
        if self._certificate_config_location and use_default:
            raise exceptions.MalformedError(
                "Invalid certificate configuration, certificate_config_location cannot be specified when use_default_certificate_config = true."
            )
        if not self._certificate_config_location and not use_default:
            raise exceptions.MalformedError(
                "Invalid certificate configuration, use_default_certificate_config should be true if no certificate_config_location is provided."
            )

    def _validate_file_or_url_config(self, credential_source):
        """Reads and validates the headers and format options of a file/URL source.

        Args:
            credential_source (Mapping): The credential source dictionary.

        Raises:
            google.auth.exceptions.MalformedError: If the format type is not
                "text" or "json", or if the "json" format lacks
                ``subject_token_field_name``.
        """
        self._credential_source_headers = credential_source.get("headers")
        # A missing "format" and an explicit null both mean "use the defaults";
        # without the "or" a null value would fail on .get() below.
        credential_source_format = credential_source.get("format") or {}
        # Get credential_source format type. When not provided, this
        # defaults to text.
        self._credential_source_format_type = (
            credential_source_format.get("type") or "text"
        )
        if self._credential_source_format_type not in ["text", "json"]:
            raise exceptions.MalformedError(
                "Invalid credential_source format '{}'".format(
                    self._credential_source_format_type
                )
            )
        # For JSON types, get the required subject_token field name.
        if self._credential_source_format_type == "json":
            self._credential_source_field_name = credential_source_format.get(
                "subject_token_field_name"
            )
            if self._credential_source_field_name is None:
                raise exceptions.MalformedError(
                    "Missing subject_token_field_name for JSON credential_source format"
                )
        else:
            self._credential_source_field_name = None

    def _validate_single_source(self):
        """Ensures exactly one of file, url and certificate is configured.

        Raises:
            google.auth.exceptions.MalformedError: If more than one source,
                or no source at all, is provided.
        """
        candidates = (
            self._credential_source_file,
            self._credential_source_url,
            self._credential_source_certificate,
        )
        provided_count = sum(1 for source in candidates if source is not None)
        if provided_count > 1:
            raise exceptions.MalformedError(
                "Ambiguous credential_source. 'file', 'url', and 'certificate' are mutually exclusive.."
            )
        if provided_count != 1:
            raise exceptions.MalformedError(
                "Missing credential_source. A 'file', 'url', or 'certificate' must be provided."
            )

    @classmethod
    def from_info(cls, info, **kwargs):
        """Creates an Identity Pool Credentials instance from parsed external account info.

        Args:
            info (Mapping[str, str]): The Identity Pool external account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.identity_pool.Credentials: The constructed
                credentials.

        Raises:
            ValueError: For invalid parameters.
        """
        # The supplier found in ``info`` (or None when absent) always replaces
        # any ``subject_token_supplier`` passed through kwargs.
        subject_token_supplier = info.get("subject_token_supplier")
        kwargs.update({"subject_token_supplier": subject_token_supplier})
        return super().from_info(info, **kwargs)

    @classmethod
    def from_file(cls, filename, **kwargs):
        """Creates an IdentityPool Credentials instance from an external account json file.

        Args:
            filename (str): The path to the IdentityPool external account json file.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.identity_pool.Credentials: The constructed
                credentials.
        """
        return super().from_file(filename, **kwargs)
|