123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512 |
- # Copyright 2021 Google LLC
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """Downscoping with Credential Access Boundaries
- This module provides the ability to downscope credentials using
- `Downscoping with Credential Access Boundaries`_. This is useful to restrict the
- Identity and Access Management (IAM) permissions that a short-lived credential
- can use.
- To downscope permissions of a source credential, a Credential Access Boundary
- that specifies which resources the new credential can access, as well as
- an upper bound on the permissions that are available on each resource, has to
- be defined. A downscoped credential can then be instantiated using the source
- credential and the Credential Access Boundary.
- The common pattern of usage is to have a token broker with elevated access
- generate these downscoped credentials from higher access source credentials and
- pass the downscoped short-lived access tokens to a token consumer via some
- secure authenticated channel for limited access to Google Cloud Storage
- resources.
- For example, a token broker can be set up on a server in a private network.
- Various workloads (token consumers) in the same network will send authenticated
- requests to that broker for downscoped tokens to access or modify specific google
- cloud storage buckets.
- The broker will instantiate downscoped credentials instances that can be used to
- generate short lived downscoped access tokens that can be passed to the token
- consumer. These downscoped access tokens can be injected by the consumer into
- google.oauth2.Credentials and used to initialize a storage client instance to
- access Google Cloud Storage resources with restricted access.
- Note: Only Cloud Storage supports Credential Access Boundaries. Other Google
- Cloud services do not support this feature.
- .. _Downscoping with Credential Access Boundaries: https://cloud.google.com/iam/docs/downscoping-short-lived-credentials
- """
- import datetime
- from google.auth import _helpers
- from google.auth import credentials
- from google.auth import exceptions
- from google.oauth2 import sts
- # The maximum number of access boundary rules a Credential Access Boundary can
- # contain.
- _MAX_ACCESS_BOUNDARY_RULES_COUNT = 10
- # The token exchange grant_type used for exchanging credentials.
- _STS_GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange"
- # The token exchange requested_token_type. This is always an access_token.
- _STS_REQUESTED_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
- # The STS token URL used to exchanged a short lived access token for a downscoped one.
- _STS_TOKEN_URL_PATTERN = "https://sts.{}/v1/token"
- # The subject token type to use when exchanging a short lived access token for a
- # downscoped token.
- _STS_SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
- class CredentialAccessBoundary(object):
- """Defines a Credential Access Boundary which contains a list of access boundary
- rules. Each rule contains information on the resource that the rule applies to,
- the upper bound of the permissions that are available on that resource and an
- optional condition to further restrict permissions.
- """
- def __init__(self, rules=[]):
- """Instantiates a Credential Access Boundary. A Credential Access Boundary
- can contain up to 10 access boundary rules.
- Args:
- rules (Sequence[google.auth.downscoped.AccessBoundaryRule]): The list of
- access boundary rules limiting the access that a downscoped credential
- will have.
- Raises:
- InvalidType: If any of the rules are not a valid type.
- InvalidValue: If the provided rules exceed the maximum allowed.
- """
- self.rules = rules
- @property
- def rules(self):
- """Returns the list of access boundary rules defined on the Credential
- Access Boundary.
- Returns:
- Tuple[google.auth.downscoped.AccessBoundaryRule, ...]: The list of access
- boundary rules defined on the Credential Access Boundary. These are returned
- as an immutable tuple to prevent modification.
- """
- return tuple(self._rules)
- @rules.setter
- def rules(self, value):
- """Updates the current rules on the Credential Access Boundary. This will overwrite
- the existing set of rules.
- Args:
- value (Sequence[google.auth.downscoped.AccessBoundaryRule]): The list of
- access boundary rules limiting the access that a downscoped credential
- will have.
- Raises:
- InvalidType: If any of the rules are not a valid type.
- InvalidValue: If the provided rules exceed the maximum allowed.
- """
- if len(value) > _MAX_ACCESS_BOUNDARY_RULES_COUNT:
- raise exceptions.InvalidValue(
- "Credential access boundary rules can have a maximum of {} rules.".format(
- _MAX_ACCESS_BOUNDARY_RULES_COUNT
- )
- )
- for access_boundary_rule in value:
- if not isinstance(access_boundary_rule, AccessBoundaryRule):
- raise exceptions.InvalidType(
- "List of rules provided do not contain a valid 'google.auth.downscoped.AccessBoundaryRule'."
- )
- # Make a copy of the original list.
- self._rules = list(value)
- def add_rule(self, rule):
- """Adds a single access boundary rule to the existing rules.
- Args:
- rule (google.auth.downscoped.AccessBoundaryRule): The access boundary rule,
- limiting the access that a downscoped credential will have, to be added to
- the existing rules.
- Raises:
- InvalidType: If any of the rules are not a valid type.
- InvalidValue: If the provided rules exceed the maximum allowed.
- """
- if len(self.rules) == _MAX_ACCESS_BOUNDARY_RULES_COUNT:
- raise exceptions.InvalidValue(
- "Credential access boundary rules can have a maximum of {} rules.".format(
- _MAX_ACCESS_BOUNDARY_RULES_COUNT
- )
- )
- if not isinstance(rule, AccessBoundaryRule):
- raise exceptions.InvalidType(
- "The provided rule does not contain a valid 'google.auth.downscoped.AccessBoundaryRule'."
- )
- self._rules.append(rule)
- def to_json(self):
- """Generates the dictionary representation of the Credential Access Boundary.
- This uses the format expected by the Security Token Service API as documented in
- `Defining a Credential Access Boundary`_.
- .. _Defining a Credential Access Boundary:
- https://cloud.google.com/iam/docs/downscoping-short-lived-credentials#define-boundary
- Returns:
- Mapping: Credential Access Boundary Rule represented in a dictionary object.
- """
- rules = []
- for access_boundary_rule in self.rules:
- rules.append(access_boundary_rule.to_json())
- return {"accessBoundary": {"accessBoundaryRules": rules}}
- class AccessBoundaryRule(object):
- """Defines an access boundary rule which contains information on the resource that
- the rule applies to, the upper bound of the permissions that are available on that
- resource and an optional condition to further restrict permissions.
- """
- def __init__(
- self, available_resource, available_permissions, availability_condition=None
- ):
- """Instantiates a single access boundary rule.
- Args:
- available_resource (str): The full resource name of the Cloud Storage bucket
- that the rule applies to. Use the format
- "//storage.googleapis.com/projects/_/buckets/bucket-name".
- available_permissions (Sequence[str]): A list defining the upper bound that
- the downscoped token will have on the available permissions for the
- resource. Each value is the identifier for an IAM predefined role or
- custom role, with the prefix "inRole:". For example:
- "inRole:roles/storage.objectViewer".
- Only the permissions in these roles will be available.
- availability_condition (Optional[google.auth.downscoped.AvailabilityCondition]):
- Optional condition that restricts the availability of permissions to
- specific Cloud Storage objects.
- Raises:
- InvalidType: If any of the parameters are not of the expected types.
- InvalidValue: If any of the parameters are not of the expected values.
- """
- self.available_resource = available_resource
- self.available_permissions = available_permissions
- self.availability_condition = availability_condition
- @property
- def available_resource(self):
- """Returns the current available resource.
- Returns:
- str: The current available resource.
- """
- return self._available_resource
- @available_resource.setter
- def available_resource(self, value):
- """Updates the current available resource.
- Args:
- value (str): The updated value of the available resource.
- Raises:
- google.auth.exceptions.InvalidType: If the value is not a string.
- """
- if not isinstance(value, str):
- raise exceptions.InvalidType(
- "The provided available_resource is not a string."
- )
- self._available_resource = value
- @property
- def available_permissions(self):
- """Returns the current available permissions.
- Returns:
- Tuple[str, ...]: The current available permissions. These are returned
- as an immutable tuple to prevent modification.
- """
- return tuple(self._available_permissions)
- @available_permissions.setter
- def available_permissions(self, value):
- """Updates the current available permissions.
- Args:
- value (Sequence[str]): The updated value of the available permissions.
- Raises:
- InvalidType: If the value is not a list of strings.
- InvalidValue: If the value is not valid.
- """
- for available_permission in value:
- if not isinstance(available_permission, str):
- raise exceptions.InvalidType(
- "Provided available_permissions are not a list of strings."
- )
- if available_permission.find("inRole:") != 0:
- raise exceptions.InvalidValue(
- "available_permissions must be prefixed with 'inRole:'."
- )
- # Make a copy of the original list.
- self._available_permissions = list(value)
- @property
- def availability_condition(self):
- """Returns the current availability condition.
- Returns:
- Optional[google.auth.downscoped.AvailabilityCondition]: The current
- availability condition.
- """
- return self._availability_condition
- @availability_condition.setter
- def availability_condition(self, value):
- """Updates the current availability condition.
- Args:
- value (Optional[google.auth.downscoped.AvailabilityCondition]): The updated
- value of the availability condition.
- Raises:
- google.auth.exceptions.InvalidType: If the value is not of type google.auth.downscoped.AvailabilityCondition
- or None.
- """
- if not isinstance(value, AvailabilityCondition) and value is not None:
- raise exceptions.InvalidType(
- "The provided availability_condition is not a 'google.auth.downscoped.AvailabilityCondition' or None."
- )
- self._availability_condition = value
- def to_json(self):
- """Generates the dictionary representation of the access boundary rule.
- This uses the format expected by the Security Token Service API as documented in
- `Defining a Credential Access Boundary`_.
- .. _Defining a Credential Access Boundary:
- https://cloud.google.com/iam/docs/downscoping-short-lived-credentials#define-boundary
- Returns:
- Mapping: The access boundary rule represented in a dictionary object.
- """
- json = {
- "availablePermissions": list(self.available_permissions),
- "availableResource": self.available_resource,
- }
- if self.availability_condition:
- json["availabilityCondition"] = self.availability_condition.to_json()
- return json
- class AvailabilityCondition(object):
- """An optional condition that can be used as part of a Credential Access Boundary
- to further restrict permissions."""
- def __init__(self, expression, title=None, description=None):
- """Instantiates an availability condition using the provided expression and
- optional title or description.
- Args:
- expression (str): A condition expression that specifies the Cloud Storage
- objects where permissions are available. For example, this expression
- makes permissions available for objects whose name starts with "customer-a":
- "resource.name.startsWith('projects/_/buckets/example-bucket/objects/customer-a')"
- title (Optional[str]): An optional short string that identifies the purpose of
- the condition.
- description (Optional[str]): Optional details about the purpose of the condition.
- Raises:
- InvalidType: If any of the parameters are not of the expected types.
- InvalidValue: If any of the parameters are not of the expected values.
- """
- self.expression = expression
- self.title = title
- self.description = description
- @property
- def expression(self):
- """Returns the current condition expression.
- Returns:
- str: The current conditon expression.
- """
- return self._expression
- @expression.setter
- def expression(self, value):
- """Updates the current condition expression.
- Args:
- value (str): The updated value of the condition expression.
- Raises:
- google.auth.exceptions.InvalidType: If the value is not of type string.
- """
- if not isinstance(value, str):
- raise exceptions.InvalidType("The provided expression is not a string.")
- self._expression = value
- @property
- def title(self):
- """Returns the current title.
- Returns:
- Optional[str]: The current title.
- """
- return self._title
- @title.setter
- def title(self, value):
- """Updates the current title.
- Args:
- value (Optional[str]): The updated value of the title.
- Raises:
- google.auth.exceptions.InvalidType: If the value is not of type string or None.
- """
- if not isinstance(value, str) and value is not None:
- raise exceptions.InvalidType("The provided title is not a string or None.")
- self._title = value
- @property
- def description(self):
- """Returns the current description.
- Returns:
- Optional[str]: The current description.
- """
- return self._description
- @description.setter
- def description(self, value):
- """Updates the current description.
- Args:
- value (Optional[str]): The updated value of the description.
- Raises:
- google.auth.exceptions.InvalidType: If the value is not of type string or None.
- """
- if not isinstance(value, str) and value is not None:
- raise exceptions.InvalidType(
- "The provided description is not a string or None."
- )
- self._description = value
- def to_json(self):
- """Generates the dictionary representation of the availability condition.
- This uses the format expected by the Security Token Service API as documented in
- `Defining a Credential Access Boundary`_.
- .. _Defining a Credential Access Boundary:
- https://cloud.google.com/iam/docs/downscoping-short-lived-credentials#define-boundary
- Returns:
- Mapping[str, str]: The availability condition represented in a dictionary
- object.
- """
- json = {"expression": self.expression}
- if self.title:
- json["title"] = self.title
- if self.description:
- json["description"] = self.description
- return json
- class Credentials(credentials.CredentialsWithQuotaProject):
- """Defines a set of Google credentials that are downscoped from an existing set
- of Google OAuth2 credentials. This is useful to restrict the Identity and Access
- Management (IAM) permissions that a short-lived credential can use.
- The common pattern of usage is to have a token broker with elevated access
- generate these downscoped credentials from higher access source credentials and
- pass the downscoped short-lived access tokens to a token consumer via some
- secure authenticated channel for limited access to Google Cloud Storage
- resources.
- """
- def __init__(
- self,
- source_credentials,
- credential_access_boundary,
- quota_project_id=None,
- universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN,
- ):
- """Instantiates a downscoped credentials object using the provided source
- credentials and credential access boundary rules.
- To downscope permissions of a source credential, a Credential Access Boundary
- that specifies which resources the new credential can access, as well as an
- upper bound on the permissions that are available on each resource, has to be
- defined. A downscoped credential can then be instantiated using the source
- credential and the Credential Access Boundary.
- Args:
- source_credentials (google.auth.credentials.Credentials): The source credentials
- to be downscoped based on the provided Credential Access Boundary rules.
- credential_access_boundary (google.auth.downscoped.CredentialAccessBoundary):
- The Credential Access Boundary which contains a list of access boundary
- rules. Each rule contains information on the resource that the rule applies to,
- the upper bound of the permissions that are available on that resource and an
- optional condition to further restrict permissions.
- quota_project_id (Optional[str]): The optional quota project ID.
- universe_domain (Optional[str]): The universe domain value, default is googleapis.com
- Raises:
- google.auth.exceptions.RefreshError: If the source credentials
- return an error on token refresh.
- google.auth.exceptions.OAuthError: If the STS token exchange
- endpoint returned an error during downscoped token generation.
- """
- super(Credentials, self).__init__()
- self._source_credentials = source_credentials
- self._credential_access_boundary = credential_access_boundary
- self._quota_project_id = quota_project_id
- self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN
- self._sts_client = sts.Client(
- _STS_TOKEN_URL_PATTERN.format(self.universe_domain)
- )
- @_helpers.copy_docstring(credentials.Credentials)
- def refresh(self, request):
- # Generate an access token from the source credentials.
- self._source_credentials.refresh(request)
- now = _helpers.utcnow()
- # Exchange the access token for a downscoped access token.
- response_data = self._sts_client.exchange_token(
- request=request,
- grant_type=_STS_GRANT_TYPE,
- subject_token=self._source_credentials.token,
- subject_token_type=_STS_SUBJECT_TOKEN_TYPE,
- requested_token_type=_STS_REQUESTED_TOKEN_TYPE,
- additional_options=self._credential_access_boundary.to_json(),
- )
- self.token = response_data.get("access_token")
- # For downscoping CAB flow, the STS endpoint may not return the expiration
- # field for some flows. The generated downscoped token should always have
- # the same expiration time as the source credentials. When no expires_in
- # field is returned in the response, we can just get the expiration time
- # from the source credentials.
- if response_data.get("expires_in"):
- lifetime = datetime.timedelta(seconds=response_data.get("expires_in"))
- self.expiry = now + lifetime
- else:
- self.expiry = self._source_credentials.expiry
- @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
- def with_quota_project(self, quota_project_id):
- return self.__class__(
- self._source_credentials,
- self._credential_access_boundary,
- quota_project_id=quota_project_id,
- )
|