123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694 |
- # Copyright 2021 Google LLC
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import datetime
- import json
- import mock
- import pytest
- from six.moves import http_client
- from six.moves import urllib
- from google.auth import _helpers
- from google.auth import credentials
- from google.auth import downscoped
- from google.auth import exceptions
- from google.auth import transport
# --- Primary access boundary rule fixtures ("customer-a") -------------------
EXPRESSION = (
    "resource.name.startsWith("
    "'projects/_/buckets/example-bucket/objects/customer-a'"
    ")"
)
TITLE = "customer-a-objects"
DESCRIPTION = (
    "Condition to make permissions available for objects "
    "starting with customer-a"
)
AVAILABLE_RESOURCE = "//storage.googleapis.com/projects/_/buckets/example-bucket"
AVAILABLE_PERMISSIONS = ["inRole:roles/storage.objectViewer"]

# --- Alternate fixtures ("customer-b") --------------------------------------
# Used by the setter tests and wherever a second, distinct rule is needed.
OTHER_EXPRESSION = (
    "resource.name.startsWith("
    "'projects/_/buckets/example-bucket/objects/customer-b'"
    ")"
)
OTHER_TITLE = "customer-b-objects"
OTHER_DESCRIPTION = (
    "Condition to make permissions available for objects "
    "starting with customer-b"
)
OTHER_AVAILABLE_RESOURCE = "//storage.googleapis.com/projects/_/buckets/other-bucket"
OTHER_AVAILABLE_PERMISSIONS = ["inRole:roles/storage.objectCreator"]

# --- Token exchange request fixtures ----------------------------------------
QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange"
REQUESTED_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
TOKEN_EXCHANGE_ENDPOINT = "https://sts.googleapis.com/v1/token"
SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"

# --- Canned token exchange responses ----------------------------------------
# Payload served by the mocked transport for a successful exchange.
SUCCESS_RESPONSE = {
    "access_token": "ACCESS_TOKEN",
    "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
    "token_type": "Bearer",
    "expires_in": 3600,
}
# Payload served by the mocked transport for a failed exchange.
ERROR_RESPONSE = {
    "error": "invalid_grant",
    "error_description": "Subject token is invalid.",
    "error_uri": "https://tools.ietf.org/html/rfc6749",
}

# JSON form of a credential access boundary holding the single primary rule.
# Key order is significant: the refresh tests serialize this dict with
# json.dumps and compare the result with the "options" field of the request.
CREDENTIAL_ACCESS_BOUNDARY_JSON = {
    "accessBoundary": {
        "accessBoundaryRules": [
            {
                "availablePermissions": AVAILABLE_PERMISSIONS,
                "availableResource": AVAILABLE_RESOURCE,
                "availabilityCondition": {
                    "expression": EXPRESSION,
                    "title": TITLE,
                    "description": DESCRIPTION,
                },
            }
        ]
    }
}
class SourceCredentials(credentials.Credentials):
    """Stub source credentials whose ``refresh`` never touches the network.

    Every successful ``refresh`` sets ``token`` to ``ACCESS_TOKEN_<n>``, where
    ``n`` is the number of successful refreshes performed by this instance,
    and sets ``expiry`` to ``expires_in`` seconds after ``_helpers.utcnow()``.

    Args:
        raise_error (bool): When true, ``refresh`` raises
            ``exceptions.RefreshError`` instead of producing a token.
        expires_in (int): Lifetime, in seconds, given to each generated token.
    """

    def __init__(self, raise_error=False, expires_in=3600):
        super(SourceCredentials, self).__init__()
        self._expires_in = expires_in
        self._raise_error = raise_error
        # Number of successful refreshes so far; embedded in the token value.
        self._counter = 0

    def refresh(self, request):
        """Generate the next fake access token and its expiry.

        Args:
            request: Unused by this stub; accepted so the signature matches
                the way the tests invoke ``refresh``.

        Raises:
            google.auth.exceptions.RefreshError: If the instance was created
                with ``raise_error=True``.
        """
        if self._raise_error:
            raise exceptions.RefreshError(
                "Failed to refresh access token in source credentials."
            )

        issued_at = _helpers.utcnow()
        self._counter += 1
        self.token = "ACCESS_TOKEN_{}".format(self._counter)
        self.expiry = issued_at + datetime.timedelta(seconds=self._expires_in)
