123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967 |
- # Copyright 2016 Google LLC
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import base64
- import datetime
- import mock
- import pytest # type: ignore
- import responses # type: ignore
- from google.auth import _helpers
- from google.auth import exceptions
- from google.auth import jwt
- from google.auth import transport
- from google.auth.compute_engine import credentials
- from google.auth.transport import requests
- SAMPLE_ID_TOKEN_EXP = 1584393400
- # header: {"alg": "RS256", "typ": "JWT", "kid": "1"}
- # payload: {"iss": "issuer", "iat": 1584393348, "sub": "subject",
- # "exp": 1584393400,"aud": "audience"}
- SAMPLE_ID_TOKEN = (
- b"eyJhbGciOiAiUlMyNTYiLCAidHlwIjogIkpXVCIsICJraWQiOiAiMSJ9."
- b"eyJpc3MiOiAiaXNzdWVyIiwgImlhdCI6IDE1ODQzOTMzNDgsICJzdWIiO"
- b"iAic3ViamVjdCIsICJleHAiOiAxNTg0MzkzNDAwLCAiYXVkIjogImF1ZG"
- b"llbmNlIn0."
- b"OquNjHKhTmlgCk361omRo18F_uY-7y0f_AmLbzW062Q1Zr61HAwHYP5FM"
- b"316CK4_0cH8MUNGASsvZc3VqXAqub6PUTfhemH8pFEwBdAdG0LhrNkU0H"
- b"WN1YpT55IiQ31esLdL5q-qDsOPpNZJUti1y1lAreM5nIn2srdWzGXGs4i"
- b"TRQsn0XkNUCL4RErpciXmjfhMrPkcAjKA-mXQm2fa4jmTlEZFqFmUlym1"
- b"ozJ0yf5grjN6AslN4OGvAv1pS-_Ko_pGBS6IQtSBC6vVKCUuBfaqNjykg"
- b"bsxbLa6Fp0SYeYwO8ifEnkRvasVpc1WTQqfRB2JCj5pTBDzJpIpFCMmnQ"
- )
- ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE = (
- "gl-python/3.7 auth/1.1 auth-request-type/at cred-type/mds"
- )
- ID_TOKEN_REQUEST_METRICS_HEADER_VALUE = (
- "gl-python/3.7 auth/1.1 auth-request-type/it cred-type/mds"
- )
- FAKE_SERVICE_ACCOUNT_EMAIL = "foo@bar.com"
- FAKE_QUOTA_PROJECT_ID = "fake-quota-project"
- FAKE_SCOPES = ["scope1", "scope2"]
- FAKE_DEFAULT_SCOPES = ["scope3", "scope4"]
- FAKE_UNIVERSE_DOMAIN = "fake-universe-domain"
- class TestCredentials(object):
- credentials = None
- credentials_with_all_fields = None
- @pytest.fixture(autouse=True)
- def credentials_fixture(self):
- self.credentials = credentials.Credentials()
- self.credentials_with_all_fields = credentials.Credentials(
- service_account_email=FAKE_SERVICE_ACCOUNT_EMAIL,
- quota_project_id=FAKE_QUOTA_PROJECT_ID,
- scopes=FAKE_SCOPES,
- default_scopes=FAKE_DEFAULT_SCOPES,
- universe_domain=FAKE_UNIVERSE_DOMAIN,
- )
- def test_get_cred_info(self):
- assert self.credentials.get_cred_info() == {
- "credential_source": "metadata server",
- "credential_type": "VM credentials",
- "principal": "default",
- }
- def test_default_state(self):
- assert not self.credentials.valid
- # Expiration hasn't been set yet
- assert not self.credentials.expired
- # Scopes are needed
- assert self.credentials.requires_scopes
- # Service account email hasn't been populated
- assert self.credentials.service_account_email == "default"
- # No quota project
- assert not self.credentials._quota_project_id
- # Universe domain is the default and not cached
- assert self.credentials._universe_domain == "googleapis.com"
- assert not self.credentials._universe_domain_cached
- @mock.patch(
- "google.auth._helpers.utcnow",
- return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD,
- )
- @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
- def test_refresh_success(self, get, utcnow):
- get.side_effect = [
- {
- # First request is for sevice account info.
- "email": "service-account@example.com",
- "scopes": ["one", "two"],
- },
- {
- # Second request is for the token.
- "access_token": "token",
- "expires_in": 500,
- },
- ]
- # Refresh credentials
- self.credentials.refresh(None)
- # Check that the credentials have the token and proper expiration
- assert self.credentials.token == "token"
- assert self.credentials.expiry == (utcnow() + datetime.timedelta(seconds=500))
- # Check the credential info
- assert self.credentials.service_account_email == "service-account@example.com"
- assert self.credentials._scopes == ["one", "two"]
- # Check that the credentials are valid (have a token and are not
- # expired)
- assert self.credentials.valid
- @mock.patch(
- "google.auth.metrics.token_request_access_token_mds",
- return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
- )
- @mock.patch(
- "google.auth._helpers.utcnow",
- return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD,
- )
- @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
- def test_refresh_success_with_scopes(self, get, utcnow, mock_metrics_header_value):
- get.side_effect = [
- {
- # First request is for sevice account info.
- "email": "service-account@example.com",
- "scopes": ["one", "two"],
- },
- {
- # Second request is for the token.
- "access_token": "token",
- "expires_in": 500,
- },
- ]
- # Refresh credentials
- scopes = ["three", "four"]
- self.credentials = self.credentials.with_scopes(scopes)
- self.credentials.refresh(None)
- # Check that the credentials have the token and proper expiration
- assert self.credentials.token == "token"
- assert self.credentials.expiry == (utcnow() + datetime.timedelta(seconds=500))
- # Check the credential info
- assert self.credentials.service_account_email == "service-account@example.com"
- assert self.credentials._scopes == scopes
- # Check that the credentials are valid (have a token and are not
- # expired)
- assert self.credentials.valid
- kwargs = get.call_args[1]
- assert kwargs["params"] == {"scopes": "three,four"}
- assert kwargs["headers"] == {
- "x-goog-api-client": ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE
- }
- @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
- def test_refresh_error(self, get):
- get.side_effect = exceptions.TransportError("http error")
- with pytest.raises(exceptions.RefreshError) as excinfo:
- self.credentials.refresh(None)
- assert excinfo.match(r"http error")
- @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
- def test_before_request_refreshes(self, get):
- get.side_effect = [
- {
- # First request is for sevice account info.
- "email": "service-account@example.com",
- "scopes": "one two",
- },
- {
- # Second request is for the token.
- "access_token": "token",
- "expires_in": 500,
- },
- ]
- # Credentials should start as invalid
- assert not self.credentials.valid
- # before_request should cause a refresh
- request = mock.create_autospec(transport.Request, instance=True)
- self.credentials.before_request(request, "GET", "http://example.com?a=1#3", {})
- # The refresh endpoint should've been called.
- assert get.called
- # Credentials should now be valid.
- assert self.credentials.valid
- def test_with_quota_project(self):
- creds = self.credentials_with_all_fields.with_quota_project("project-foo")
- assert creds._quota_project_id == "project-foo"
- assert creds._service_account_email == FAKE_SERVICE_ACCOUNT_EMAIL
- assert creds._scopes == FAKE_SCOPES
- assert creds._default_scopes == FAKE_DEFAULT_SCOPES
- assert creds.universe_domain == FAKE_UNIVERSE_DOMAIN
- assert creds._universe_domain_cached
- def test_with_scopes(self):
- scopes = ["one", "two"]
- creds = self.credentials_with_all_fields.with_scopes(scopes)
- assert creds._scopes == scopes
- assert creds._quota_project_id == FAKE_QUOTA_PROJECT_ID
- assert creds._service_account_email == FAKE_SERVICE_ACCOUNT_EMAIL
- assert creds._default_scopes is None
- assert creds.universe_domain == FAKE_UNIVERSE_DOMAIN
- assert creds._universe_domain_cached
- def test_with_universe_domain(self):
- creds = self.credentials_with_all_fields.with_universe_domain("universe_domain")
- assert creds._scopes == FAKE_SCOPES
- assert creds._quota_project_id == FAKE_QUOTA_PROJECT_ID
- assert creds._service_account_email == FAKE_SERVICE_ACCOUNT_EMAIL
- assert creds._default_scopes == FAKE_DEFAULT_SCOPES
- assert creds.universe_domain == "universe_domain"
- assert creds._universe_domain_cached
- def test_token_usage_metrics(self):
- self.credentials.token = "token"
- self.credentials.expiry = None
- headers = {}
- self.credentials.before_request(mock.Mock(), None, None, headers)
- assert headers["authorization"] == "Bearer token"
- assert headers["x-goog-api-client"] == "cred-type/mds"
- @mock.patch(
- "google.auth.compute_engine._metadata.get_universe_domain",
- return_value="fake_universe_domain",
- )
- def test_universe_domain(self, get_universe_domain):
- # Check the default state
- assert not self.credentials._universe_domain_cached
- assert self.credentials._universe_domain == "googleapis.com"
- # calling the universe_domain property should trigger a call to
- # get_universe_domain to fetch the value. The value should be cached.
- assert self.credentials.universe_domain == "fake_universe_domain"
- assert self.credentials._universe_domain == "fake_universe_domain"
- assert self.credentials._universe_domain_cached
- get_universe_domain.assert_called_once()
- # calling the universe_domain property the second time should use the
- # cached value instead of calling get_universe_domain
- assert self.credentials.universe_domain == "fake_universe_domain"
- get_universe_domain.assert_called_once()
- @mock.patch("google.auth.compute_engine._metadata.get_universe_domain")
- def test_user_provided_universe_domain(self, get_universe_domain):
- assert self.credentials_with_all_fields.universe_domain == FAKE_UNIVERSE_DOMAIN
- assert self.credentials_with_all_fields._universe_domain_cached
- # Since user provided universe_domain, we will not call the universe
- # domain endpoint.
- get_universe_domain.assert_not_called()
- class TestIDTokenCredentials(object):
- credentials = None
- @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
- def test_default_state(self, get):
- get.side_effect = [
- {"email": "service-account@example.com", "scope": ["one", "two"]}
- ]
- request = mock.create_autospec(transport.Request, instance=True)
- self.credentials = credentials.IDTokenCredentials(
- request=request, target_audience="https://example.com"
- )
- assert not self.credentials.valid
- # Expiration hasn't been set yet
- assert not self.credentials.expired
- # Service account email hasn't been populated
- assert self.credentials.service_account_email == "service-account@example.com"
- # Signer is initialized
- assert self.credentials.signer
- assert self.credentials.signer_email == "service-account@example.com"
- # No quota project
- assert not self.credentials._quota_project_id
- @mock.patch(
- "google.auth._helpers.utcnow",
- return_value=datetime.datetime.utcfromtimestamp(0),
- )
- @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
- @mock.patch("google.auth.iam.Signer.sign", autospec=True)
- def test_make_authorization_grant_assertion(self, sign, get, utcnow):
- get.side_effect = [
- {"email": "service-account@example.com", "scopes": ["one", "two"]}
- ]
- sign.side_effect = [b"signature"]
- request = mock.create_autospec(transport.Request, instance=True)
- self.credentials = credentials.IDTokenCredentials(
- request=request, target_audience="https://audience.com"
- )
- # Generate authorization grant:
- token = self.credentials._make_authorization_grant_assertion()
- payload = jwt.decode(token, verify=False)
- # The JWT token signature is 'signature' encoded in base 64:
- assert token.endswith(b".c2lnbmF0dXJl")
- # Check that the credentials have the token and proper expiration
- assert payload == {
- "aud": "https://www.googleapis.com/oauth2/v4/token",
- "exp": 3600,
- "iat": 0,
- "iss": "service-account@example.com",
- "target_audience": "https://audience.com",
- }
- @mock.patch(
- "google.auth._helpers.utcnow",
- return_value=datetime.datetime.utcfromtimestamp(0),
- )
- @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
- @mock.patch("google.auth.iam.Signer.sign", autospec=True)
- def test_with_service_account(self, sign, get, utcnow):
- sign.side_effect = [b"signature"]
- request = mock.create_autospec(transport.Request, instance=True)
- self.credentials = credentials.IDTokenCredentials(
- request=request,
- target_audience="https://audience.com",
- service_account_email="service-account@other.com",
- )
- # Generate authorization grant:
- token = self.credentials._make_authorization_grant_assertion()
- payload = jwt.decode(token, verify=False)
- # The JWT token signature is 'signature' encoded in base 64:
- assert token.endswith(b".c2lnbmF0dXJl")
- # Check that the credentials have the token and proper expiration
- assert payload == {
- "aud": "https://www.googleapis.com/oauth2/v4/token",
- "exp": 3600,
- "iat": 0,
- "iss": "service-account@other.com",
- "target_audience": "https://audience.com",
- }
- @mock.patch(
- "google.auth._helpers.utcnow",
- return_value=datetime.datetime.utcfromtimestamp(0),
- )
- @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
- @mock.patch("google.auth.iam.Signer.sign", autospec=True)
- def test_additional_claims(self, sign, get, utcnow):
- get.side_effect = [
- {"email": "service-account@example.com", "scopes": ["one", "two"]}
- ]
- sign.side_effect = [b"signature"]
- request = mock.create_autospec(transport.Request, instance=True)
- self.credentials = credentials.IDTokenCredentials(
- request=request,
- target_audience="https://audience.com",
- additional_claims={"foo": "bar"},
- )
- # Generate authorization grant:
- token = self.credentials._make_authorization_grant_assertion()
- payload = jwt.decode(token, verify=False)
- # The JWT token signature is 'signature' encoded in base 64:
- assert token.endswith(b".c2lnbmF0dXJl")
- # Check that the credentials have the token and proper expiration
- assert payload == {
- "aud": "https://www.googleapis.com/oauth2/v4/token",
- "exp": 3600,
- "iat": 0,
- "iss": "service-account@example.com",
- "target_audience": "https://audience.com",
- "foo": "bar",
- }
- def test_token_uri(self):
- request = mock.create_autospec(transport.Request, instance=True)
- self.credentials = credentials.IDTokenCredentials(
- request=request,
- signer=mock.Mock(),
- service_account_email="foo@example.com",
- target_audience="https://audience.com",
- )
- assert self.credentials._token_uri == credentials._DEFAULT_TOKEN_URI
- self.credentials = credentials.IDTokenCredentials(
- request=request,
- signer=mock.Mock(),
- service_account_email="foo@example.com",
- target_audience="https://audience.com",
- token_uri="https://example.com/token",
- )
- assert self.credentials._token_uri == "https://example.com/token"
- @mock.patch(
- "google.auth._helpers.utcnow",
- return_value=datetime.datetime.utcfromtimestamp(0),
- )
- @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
- @mock.patch("google.auth.iam.Signer.sign", autospec=True)
- def test_with_target_audience(self, sign, get, utcnow):
- get.side_effect = [
- {"email": "service-account@example.com", "scopes": ["one", "two"]}
- ]
- sign.side_effect = [b"signature"]
- request = mock.create_autospec(transport.Request, instance=True)
- self.credentials = credentials.IDTokenCredentials(
- request=request, target_audience="https://audience.com"
- )
- self.credentials = self.credentials.with_target_audience("https://actually.not")
- # Generate authorization grant:
- token = self.credentials._make_authorization_grant_assertion()
- payload = jwt.decode(token, verify=False)
- # The JWT token signature is 'signature' encoded in base 64:
- assert token.endswith(b".c2lnbmF0dXJl")
- # Check that the credentials have the token and proper expiration
- assert payload == {
- "aud": "https://www.googleapis.com/oauth2/v4/token",
- "exp": 3600,
- "iat": 0,
- "iss": "service-account@example.com",
- "target_audience": "https://actually.not",
- }
- # Check that the signer have been initialized with a Request object
- assert isinstance(self.credentials._signer._request, transport.Request)
- @responses.activate
- def test_with_target_audience_integration(self):
- """ Test that it is possible to refresh credentials
- generated from `with_target_audience`.
- Instead of mocking the methods, the HTTP responses
- have been mocked.
- """
- # mock information about credentials
- responses.add(
- responses.GET,
- "http://metadata.google.internal/computeMetadata/v1/instance/"
- "service-accounts/default/?recursive=true",
- status=200,
- content_type="application/json",
- json={
- "scopes": "email",
- "email": "service-account@example.com",
- "aliases": ["default"],
- },
- )
- # mock information about universe_domain
- responses.add(
- responses.GET,
- "http://metadata.google.internal/computeMetadata/v1/universe/"
- "universe-domain",
- status=200,
- content_type="application/json",
- json={},
- )
- # mock token for credentials
- responses.add(
- responses.GET,
- "http://metadata.google.internal/computeMetadata/v1/instance/"
- "service-accounts/service-account@example.com/token",
- status=200,
- content_type="application/json",
- json={
- "access_token": "some-token",
- "expires_in": 3210,
- "token_type": "Bearer",
- },
- )
- # mock sign blob endpoint
- signature = base64.b64encode(b"some-signature").decode("utf-8")
- responses.add(
- responses.POST,
- "https://iamcredentials.googleapis.com/v1/projects/-/"
- "serviceAccounts/service-account@example.com:signBlob",
- status=200,
- content_type="application/json",
- json={"keyId": "some-key-id", "signedBlob": signature},
- )
- id_token = "{}.{}.{}".format(
- base64.b64encode(b'{"some":"some"}').decode("utf-8"),
- base64.b64encode(b'{"exp": 3210}').decode("utf-8"),
- base64.b64encode(b"token").decode("utf-8"),
- )
- # mock id token endpoint
- responses.add(
- responses.POST,
- "https://www.googleapis.com/oauth2/v4/token",
- status=200,
- content_type="application/json",
- json={"id_token": id_token, "expiry": 3210},
- )
- self.credentials = credentials.IDTokenCredentials(
- request=requests.Request(),
- service_account_email="service-account@example.com",
- target_audience="https://audience.com",
- )
- self.credentials = self.credentials.with_target_audience("https://actually.not")
- self.credentials.refresh(requests.Request())
- assert self.credentials.token is not None
- @mock.patch(
- "google.auth._helpers.utcnow",
- return_value=datetime.datetime.utcfromtimestamp(0),
- )
- @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
- @mock.patch("google.auth.iam.Signer.sign", autospec=True)
- def test_with_quota_project(self, sign, get, utcnow):
- get.side_effect = [
- {"email": "service-account@example.com", "scopes": ["one", "two"]}
- ]
- sign.side_effect = [b"signature"]
- request = mock.create_autospec(transport.Request, instance=True)
- self.credentials = credentials.IDTokenCredentials(
- request=request, target_audience="https://audience.com"
- )
- self.credentials = self.credentials.with_quota_project("project-foo")
- assert self.credentials._quota_project_id == "project-foo"
- # Generate authorization grant:
- token = self.credentials._make_authorization_grant_assertion()
- payload = jwt.decode(token, verify=False)
- # The JWT token signature is 'signature' encoded in base 64:
- assert token.endswith(b".c2lnbmF0dXJl")
- # Check that the credentials have the token and proper expiration
- assert payload == {
- "aud": "https://www.googleapis.com/oauth2/v4/token",
- "exp": 3600,
- "iat": 0,
- "iss": "service-account@example.com",
- "target_audience": "https://audience.com",
- }
- # Check that the signer have been initialized with a Request object
- assert isinstance(self.credentials._signer._request, transport.Request)
- @mock.patch(
- "google.auth._helpers.utcnow",
- return_value=datetime.datetime.utcfromtimestamp(0),
- )
- @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
- @mock.patch("google.auth.iam.Signer.sign", autospec=True)
- def test_with_token_uri(self, sign, get, utcnow):
- get.side_effect = [
- {"email": "service-account@example.com", "scopes": ["one", "two"]}
- ]
- sign.side_effect = [b"signature"]
- request = mock.create_autospec(transport.Request, instance=True)
- self.credentials = credentials.IDTokenCredentials(
- request=request,
- target_audience="https://audience.com",
- token_uri="http://xyz.com",
- )
- assert self.credentials._token_uri == "http://xyz.com"
- creds_with_token_uri = self.credentials.with_token_uri("http://example.com")
- assert creds_with_token_uri._token_uri == "http://example.com"
- @mock.patch(
- "google.auth._helpers.utcnow",
- return_value=datetime.datetime.utcfromtimestamp(0),
- )
- @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
- @mock.patch("google.auth.iam.Signer.sign", autospec=True)
- def test_with_token_uri_exception(self, sign, get, utcnow):
- get.side_effect = [
- {"email": "service-account@example.com", "scopes": ["one", "two"]}
- ]
- sign.side_effect = [b"signature"]
- request = mock.create_autospec(transport.Request, instance=True)
- self.credentials = credentials.IDTokenCredentials(
- request=request,
- target_audience="https://audience.com",
- use_metadata_identity_endpoint=True,
- )
- assert self.credentials._token_uri is None
- with pytest.raises(ValueError):
- self.credentials.with_token_uri("http://example.com")
- @responses.activate
- def test_with_quota_project_integration(self):
- """ Test that it is possible to refresh credentials
- generated from `with_quota_project`.
- Instead of mocking the methods, the HTTP responses
- have been mocked.
- """
- # mock information about credentials
- responses.add(
- responses.GET,
- "http://metadata.google.internal/computeMetadata/v1/instance/"
- "service-accounts/default/?recursive=true",
- status=200,
- content_type="application/json",
- json={
- "scopes": "email",
- "email": "service-account@example.com",
- "aliases": ["default"],
- },
- )
- # mock token for credentials
- responses.add(
- responses.GET,
- "http://metadata.google.internal/computeMetadata/v1/instance/"
- "service-accounts/service-account@example.com/token",
- status=200,
- content_type="application/json",
- json={
- "access_token": "some-token",
- "expires_in": 3210,
- "token_type": "Bearer",
- },
- )
- # stubby response about universe_domain
- responses.add(
- responses.GET,
- "http://metadata.google.internal/computeMetadata/v1/universe/"
- "universe-domain",
- status=200,
- content_type="application/json",
- json={},
- )
- # mock sign blob endpoint
- signature = base64.b64encode(b"some-signature").decode("utf-8")
- responses.add(
- responses.POST,
- "https://iamcredentials.googleapis.com/v1/projects/-/"
- "serviceAccounts/service-account@example.com:signBlob",
- status=200,
- content_type="application/json",
- json={"keyId": "some-key-id", "signedBlob": signature},
- )
- id_token = "{}.{}.{}".format(
- base64.b64encode(b'{"some":"some"}').decode("utf-8"),
- base64.b64encode(b'{"exp": 3210}').decode("utf-8"),
- base64.b64encode(b"token").decode("utf-8"),
- )
- # mock id token endpoint
- responses.add(
- responses.POST,
- "https://www.googleapis.com/oauth2/v4/token",
- status=200,
- content_type="application/json",
- json={"id_token": id_token, "expiry": 3210},
- )
- self.credentials = credentials.IDTokenCredentials(
- request=requests.Request(),
- service_account_email="service-account@example.com",
- target_audience="https://audience.com",
- )
- self.credentials = self.credentials.with_quota_project("project-foo")
- self.credentials.refresh(requests.Request())
- assert self.credentials.token is not None
- assert self.credentials._quota_project_id == "project-foo"
- @mock.patch(
- "google.auth._helpers.utcnow",
- return_value=datetime.datetime.utcfromtimestamp(0),
- )
- @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
- @mock.patch("google.auth.iam.Signer.sign", autospec=True)
- @mock.patch("google.oauth2._client.id_token_jwt_grant", autospec=True)
- def test_refresh_success(self, id_token_jwt_grant, sign, get, utcnow):
- get.side_effect = [
- {"email": "service-account@example.com", "scopes": ["one", "two"]}
- ]
- sign.side_effect = [b"signature"]
- id_token_jwt_grant.side_effect = [
- ("idtoken", datetime.datetime.utcfromtimestamp(3600), {})
- ]
- request = mock.create_autospec(transport.Request, instance=True)
- self.credentials = credentials.IDTokenCredentials(
- request=request, target_audience="https://audience.com"
- )
- # Refresh credentials
- self.credentials.refresh(None)
- # Check that the credentials have the token and proper expiration
- assert self.credentials.token == "idtoken"
- assert self.credentials.expiry == (datetime.datetime.utcfromtimestamp(3600))
- # Check the credential info
- assert self.credentials.service_account_email == "service-account@example.com"
- # Check that the credentials are valid (have a token and are not
- # expired)
- assert self.credentials.valid
- @mock.patch(
- "google.auth._helpers.utcnow",
- return_value=datetime.datetime.utcfromtimestamp(0),
- )
- @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
- @mock.patch("google.auth.iam.Signer.sign", autospec=True)
- def test_refresh_error(self, sign, get, utcnow):
- get.side_effect = [
- {"email": "service-account@example.com", "scopes": ["one", "two"]}
- ]
- sign.side_effect = [b"signature"]
- request = mock.create_autospec(transport.Request, instance=True)
- response = mock.Mock()
- response.data = b'{"error": "http error"}'
- response.status = 404 # Throw a 404 so the request is not retried.
- request.side_effect = [response]
- self.credentials = credentials.IDTokenCredentials(
- request=request, target_audience="https://audience.com"
- )
- with pytest.raises(exceptions.RefreshError) as excinfo:
- self.credentials.refresh(request)
- assert excinfo.match(r"http error")
- @mock.patch(
- "google.auth._helpers.utcnow",
- return_value=datetime.datetime.utcfromtimestamp(0),
- )
- @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
- @mock.patch("google.auth.iam.Signer.sign", autospec=True)
- @mock.patch("google.oauth2._client.id_token_jwt_grant", autospec=True)
- def test_before_request_refreshes(self, id_token_jwt_grant, sign, get, utcnow):
- get.side_effect = [
- {"email": "service-account@example.com", "scopes": "one two"}
- ]
- sign.side_effect = [b"signature"]
- id_token_jwt_grant.side_effect = [
- ("idtoken", datetime.datetime.utcfromtimestamp(3600), {})
- ]
- request = mock.create_autospec(transport.Request, instance=True)
- self.credentials = credentials.IDTokenCredentials(
- request=request, target_audience="https://audience.com"
- )
- # Credentials should start as invalid
- assert not self.credentials.valid
- # before_request should cause a refresh
- request = mock.create_autospec(transport.Request, instance=True)
- self.credentials.before_request(request, "GET", "http://example.com?a=1#3", {})
- # The refresh endpoint should've been called.
- assert get.called
- # Credentials should now be valid.
- assert self.credentials.valid
- @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
- @mock.patch("google.auth.iam.Signer.sign", autospec=True)
- def test_sign_bytes(self, sign, get):
- get.side_effect = [
- {"email": "service-account@example.com", "scopes": ["one", "two"]}
- ]
- sign.side_effect = [b"signature"]
- request = mock.create_autospec(transport.Request, instance=True)
- response = mock.Mock()
- response.data = b'{"signature": "c2lnbmF0dXJl"}'
- response.status = 200
- request.side_effect = [response]
- self.credentials = credentials.IDTokenCredentials(
- request=request, target_audience="https://audience.com"
- )
- # Generate authorization grant:
- signature = self.credentials.sign_bytes(b"some bytes")
- # The JWT token signature is 'signature' encoded in base 64:
- assert signature == b"signature"
- @mock.patch(
- "google.auth.metrics.token_request_id_token_mds",
- return_value=ID_TOKEN_REQUEST_METRICS_HEADER_VALUE,
- )
- @mock.patch(
- "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
- )
- @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
- def test_get_id_token_from_metadata(
- self, get, get_service_account_info, mock_metrics_header_value
- ):
- get.return_value = SAMPLE_ID_TOKEN
- get_service_account_info.return_value = {"email": "foo@example.com"}
- cred = credentials.IDTokenCredentials(
- mock.Mock(), "audience", use_metadata_identity_endpoint=True
- )
- cred.refresh(request=mock.Mock())
- assert get.call_args.kwargs["headers"] == {
- "x-goog-api-client": ID_TOKEN_REQUEST_METRICS_HEADER_VALUE
- }
- assert cred.token == SAMPLE_ID_TOKEN
- assert cred.expiry == datetime.datetime.utcfromtimestamp(SAMPLE_ID_TOKEN_EXP)
- assert cred._use_metadata_identity_endpoint
- assert cred._signer is None
- assert cred._token_uri is None
- assert cred._service_account_email == "foo@example.com"
- assert cred._target_audience == "audience"
- with pytest.raises(ValueError):
- cred.sign_bytes(b"bytes")
- @mock.patch(
- "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
- )
- def test_with_target_audience_for_metadata(self, get_service_account_info):
- get_service_account_info.return_value = {"email": "foo@example.com"}
- cred = credentials.IDTokenCredentials(
- mock.Mock(), "audience", use_metadata_identity_endpoint=True
- )
- cred = cred.with_target_audience("new_audience")
- assert cred._target_audience == "new_audience"
- assert cred._use_metadata_identity_endpoint
- assert cred._signer is None
- assert cred._token_uri is None
- assert cred._service_account_email == "foo@example.com"
- @mock.patch(
- "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
- )
- def test_id_token_with_quota_project(self, get_service_account_info):
- get_service_account_info.return_value = {"email": "foo@example.com"}
- cred = credentials.IDTokenCredentials(
- mock.Mock(), "audience", use_metadata_identity_endpoint=True
- )
- cred = cred.with_quota_project("project-foo")
- assert cred._quota_project_id == "project-foo"
- assert cred._use_metadata_identity_endpoint
- assert cred._signer is None
- assert cred._token_uri is None
- assert cred._service_account_email == "foo@example.com"
- @mock.patch(
- "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
- )
- @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
- def test_invalid_id_token_from_metadata(self, get, get_service_account_info):
- get.return_value = "invalid_id_token"
- get_service_account_info.return_value = {"email": "foo@example.com"}
- cred = credentials.IDTokenCredentials(
- mock.Mock(), "audience", use_metadata_identity_endpoint=True
- )
- with pytest.raises(ValueError):
- cred.refresh(request=mock.Mock())
- @mock.patch(
- "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
- )
- @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
- def test_transport_error_from_metadata(self, get, get_service_account_info):
- get.side_effect = exceptions.TransportError("transport error")
- get_service_account_info.return_value = {"email": "foo@example.com"}
- cred = credentials.IDTokenCredentials(
- mock.Mock(), "audience", use_metadata_identity_endpoint=True
- )
- with pytest.raises(exceptions.RefreshError) as excinfo:
- cred.refresh(request=mock.Mock())
- assert excinfo.match(r"transport error")
- def test_get_id_token_from_metadata_constructor(self):
- with pytest.raises(ValueError):
- credentials.IDTokenCredentials(
- mock.Mock(),
- "audience",
- use_metadata_identity_endpoint=True,
- token_uri="token_uri",
- )
- with pytest.raises(ValueError):
- credentials.IDTokenCredentials(
- mock.Mock(),
- "audience",
- use_metadata_identity_endpoint=True,
- signer=mock.Mock(),
- )
- with pytest.raises(ValueError):
- credentials.IDTokenCredentials(
- mock.Mock(),
- "audience",
- use_metadata_identity_endpoint=True,
- additional_claims={"key", "value"},
- )
- with pytest.raises(ValueError):
- credentials.IDTokenCredentials(
- mock.Mock(),
- "audience",
- use_metadata_identity_endpoint=True,
- service_account_email="foo@example.com",
- )
|