123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279 |
- # Copyright 2020 Google LLC
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """Identity Pool Credentials.
- This module provides credentials to access Google Cloud resources from on-prem
- or non-Google Cloud platforms which support external credentials (e.g. OIDC ID
- tokens) retrieved from local file locations or local servers. This includes
- Microsoft Azure and OIDC identity providers (e.g. K8s workloads registered with
- Hub with Hub workload identity enabled).
- These credentials are recommended over the use of service account credentials
- in on-prem/non-Google Cloud platforms as they do not involve the management of
- long-live service account private keys.
- Identity Pool Credentials are initialized using external_account
- arguments which are typically loaded from an external credentials file or
- an external credentials URL. Unlike other Credentials that can be initialized
- with a list of explicit arguments, secrets or credentials, external account
- clients use the environment and hints/guidelines provided by the
- external_account JSON file to retrieve credentials and exchange them for Google
- access tokens.
- """
- try:
- from collections.abc import Mapping
- # Python 2.7 compatibility
- except ImportError: # pragma: NO COVER
- from collections import Mapping
- import io
- import json
- import os
- from google.auth import _helpers
- from google.auth import exceptions
- from google.auth import external_account
- class Credentials(external_account.Credentials):
- """External account credentials sourced from files and URLs."""
- def __init__(
- self,
- audience,
- subject_token_type,
- token_url,
- credential_source,
- service_account_impersonation_url=None,
- client_id=None,
- client_secret=None,
- quota_project_id=None,
- scopes=None,
- default_scopes=None,
- ):
- """Instantiates an external account credentials object from a file/URL.
- Args:
- audience (str): The STS audience field.
- subject_token_type (str): The subject token type.
- token_url (str): The STS endpoint URL.
- credential_source (Mapping): The credential source dictionary used to
- provide instructions on how to retrieve external credential to be
- exchanged for Google access tokens.
- Example credential_source for url-sourced credential::
- {
- "url": "http://www.example.com",
- "format": {
- "type": "json",
- "subject_token_field_name": "access_token",
- },
- "headers": {"foo": "bar"},
- }
- Example credential_source for file-sourced credential::
- {
- "file": "/path/to/token/file.txt"
- }
- service_account_impersonation_url (Optional[str]): The optional service account
- impersonation getAccessToken URL.
- client_id (Optional[str]): The optional client ID.
- client_secret (Optional[str]): The optional client secret.
- quota_project_id (Optional[str]): The optional quota project ID.
- scopes (Optional[Sequence[str]]): Optional scopes to request during the
- authorization grant.
- default_scopes (Optional[Sequence[str]]): Default scopes passed by a
- Google client library. Use 'scopes' for user-defined scopes.
- Raises:
- google.auth.exceptions.RefreshError: If an error is encountered during
- access token retrieval logic.
- ValueError: For invalid parameters.
- .. note:: Typically one of the helper constructors
- :meth:`from_file` or
- :meth:`from_info` are used instead of calling the constructor directly.
- """
- super(Credentials, self).__init__(
- audience=audience,
- subject_token_type=subject_token_type,
- token_url=token_url,
- credential_source=credential_source,
- service_account_impersonation_url=service_account_impersonation_url,
- client_id=client_id,
- client_secret=client_secret,
- quota_project_id=quota_project_id,
- scopes=scopes,
- default_scopes=default_scopes,
- )
- if not isinstance(credential_source, Mapping):
- self._credential_source_file = None
- self._credential_source_url = None
- else:
- self._credential_source_file = credential_source.get("file")
- self._credential_source_url = credential_source.get("url")
- self._credential_source_headers = credential_source.get("headers")
- credential_source_format = credential_source.get("format", {})
- # Get credential_source format type. When not provided, this
- # defaults to text.
- self._credential_source_format_type = (
- credential_source_format.get("type") or "text"
- )
- # environment_id is only supported in AWS or dedicated future external
- # account credentials.
- if "environment_id" in credential_source:
- raise ValueError(
- "Invalid Identity Pool credential_source field 'environment_id'"
- )
- if self._credential_source_format_type not in ["text", "json"]:
- raise ValueError(
- "Invalid credential_source format '{}'".format(
- self._credential_source_format_type
- )
- )
- # For JSON types, get the required subject_token field name.
- if self._credential_source_format_type == "json":
- self._credential_source_field_name = credential_source_format.get(
- "subject_token_field_name"
- )
- if self._credential_source_field_name is None:
- raise ValueError(
- "Missing subject_token_field_name for JSON credential_source format"
- )
- else:
- self._credential_source_field_name = None
- if self._credential_source_file and self._credential_source_url:
- raise ValueError(
- "Ambiguous credential_source. 'file' is mutually exclusive with 'url'."
- )
- if not self._credential_source_file and not self._credential_source_url:
- raise ValueError(
- "Missing credential_source. A 'file' or 'url' must be provided."
- )
- @_helpers.copy_docstring(external_account.Credentials)
- def retrieve_subject_token(self, request):
- return self._parse_token_data(
- self._get_token_data(request),
- self._credential_source_format_type,
- self._credential_source_field_name,
- )
- def _get_token_data(self, request):
- if self._credential_source_file:
- return self._get_file_data(self._credential_source_file)
- else:
- return self._get_url_data(
- request, self._credential_source_url, self._credential_source_headers
- )
- def _get_file_data(self, filename):
- if not os.path.exists(filename):
- raise exceptions.RefreshError("File '{}' was not found.".format(filename))
- with io.open(filename, "r", encoding="utf-8") as file_obj:
- return file_obj.read(), filename
- def _get_url_data(self, request, url, headers):
- response = request(url=url, method="GET", headers=headers)
- # support both string and bytes type response.data
- response_body = (
- response.data.decode("utf-8")
- if hasattr(response.data, "decode")
- else response.data
- )
- if response.status != 200:
- raise exceptions.RefreshError(
- "Unable to retrieve Identity Pool subject token", response_body
- )
- return response_body, url
- def _parse_token_data(
- self, token_content, format_type="text", subject_token_field_name=None
- ):
- content, filename = token_content
- if format_type == "text":
- token = content
- else:
- try:
- # Parse file content as JSON.
- response_data = json.loads(content)
- # Get the subject_token.
- token = response_data[subject_token_field_name]
- except (KeyError, ValueError):
- raise exceptions.RefreshError(
- "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
- filename, subject_token_field_name
- )
- )
- if not token:
- raise exceptions.RefreshError(
- "Missing subject_token in the credential_source file"
- )
- return token
- @classmethod
- def from_info(cls, info, **kwargs):
- """Creates an Identity Pool Credentials instance from parsed external account info.
- Args:
- info (Mapping[str, str]): The Identity Pool external account info in Google
- format.
- kwargs: Additional arguments to pass to the constructor.
- Returns:
- google.auth.identity_pool.Credentials: The constructed
- credentials.
- Raises:
- ValueError: For invalid parameters.
- """
- return cls(
- audience=info.get("audience"),
- subject_token_type=info.get("subject_token_type"),
- token_url=info.get("token_url"),
- service_account_impersonation_url=info.get(
- "service_account_impersonation_url"
- ),
- client_id=info.get("client_id"),
- client_secret=info.get("client_secret"),
- credential_source=info.get("credential_source"),
- quota_project_id=info.get("quota_project_id"),
- **kwargs
- )
- @classmethod
- def from_file(cls, filename, **kwargs):
- """Creates an IdentityPool Credentials instance from an external account json file.
- Args:
- filename (str): The path to the IdentityPool external account json file.
- kwargs: Additional arguments to pass to the constructor.
- Returns:
- google.auth.identity_pool.Credentials: The constructed
- credentials.
- """
- with io.open(filename, "r", encoding="utf-8") as json_file:
- data = json.load(json_file)
- return cls.from_info(data, **kwargs)
|