def make_availability_condition(expression, title=None, description=None):
    """Build a ``downscoped.AvailabilityCondition`` from the given fields.

    ``title`` and ``description`` default to ``None``. All three values are
    forwarded positionally and unchanged to the constructor.
    """
    condition = downscoped.AvailabilityCondition(expression, title, description)
    return condition
def make_access_boundary_rule(
    available_resource, available_permissions, availability_condition=None
):
    """Build a ``downscoped.AccessBoundaryRule`` from the given fields.

    ``availability_condition`` defaults to ``None``. All three values are
    forwarded positionally and unchanged to the constructor.
    """
    rule = downscoped.AccessBoundaryRule(
        available_resource, available_permissions, availability_condition
    )
    return rule
def make_credential_access_boundary(rules):
    """Build a ``downscoped.CredentialAccessBoundary`` wrapping ``rules``."""
    boundary = downscoped.CredentialAccessBoundary(rules)
    return boundary
class TestAvailabilityCondition(object):
    """Tests for ``downscoped.AvailabilityCondition``."""

    def test_constructor(self):
        """All constructor arguments are exposed as attributes."""
        condition = make_availability_condition(
            EXPRESSION, title=TITLE, description=DESCRIPTION
        )

        assert condition.expression == EXPRESSION
        assert condition.title == TITLE
        assert condition.description == DESCRIPTION

    def test_constructor_required_params_only(self):
        """Omitted optional fields are reported as ``None``."""
        condition = make_availability_condition(EXPRESSION)

        assert condition.expression == EXPRESSION
        assert condition.title is None
        assert condition.description is None

    def test_setters(self):
        """Each attribute can be reassigned after construction."""
        condition = make_availability_condition(
            EXPRESSION, title=TITLE, description=DESCRIPTION
        )

        condition.expression = OTHER_EXPRESSION
        condition.title = OTHER_TITLE
        condition.description = OTHER_DESCRIPTION

        assert condition.expression == OTHER_EXPRESSION
        assert condition.title == OTHER_TITLE
        assert condition.description == OTHER_DESCRIPTION

    def test_invalid_expression_type(self):
        """A non-string expression (here a list) raises ``TypeError``."""
        with pytest.raises(
            TypeError, match="The provided expression is not a string."
        ):
            make_availability_condition([EXPRESSION], TITLE, DESCRIPTION)

    def test_invalid_title_type(self):
        """A title that is neither a string nor ``None`` raises ``TypeError``."""
        with pytest.raises(
            TypeError, match="The provided title is not a string or None."
        ):
            make_availability_condition(EXPRESSION, False, DESCRIPTION)

    def test_invalid_description_type(self):
        """A description that is neither a string nor ``None`` raises ``TypeError``."""
        with pytest.raises(
            TypeError, match="The provided description is not a string or None."
        ):
            make_availability_condition(EXPRESSION, TITLE, False)

    def test_to_json_required_params_only(self):
        """Only the expression is serialized when the optional fields are unset."""
        condition = make_availability_condition(EXPRESSION)
        expected = {"expression": EXPRESSION}

        assert condition.to_json() == expected

    def test_to_json_(self):
        """All three fields are serialized when they are all provided."""
        condition = make_availability_condition(
            EXPRESSION, title=TITLE, description=DESCRIPTION
        )
        expected = {
            "expression": EXPRESSION,
            "title": TITLE,
            "description": DESCRIPTION,
        }

        assert condition.to_json() == expected
