# Copyright 2021 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import datetime import json import mock import pytest from six.moves import http_client from six.moves import urllib from google.auth import _helpers from google.auth import credentials from google.auth import downscoped from google.auth import exceptions from google.auth import transport EXPRESSION = ( "resource.name.startsWith('projects/_/buckets/example-bucket/objects/customer-a')" ) TITLE = "customer-a-objects" DESCRIPTION = ( "Condition to make permissions available for objects starting with customer-a" ) AVAILABLE_RESOURCE = "//storage.googleapis.com/projects/_/buckets/example-bucket" AVAILABLE_PERMISSIONS = ["inRole:roles/storage.objectViewer"] OTHER_EXPRESSION = ( "resource.name.startsWith('projects/_/buckets/example-bucket/objects/customer-b')" ) OTHER_TITLE = "customer-b-objects" OTHER_DESCRIPTION = ( "Condition to make permissions available for objects starting with customer-b" ) OTHER_AVAILABLE_RESOURCE = "//storage.googleapis.com/projects/_/buckets/other-bucket" OTHER_AVAILABLE_PERMISSIONS = ["inRole:roles/storage.objectCreator"] QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID" GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange" REQUESTED_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token" TOKEN_EXCHANGE_ENDPOINT = "https://sts.googleapis.com/v1/token" SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token" SUCCESS_RESPONSE = { "access_token": "ACCESS_TOKEN", "issued_token_type": "urn:ietf:params:oauth:token-type:access_token", "token_type": "Bearer", "expires_in": 3600, } ERROR_RESPONSE = { "error": "invalid_grant", "error_description": "Subject token is invalid.", "error_uri": "https://tools.ietf.org/html/rfc6749", } CREDENTIAL_ACCESS_BOUNDARY_JSON = { "accessBoundary": { "accessBoundaryRules": [ { "availablePermissions": AVAILABLE_PERMISSIONS, "availableResource": AVAILABLE_RESOURCE, "availabilityCondition": { "expression": EXPRESSION, "title": TITLE, "description": DESCRIPTION, }, } ] } } class SourceCredentials(credentials.Credentials): def __init__(self, raise_error=False, expires_in=3600): super(SourceCredentials, self).__init__() self._counter = 0 self._raise_error = raise_error self._expires_in = expires_in def refresh(self, request): if self._raise_error: raise exceptions.RefreshError( "Failed to refresh access token in source credentials." ) now = _helpers.utcnow() self._counter += 1 self.token = "ACCESS_TOKEN_{}".format(self._counter) self.expiry = now + datetime.timedelta(seconds=self._expires_in) def make_availability_condition(expression, title=None, description=None): return downscoped.AvailabilityCondition(expression, title, description) def make_access_boundary_rule( available_resource, available_permissions, availability_condition=None ): return downscoped.AccessBoundaryRule( available_resource, available_permissions, availability_condition ) def make_credential_access_boundary(rules): return downscoped.CredentialAccessBoundary(rules) class TestAvailabilityCondition(object): def test_constructor(self): availability_condition = make_availability_condition( EXPRESSION, TITLE, DESCRIPTION ) assert availability_condition.expression == EXPRESSION assert availability_condition.title == TITLE assert availability_condition.description == DESCRIPTION def test_constructor_required_params_only(self): availability_condition = make_availability_condition(EXPRESSION) assert availability_condition.expression == EXPRESSION assert availability_condition.title is None assert availability_condition.description is None def test_setters(self): availability_condition = make_availability_condition( EXPRESSION, TITLE, DESCRIPTION ) availability_condition.expression = OTHER_EXPRESSION availability_condition.title = OTHER_TITLE availability_condition.description = OTHER_DESCRIPTION assert availability_condition.expression == OTHER_EXPRESSION assert availability_condition.title == OTHER_TITLE assert availability_condition.description == OTHER_DESCRIPTION def test_invalid_expression_type(self): with pytest.raises(TypeError) as excinfo: make_availability_condition([EXPRESSION], TITLE, DESCRIPTION) assert excinfo.match("The provided expression is not a string.") def test_invalid_title_type(self): with pytest.raises(TypeError) as excinfo: make_availability_condition(EXPRESSION, False, DESCRIPTION) assert excinfo.match("The provided title is not a string or None.") def test_invalid_description_type(self): with pytest.raises(TypeError) as excinfo: make_availability_condition(EXPRESSION, TITLE, False) assert excinfo.match("The provided description is not a string or None.") def test_to_json_required_params_only(self): availability_condition = make_availability_condition(EXPRESSION) assert availability_condition.to_json() == {"expression": EXPRESSION} def test_to_json_(self): availability_condition = make_availability_condition( EXPRESSION, TITLE, DESCRIPTION ) assert availability_condition.to_json() == { "expression": EXPRESSION, "title": TITLE, "description": DESCRIPTION, } class TestAccessBoundaryRule(object): def test_constructor(self): availability_condition = make_availability_condition( EXPRESSION, TITLE, DESCRIPTION ) access_boundary_rule = make_access_boundary_rule( AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition ) assert access_boundary_rule.available_resource == AVAILABLE_RESOURCE assert access_boundary_rule.available_permissions == tuple( AVAILABLE_PERMISSIONS ) assert access_boundary_rule.availability_condition == availability_condition def test_constructor_required_params_only(self): access_boundary_rule = make_access_boundary_rule( AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS ) assert access_boundary_rule.available_resource == AVAILABLE_RESOURCE assert access_boundary_rule.available_permissions == tuple( AVAILABLE_PERMISSIONS ) assert access_boundary_rule.availability_condition is None def test_setters(self): availability_condition = make_availability_condition( EXPRESSION, TITLE, DESCRIPTION ) other_availability_condition = make_availability_condition( OTHER_EXPRESSION, OTHER_TITLE, OTHER_DESCRIPTION ) access_boundary_rule = make_access_boundary_rule( AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition ) access_boundary_rule.available_resource = OTHER_AVAILABLE_RESOURCE access_boundary_rule.available_permissions = OTHER_AVAILABLE_PERMISSIONS access_boundary_rule.availability_condition = other_availability_condition assert access_boundary_rule.available_resource == OTHER_AVAILABLE_RESOURCE assert access_boundary_rule.available_permissions == tuple( OTHER_AVAILABLE_PERMISSIONS ) assert ( access_boundary_rule.availability_condition == other_availability_condition ) def test_invalid_available_resource_type(self): availability_condition = make_availability_condition( EXPRESSION, TITLE, DESCRIPTION ) with pytest.raises(TypeError) as excinfo: make_access_boundary_rule( None, AVAILABLE_PERMISSIONS, availability_condition ) assert excinfo.match("The provided available_resource is not a string.") def test_invalid_available_permissions_type(self): availability_condition = make_availability_condition( EXPRESSION, TITLE, DESCRIPTION ) with pytest.raises(TypeError) as excinfo: make_access_boundary_rule( AVAILABLE_RESOURCE, [0, 1, 2], availability_condition ) assert excinfo.match( "Provided available_permissions are not a list of strings." ) def test_invalid_available_permissions_value(self): availability_condition = make_availability_condition( EXPRESSION, TITLE, DESCRIPTION ) with pytest.raises(ValueError) as excinfo: make_access_boundary_rule( AVAILABLE_RESOURCE, ["roles/storage.objectViewer"], availability_condition, ) assert excinfo.match("available_permissions must be prefixed with 'inRole:'.") def test_invalid_availability_condition_type(self): with pytest.raises(TypeError) as excinfo: make_access_boundary_rule( AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, {"foo": "bar"} ) assert excinfo.match( "The provided availability_condition is not a 'google.auth.downscoped.AvailabilityCondition' or None." ) def test_to_json(self): availability_condition = make_availability_condition( EXPRESSION, TITLE, DESCRIPTION ) access_boundary_rule = make_access_boundary_rule( AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition ) assert access_boundary_rule.to_json() == { "availablePermissions": AVAILABLE_PERMISSIONS, "availableResource": AVAILABLE_RESOURCE, "availabilityCondition": { "expression": EXPRESSION, "title": TITLE, "description": DESCRIPTION, }, } def test_to_json_required_params_only(self): access_boundary_rule = make_access_boundary_rule( AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS ) assert access_boundary_rule.to_json() == { "availablePermissions": AVAILABLE_PERMISSIONS, "availableResource": AVAILABLE_RESOURCE, } class TestCredentialAccessBoundary(object): def test_constructor(self): availability_condition = make_availability_condition( EXPRESSION, TITLE, DESCRIPTION ) access_boundary_rule = make_access_boundary_rule( AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition ) rules = [access_boundary_rule] credential_access_boundary = make_credential_access_boundary(rules) assert credential_access_boundary.rules == tuple(rules) def test_setters(self): availability_condition = make_availability_condition( EXPRESSION, TITLE, DESCRIPTION ) access_boundary_rule = make_access_boundary_rule( AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition ) rules = [access_boundary_rule] other_availability_condition = make_availability_condition( OTHER_EXPRESSION, OTHER_TITLE, OTHER_DESCRIPTION ) other_access_boundary_rule = make_access_boundary_rule( OTHER_AVAILABLE_RESOURCE, OTHER_AVAILABLE_PERMISSIONS, other_availability_condition, ) other_rules = [other_access_boundary_rule] credential_access_boundary = make_credential_access_boundary(rules) credential_access_boundary.rules = other_rules assert credential_access_boundary.rules == tuple(other_rules) def test_add_rule(self): availability_condition = make_availability_condition( EXPRESSION, TITLE, DESCRIPTION ) access_boundary_rule = make_access_boundary_rule( AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition ) rules = [access_boundary_rule] * 9 credential_access_boundary = make_credential_access_boundary(rules) # Add one more rule. This should not raise an error. additional_access_boundary_rule = make_access_boundary_rule( OTHER_AVAILABLE_RESOURCE, OTHER_AVAILABLE_PERMISSIONS ) credential_access_boundary.add_rule(additional_access_boundary_rule) assert len(credential_access_boundary.rules) == 10 assert credential_access_boundary.rules[9] == additional_access_boundary_rule def test_add_rule_invalid_value(self): availability_condition = make_availability_condition( EXPRESSION, TITLE, DESCRIPTION ) access_boundary_rule = make_access_boundary_rule( AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition ) rules = [access_boundary_rule] * 10 credential_access_boundary = make_credential_access_boundary(rules) # Add one more rule to exceed maximum allowed rules. with pytest.raises(ValueError) as excinfo: credential_access_boundary.add_rule(access_boundary_rule) assert excinfo.match( "Credential access boundary rules can have a maximum of 10 rules." ) assert len(credential_access_boundary.rules) == 10 def test_add_rule_invalid_type(self): availability_condition = make_availability_condition( EXPRESSION, TITLE, DESCRIPTION ) access_boundary_rule = make_access_boundary_rule( AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition ) rules = [access_boundary_rule] credential_access_boundary = make_credential_access_boundary(rules) # Add an invalid rule to exceed maximum allowed rules. with pytest.raises(TypeError) as excinfo: credential_access_boundary.add_rule("invalid") assert excinfo.match( "The provided rule does not contain a valid 'google.auth.downscoped.AccessBoundaryRule'." ) assert len(credential_access_boundary.rules) == 1 assert credential_access_boundary.rules[0] == access_boundary_rule def test_invalid_rules_type(self): with pytest.raises(TypeError) as excinfo: make_credential_access_boundary(["invalid"]) assert excinfo.match( "List of rules provided do not contain a valid 'google.auth.downscoped.AccessBoundaryRule'." ) def test_invalid_rules_value(self): availability_condition = make_availability_condition( EXPRESSION, TITLE, DESCRIPTION ) access_boundary_rule = make_access_boundary_rule( AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition ) too_many_rules = [access_boundary_rule] * 11 with pytest.raises(ValueError) as excinfo: make_credential_access_boundary(too_many_rules) assert excinfo.match( "Credential access boundary rules can have a maximum of 10 rules." ) def test_to_json(self): availability_condition = make_availability_condition( EXPRESSION, TITLE, DESCRIPTION ) access_boundary_rule = make_access_boundary_rule( AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition ) rules = [access_boundary_rule] credential_access_boundary = make_credential_access_boundary(rules) assert credential_access_boundary.to_json() == { "accessBoundary": { "accessBoundaryRules": [ { "availablePermissions": AVAILABLE_PERMISSIONS, "availableResource": AVAILABLE_RESOURCE, "availabilityCondition": { "expression": EXPRESSION, "title": TITLE, "description": DESCRIPTION, }, } ] } } class TestCredentials(object): @staticmethod def make_credentials(source_credentials=SourceCredentials(), quota_project_id=None): availability_condition = make_availability_condition( EXPRESSION, TITLE, DESCRIPTION ) access_boundary_rule = make_access_boundary_rule( AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition ) rules = [access_boundary_rule] credential_access_boundary = make_credential_access_boundary(rules) return downscoped.Credentials( source_credentials, credential_access_boundary, quota_project_id ) @staticmethod def make_mock_request(data, status=http_client.OK): response = mock.create_autospec(transport.Response, instance=True) response.status = status response.data = json.dumps(data).encode("utf-8") request = mock.create_autospec(transport.Request) request.return_value = response return request @staticmethod def assert_request_kwargs(request_kwargs, headers, request_data): """Asserts the request was called with the expected parameters. """ assert request_kwargs["url"] == TOKEN_EXCHANGE_ENDPOINT assert request_kwargs["method"] == "POST" assert request_kwargs["headers"] == headers assert request_kwargs["body"] is not None body_tuples = urllib.parse.parse_qsl(request_kwargs["body"]) for (k, v) in body_tuples: assert v.decode("utf-8") == request_data[k.decode("utf-8")] assert len(body_tuples) == len(request_data.keys()) def test_default_state(self): credentials = self.make_credentials() # No token acquired yet. assert not credentials.token assert not credentials.valid # Expiration hasn't been set yet. assert not credentials.expiry assert not credentials.expired # No quota project ID set. assert not credentials.quota_project_id def test_with_quota_project(self): credentials = self.make_credentials() assert not credentials.quota_project_id quota_project_creds = credentials.with_quota_project("project-foo") assert quota_project_creds.quota_project_id == "project-foo" @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) def test_refresh(self, unused_utcnow): response = SUCCESS_RESPONSE.copy() # Test custom expiration to confirm expiry is set correctly. response["expires_in"] = 2800 expected_expiry = datetime.datetime.min + datetime.timedelta( seconds=response["expires_in"] ) headers = {"Content-Type": "application/x-www-form-urlencoded"} request_data = { "grant_type": GRANT_TYPE, "subject_token": "ACCESS_TOKEN_1", "subject_token_type": SUBJECT_TOKEN_TYPE, "requested_token_type": REQUESTED_TOKEN_TYPE, "options": urllib.parse.quote(json.dumps(CREDENTIAL_ACCESS_BOUNDARY_JSON)), } request = self.make_mock_request(status=http_client.OK, data=response) source_credentials = SourceCredentials() credentials = self.make_credentials(source_credentials=source_credentials) # Spy on calls to source credentials refresh to confirm the expected request # instance is used. with mock.patch.object( source_credentials, "refresh", wraps=source_credentials.refresh ) as wrapped_souce_cred_refresh: credentials.refresh(request) self.assert_request_kwargs(request.call_args[1], headers, request_data) assert credentials.valid assert credentials.expiry == expected_expiry assert not credentials.expired assert credentials.token == response["access_token"] # Confirm source credentials called with the same request instance. wrapped_souce_cred_refresh.assert_called_with(request) @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) def test_refresh_without_response_expires_in(self, unused_utcnow): response = SUCCESS_RESPONSE.copy() # Simulate the response is missing the expires_in field. # The downscoped token expiration should match the source credentials # expiration. del response["expires_in"] expected_expires_in = 1800 # Simulate the source credentials generates a token with 1800 second # expiration time. The generated downscoped token should have the same # expiration time. source_credentials = SourceCredentials(expires_in=expected_expires_in) expected_expiry = datetime.datetime.min + datetime.timedelta( seconds=expected_expires_in ) headers = {"Content-Type": "application/x-www-form-urlencoded"} request_data = { "grant_type": GRANT_TYPE, "subject_token": "ACCESS_TOKEN_1", "subject_token_type": SUBJECT_TOKEN_TYPE, "requested_token_type": REQUESTED_TOKEN_TYPE, "options": urllib.parse.quote(json.dumps(CREDENTIAL_ACCESS_BOUNDARY_JSON)), } request = self.make_mock_request(status=http_client.OK, data=response) credentials = self.make_credentials(source_credentials=source_credentials) # Spy on calls to source credentials refresh to confirm the expected request # instance is used. with mock.patch.object( source_credentials, "refresh", wraps=source_credentials.refresh ) as wrapped_souce_cred_refresh: credentials.refresh(request) self.assert_request_kwargs(request.call_args[1], headers, request_data) assert credentials.valid assert credentials.expiry == expected_expiry assert not credentials.expired assert credentials.token == response["access_token"] # Confirm source credentials called with the same request instance. wrapped_souce_cred_refresh.assert_called_with(request) def test_refresh_token_exchange_error(self): request = self.make_mock_request( status=http_client.BAD_REQUEST, data=ERROR_RESPONSE ) credentials = self.make_credentials() with pytest.raises(exceptions.OAuthError) as excinfo: credentials.refresh(request) assert excinfo.match( r"Error code invalid_grant: Subject token is invalid. - https://tools.ietf.org/html/rfc6749" ) assert not credentials.expired assert credentials.token is None def test_refresh_source_credentials_refresh_error(self): # Initialize downscoped credentials with source credentials that raise # an error on refresh. credentials = self.make_credentials( source_credentials=SourceCredentials(raise_error=True) ) with pytest.raises(exceptions.RefreshError) as excinfo: credentials.refresh(mock.sentinel.request) assert excinfo.match(r"Failed to refresh access token in source credentials.") assert not credentials.expired assert credentials.token is None def test_apply_without_quota_project_id(self): headers = {} request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE) credentials = self.make_credentials() credentials.refresh(request) credentials.apply(headers) assert headers == { "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]) } def test_apply_with_quota_project_id(self): headers = {"other": "header-value"} request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE) credentials = self.make_credentials(quota_project_id=QUOTA_PROJECT_ID) credentials.refresh(request) credentials.apply(headers) assert headers == { "other": "header-value", "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]), "x-goog-user-project": QUOTA_PROJECT_ID, } def test_before_request(self): headers = {"other": "header-value"} request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE) credentials = self.make_credentials() # First call should call refresh, setting the token. credentials.before_request(request, "POST", "https://example.com/api", headers) assert headers == { "other": "header-value", "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]), } # Second call shouldn't call refresh (request should be untouched). credentials.before_request( mock.sentinel.request, "POST", "https://example.com/api", headers ) assert headers == { "other": "header-value", "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]), } @mock.patch("google.auth._helpers.utcnow") def test_before_request_expired(self, utcnow): headers = {} request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE) credentials = self.make_credentials() credentials.token = "token" utcnow.return_value = datetime.datetime.min # Set the expiration to one second more than now plus the clock skew # accommodation. These credentials should be valid. credentials.expiry = ( datetime.datetime.min + _helpers.CLOCK_SKEW + datetime.timedelta(seconds=1) ) assert credentials.valid assert not credentials.expired credentials.before_request(request, "POST", "https://example.com/api", headers) # Cached token should be used. assert headers == {"authorization": "Bearer token"} # Next call should simulate 1 second passed. utcnow.return_value = datetime.datetime.min + datetime.timedelta(seconds=1) assert not credentials.valid assert credentials.expired credentials.before_request(request, "POST", "https://example.com/api", headers) # New token should be retrieved. assert headers == { "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]) }