class TestAccessBoundaryRule(object):
    """Tests for ``downscoped.AccessBoundaryRule``."""

    def test_constructor(self):
        """Constructor arguments are exposed; permissions compare equal to a tuple."""
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)
        rule = make_access_boundary_rule(
            AVAILABLE_RESOURCE,
            AVAILABLE_PERMISSIONS,
            availability_condition=condition,
        )

        assert rule.available_resource == AVAILABLE_RESOURCE
        assert rule.available_permissions == tuple(AVAILABLE_PERMISSIONS)
        assert rule.availability_condition == condition

    def test_constructor_required_params_only(self):
        """Without a condition argument, ``availability_condition`` is ``None``."""
        rule = make_access_boundary_rule(AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS)

        assert rule.available_resource == AVAILABLE_RESOURCE
        assert rule.available_permissions == tuple(AVAILABLE_PERMISSIONS)
        assert rule.availability_condition is None

    def test_setters(self):
        """Each attribute can be reassigned after construction."""
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)
        replacement_condition = make_availability_condition(
            OTHER_EXPRESSION, OTHER_TITLE, OTHER_DESCRIPTION
        )
        rule = make_access_boundary_rule(
            AVAILABLE_RESOURCE,
            AVAILABLE_PERMISSIONS,
            availability_condition=condition,
        )

        rule.available_resource = OTHER_AVAILABLE_RESOURCE
        rule.available_permissions = OTHER_AVAILABLE_PERMISSIONS
        rule.availability_condition = replacement_condition

        assert rule.available_resource == OTHER_AVAILABLE_RESOURCE
        assert rule.available_permissions == tuple(OTHER_AVAILABLE_PERMISSIONS)
        assert rule.availability_condition == replacement_condition

    def test_invalid_available_resource_type(self):
        """A non-string resource (here ``None``) raises ``TypeError``."""
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)

        with pytest.raises(
            TypeError, match="The provided available_resource is not a string."
        ):
            make_access_boundary_rule(None, AVAILABLE_PERMISSIONS, condition)

    def test_invalid_available_permissions_type(self):
        """Permissions that are not strings raise ``TypeError``."""
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)

        with pytest.raises(
            TypeError,
            match="Provided available_permissions are not a list of strings.",
        ):
            make_access_boundary_rule(AVAILABLE_RESOURCE, [0, 1, 2], condition)

    def test_invalid_available_permissions_value(self):
        """A permission string lacking the ``inRole:`` prefix raises ``ValueError``."""
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)
        unprefixed_permissions = ["roles/storage.objectViewer"]

        with pytest.raises(
            ValueError,
            match="available_permissions must be prefixed with 'inRole:'.",
        ):
            make_access_boundary_rule(
                AVAILABLE_RESOURCE, unprefixed_permissions, condition
            )

    def test_invalid_availability_condition_type(self):
        """A condition of the wrong type (here a dict) raises ``TypeError``."""
        with pytest.raises(
            TypeError,
            match="The provided availability_condition is not a 'google.auth.downscoped.AvailabilityCondition' or None.",
        ):
            make_access_boundary_rule(
                AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, {"foo": "bar"}
            )

    def test_to_json(self):
        """A rule with a condition serializes all three top-level fields."""
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)
        rule = make_access_boundary_rule(
            AVAILABLE_RESOURCE,
            AVAILABLE_PERMISSIONS,
            availability_condition=condition,
        )
        expected = {
            "availablePermissions": AVAILABLE_PERMISSIONS,
            "availableResource": AVAILABLE_RESOURCE,
            "availabilityCondition": {
                "expression": EXPRESSION,
                "title": TITLE,
                "description": DESCRIPTION,
            },
        }

        assert rule.to_json() == expected

    def test_to_json_required_params_only(self):
        """A rule without a condition omits ``availabilityCondition``."""
        rule = make_access_boundary_rule(AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS)
        expected = {
            "availablePermissions": AVAILABLE_PERMISSIONS,
            "availableResource": AVAILABLE_RESOURCE,
        }

        assert rule.to_json() == expected
class TestCredentialAccessBoundary(object):
    """Tests for ``downscoped.CredentialAccessBoundary``."""

    def test_constructor(self):
        """The rules passed in compare equal to a tuple of those rules."""
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)
        rule = make_access_boundary_rule(
            AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, condition
        )
        rules = [rule]

        boundary = make_credential_access_boundary(rules)

        assert boundary.rules == tuple(rules)

    def test_setters(self):
        """``rules`` can be replaced after construction."""
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)
        rule = make_access_boundary_rule(
            AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, condition
        )
        rules = [rule]
        replacement_condition = make_availability_condition(
            OTHER_EXPRESSION, OTHER_TITLE, OTHER_DESCRIPTION
        )
        replacement_rule = make_access_boundary_rule(
            OTHER_AVAILABLE_RESOURCE,
            OTHER_AVAILABLE_PERMISSIONS,
            replacement_condition,
        )
        replacement_rules = [replacement_rule]
        boundary = make_credential_access_boundary(rules)

        boundary.rules = replacement_rules

        assert boundary.rules == tuple(replacement_rules)

    def test_add_rule(self):
        """A tenth rule can be appended to a boundary holding nine rules."""
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)
        rule = make_access_boundary_rule(
            AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, condition
        )
        boundary = make_credential_access_boundary([rule] * 9)
        extra_rule = make_access_boundary_rule(
            OTHER_AVAILABLE_RESOURCE, OTHER_AVAILABLE_PERMISSIONS
        )

        # Appending the tenth rule must not raise.
        boundary.add_rule(extra_rule)

        assert len(boundary.rules) == 10
        assert boundary.rules[9] == extra_rule

    def test_add_rule_invalid_value(self):
        """Appending an eleventh rule raises ``ValueError`` and adds nothing."""
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)
        rule = make_access_boundary_rule(
            AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, condition
        )
        boundary = make_credential_access_boundary([rule] * 10)

        # The boundary already holds ten rules, so one more is rejected.
        with pytest.raises(
            ValueError,
            match="Credential access boundary rules can have a maximum of 10 rules.",
        ):
            boundary.add_rule(rule)

        assert len(boundary.rules) == 10

    def test_add_rule_invalid_type(self):
        """Appending a non-rule raises ``TypeError`` and leaves the rules intact."""
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)
        rule = make_access_boundary_rule(
            AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, condition
        )
        boundary = make_credential_access_boundary([rule])

        # A plain string is not an access boundary rule and is rejected.
        with pytest.raises(
            TypeError,
            match="The provided rule does not contain a valid 'google.auth.downscoped.AccessBoundaryRule'.",
        ):
            boundary.add_rule("invalid")

        assert len(boundary.rules) == 1
        assert boundary.rules[0] == rule

    def test_invalid_rules_type(self):
        """Constructing with a list containing a non-rule raises ``TypeError``."""
        with pytest.raises(
            TypeError,
            match="List of rules provided do not contain a valid 'google.auth.downscoped.AccessBoundaryRule'.",
        ):
            make_credential_access_boundary(["invalid"])

    def test_invalid_rules_value(self):
        """Constructing with eleven rules raises ``ValueError``."""
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)
        rule = make_access_boundary_rule(
            AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, condition
        )
        too_many_rules = [rule] * 11

        with pytest.raises(
            ValueError,
            match="Credential access boundary rules can have a maximum of 10 rules.",
        ):
            make_credential_access_boundary(too_many_rules)

    def test_to_json(self):
        """The boundary serializes its rules under ``accessBoundary``."""
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)
        rule = make_access_boundary_rule(
            AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, condition
        )
        boundary = make_credential_access_boundary([rule])
        expected = {
            "accessBoundary": {
                "accessBoundaryRules": [
                    {
                        "availablePermissions": AVAILABLE_PERMISSIONS,
                        "availableResource": AVAILABLE_RESOURCE,
                        "availabilityCondition": {
                            "expression": EXPRESSION,
                            "title": TITLE,
                            "description": DESCRIPTION,
                        },
                    }
                ]
            }
        }

        assert boundary.to_json() == expected
class TestCredentials(object):
    """Tests for ``downscoped.Credentials``."""

    @staticmethod
    def make_credentials(source_credentials=None, quota_project_id=None):
        """Build downscoped credentials restricted by a single boundary rule.

        Args:
            source_credentials: The credentials to downscope. When omitted, a
                fresh ``SourceCredentials()`` is created for this call. A
                default instance built in the signature would be created once
                and shared by every test, leaking its refresh counter between
                tests.
            quota_project_id: Optional quota project ID forwarded to the
                credentials.

        Returns:
            The ``downscoped.Credentials`` under test.
        """
        if source_credentials is None:
            source_credentials = SourceCredentials()

        availability_condition = make_availability_condition(
            EXPRESSION, TITLE, DESCRIPTION
        )
        access_boundary_rule = make_access_boundary_rule(
            AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
        )
        credential_access_boundary = make_credential_access_boundary(
            [access_boundary_rule]
        )

        return downscoped.Credentials(
            source_credentials, credential_access_boundary, quota_project_id
        )

    @staticmethod
    def make_mock_request(data, status=http_client.OK):
        """Build a mocked transport request returning one canned response.

        Args:
            data: JSON-serializable payload; it is dumped with ``json.dumps``
                and UTF-8 encoded to form the response body.
            status: HTTP status reported by the response.

        Returns:
            An autospec'd ``transport.Request`` whose return value is the
            canned response.
        """
        response = mock.create_autospec(transport.Response, instance=True)
        response.status = status
        response.data = json.dumps(data).encode("utf-8")

        request = mock.create_autospec(transport.Request)
        request.return_value = response

        return request

    @staticmethod
    def make_expected_token_exchange(subject_token="ACCESS_TOKEN_1"):
        """Return the ``(headers, request_data)`` expected for a token exchange.

        Args:
            subject_token: The source access token expected in the request.
                The default is the first token a new ``SourceCredentials``
                generates.
        """
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        request_data = {
            "grant_type": GRANT_TYPE,
            "subject_token": subject_token,
            "subject_token_type": SUBJECT_TOKEN_TYPE,
            "requested_token_type": REQUESTED_TOKEN_TYPE,
            "options": urllib.parse.quote(json.dumps(CREDENTIAL_ACCESS_BOUNDARY_JSON)),
        }
        return headers, request_data

    @staticmethod
    def assert_request_kwargs(request_kwargs, headers, request_data):
        """Assert the request was called with the expected parameters.

        Args:
            request_kwargs: Keyword arguments captured from the mocked request.
            headers: Expected request headers.
            request_data: Expected form fields, as a mapping of text keys to
                text values.
        """
        assert request_kwargs["url"] == TOKEN_EXCHANGE_ENDPOINT
        assert request_kwargs["method"] == "POST"
        assert request_kwargs["headers"] == headers
        assert request_kwargs["body"] is not None

        # Keys and values parsed from the body are decoded as UTF-8 before
        # being compared with the expected text values.
        body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
        for (k, v) in body_tuples:
            assert v.decode("utf-8") == request_data[k.decode("utf-8")]
        # The body must carry no field beyond the expected ones.
        assert len(body_tuples) == len(request_data)

    def test_default_state(self):
        """Newly built credentials have no token, expiry or quota project."""
        creds = self.make_credentials()

        # No token acquired yet.
        assert not creds.token
        assert not creds.valid
        # Expiration hasn't been set yet.
        assert not creds.expiry
        assert not creds.expired
        # No quota project ID set.
        assert not creds.quota_project_id

    def test_with_quota_project(self):
        """``with_quota_project`` returns credentials carrying the given ID."""
        creds = self.make_credentials()
        assert not creds.quota_project_id

        quota_project_creds = creds.with_quota_project("project-foo")

        assert quota_project_creds.quota_project_id == "project-foo"

    @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
    def test_refresh(self, unused_utcnow):
        """A successful refresh stores the exchanged token and its expiry."""
        response = SUCCESS_RESPONSE.copy()
        # Test custom expiration to confirm expiry is set correctly.
        response["expires_in"] = 2800
        expected_expiry = datetime.datetime.min + datetime.timedelta(
            seconds=response["expires_in"]
        )
        headers, request_data = self.make_expected_token_exchange()
        request = self.make_mock_request(status=http_client.OK, data=response)
        source_credentials = SourceCredentials()
        creds = self.make_credentials(source_credentials=source_credentials)

        # Spy on calls to source credentials refresh to confirm the expected
        # request instance is used.
        with mock.patch.object(
            source_credentials, "refresh", wraps=source_credentials.refresh
        ) as wrapped_source_cred_refresh:
            creds.refresh(request)

        self.assert_request_kwargs(request.call_args[1], headers, request_data)
        assert creds.valid
        assert creds.expiry == expected_expiry
        assert not creds.expired
        assert creds.token == response["access_token"]
        # Confirm source credentials called with the same request instance.
        wrapped_source_cred_refresh.assert_called_with(request)

    @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
    def test_refresh_without_response_expires_in(self, unused_utcnow):
        """Without ``expires_in``, expiry falls back to the source credentials'."""
        response = SUCCESS_RESPONSE.copy()
        # Simulate the response is missing the expires_in field.
        # The downscoped token expiration should match the source credentials
        # expiration.
        del response["expires_in"]
        expected_expires_in = 1800
        # Simulate the source credentials generating a token with an 1800
        # second expiration time. The generated downscoped token should have
        # the same expiration time.
        source_credentials = SourceCredentials(expires_in=expected_expires_in)
        expected_expiry = datetime.datetime.min + datetime.timedelta(
            seconds=expected_expires_in
        )
        headers, request_data = self.make_expected_token_exchange()
        request = self.make_mock_request(status=http_client.OK, data=response)
        creds = self.make_credentials(source_credentials=source_credentials)

        # Spy on calls to source credentials refresh to confirm the expected
        # request instance is used.
        with mock.patch.object(
            source_credentials, "refresh", wraps=source_credentials.refresh
        ) as wrapped_source_cred_refresh:
            creds.refresh(request)

        self.assert_request_kwargs(request.call_args[1], headers, request_data)
        assert creds.valid
        assert creds.expiry == expected_expiry
        assert not creds.expired
        assert creds.token == response["access_token"]
        # Confirm source credentials called with the same request instance.
        wrapped_source_cred_refresh.assert_called_with(request)

    def test_refresh_token_exchange_error(self):
        """A failed token exchange raises ``OAuthError`` and stores no token."""
        request = self.make_mock_request(
            status=http_client.BAD_REQUEST, data=ERROR_RESPONSE
        )
        creds = self.make_credentials()

        with pytest.raises(exceptions.OAuthError) as excinfo:
            creds.refresh(request)

        assert excinfo.match(
            r"Error code invalid_grant: Subject token is invalid. - https://tools.ietf.org/html/rfc6749"
        )
        assert not creds.expired
        assert creds.token is None

    def test_refresh_source_credentials_refresh_error(self):
        """An error refreshing the source credentials propagates unchanged."""
        # Initialize downscoped credentials with source credentials that raise
        # an error on refresh.
        creds = self.make_credentials(
            source_credentials=SourceCredentials(raise_error=True)
        )

        with pytest.raises(exceptions.RefreshError) as excinfo:
            creds.refresh(mock.sentinel.request)

        assert excinfo.match(r"Failed to refresh access token in source credentials.")
        assert not creds.expired
        assert creds.token is None

    def test_apply_without_quota_project_id(self):
        """``apply`` adds only the authorization header when no quota project is set."""
        headers = {}
        request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
        creds = self.make_credentials()

        creds.refresh(request)
        creds.apply(headers)

        assert headers == {
            "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"])
        }

    def test_apply_with_quota_project_id(self):
        """``apply`` also adds ``x-goog-user-project`` and keeps existing headers."""
        headers = {"other": "header-value"}
        request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
        creds = self.make_credentials(quota_project_id=QUOTA_PROJECT_ID)

        creds.refresh(request)
        creds.apply(headers)

        assert headers == {
            "other": "header-value",
            "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]),
            "x-goog-user-project": QUOTA_PROJECT_ID,
        }

    def test_before_request(self):
        """``before_request`` refreshes on first use, then reuses the token."""
        headers = {"other": "header-value"}
        expected_headers = {
            "other": "header-value",
            "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]),
        }
        request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
        creds = self.make_credentials()

        # First call should call refresh, setting the token.
        creds.before_request(request, "POST", "https://example.com/api", headers)
        assert headers == expected_headers

        # Second call shouldn't call refresh (request should be untouched).
        creds.before_request(
            mock.sentinel.request, "POST", "https://example.com/api", headers
        )
        assert headers == expected_headers

    @mock.patch("google.auth._helpers.utcnow")
    def test_before_request_expired(self, utcnow):
        """``before_request`` reuses a valid token and refreshes an expired one."""
        headers = {}
        request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
        creds = self.make_credentials()
        creds.token = "token"
        utcnow.return_value = datetime.datetime.min
        # Set the expiration to one second more than now plus the clock skew
        # accommodation. These credentials should be valid.
        creds.expiry = (
            datetime.datetime.min + _helpers.CLOCK_SKEW + datetime.timedelta(seconds=1)
        )

        assert creds.valid
        assert not creds.expired

        creds.before_request(request, "POST", "https://example.com/api", headers)

        # Cached token should be used.
        assert headers == {"authorization": "Bearer token"}

        # Next call should simulate 1 second passed.
        utcnow.return_value = datetime.datetime.min + datetime.timedelta(seconds=1)

        assert not creds.valid
        assert creds.expired

        creds.before_request(request, "POST", "https://example.com/api", headers)

        # New token should be retrieved.
        assert headers == {
            "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"])
        }
|