12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069 |
- # Copyright 2016 Google LLC
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import datetime
- import json
- import os
- import pickle
- import sys
- import mock
- import pytest # type: ignore
- from google.auth import _helpers
- from google.auth import exceptions
- from google.auth import transport
- from google.auth.credentials import TokenState
- from google.oauth2 import credentials
- import yatest.common as yc
# Directory holding the shared fixture files: the sibling "data" directory one
# level above this module's source location (resolved through yatest).
DATA_DIR = os.path.join(os.path.dirname(yc.source_path(__file__)), "..", "data")

AUTH_USER_JSON_FILE = os.path.join(DATA_DIR, "authorized_user.json")

# Parsed once at import time; tests take a ``.copy()`` before mutating it.
# The encoding is given explicitly so decoding does not depend on the
# platform's default locale encoding.
with open(AUTH_USER_JSON_FILE, "r", encoding="utf-8") as fh:
    AUTH_USER_INFO = json.load(fh)
- class TestCredentials(object):
- TOKEN_URI = "https://example.com/oauth2/token"
- REFRESH_TOKEN = "refresh_token"
- RAPT_TOKEN = "rapt_token"
- CLIENT_ID = "client_id"
- CLIENT_SECRET = "client_secret"
@classmethod
def make_credentials(cls):
    """Build a ``Credentials`` object from the class-level fixture constants.

    The access token is left unset and ``enable_reauth_refresh`` is turned on.
    """
    init_kwargs = {
        "token": None,
        "refresh_token": cls.REFRESH_TOKEN,
        "token_uri": cls.TOKEN_URI,
        "client_id": cls.CLIENT_ID,
        "client_secret": cls.CLIENT_SECRET,
        "rapt_token": cls.RAPT_TOKEN,
        "enable_reauth_refresh": True,
    }
    return credentials.Credentials(**init_kwargs)
def test_default_state(self):
    """Credentials from ``make_credentials`` start out without a usable token."""
    creds = self.make_credentials()

    assert not creds.valid
    # No expiry has been assigned yet, so they do not count as expired.
    assert not creds.expired
    # These credentials do not require scopes.
    assert not creds.requires_scopes
    assert creds.token_state == TokenState.INVALID

    # Each property echoes the matching class-level constant.
    expected_properties = (
        ("refresh_token", self.REFRESH_TOKEN),
        ("token_uri", self.TOKEN_URI),
        ("client_id", self.CLIENT_ID),
        ("client_secret", self.CLIENT_SECRET),
        ("rapt_token", self.RAPT_TOKEN),
    )
    for prop_name, expected in expected_properties:
        assert getattr(creds, prop_name) == expected
    assert creds.refresh_handler is None
def test_get_cred_info(self):
    """``get_cred_info`` is falsy without a file path and reports the principal once one is set."""
    creds = self.make_credentials()
    creds._account = "fake-account"

    # No credential file path recorded yet.
    assert not creds.get_cred_info()

    creds._cred_file_path = "/path/to/file"
    expected_info = {
        "credential_source": "/path/to/file",
        "credential_type": "user credentials",
        "principal": "fake-account",
    }
    assert creds.get_cred_info() == expected_info
def test_get_cred_info_no_account(self):
    """Without an account set, ``get_cred_info`` omits the ``principal`` key."""
    creds = self.make_credentials()

    # No credential file path recorded yet.
    assert not creds.get_cred_info()

    creds._cred_file_path = "/path/to/file"
    expected_info = {
        "credential_source": "/path/to/file",
        "credential_type": "user credentials",
    }
    assert creds.get_cred_info() == expected_info
def test__make_copy_get_cred_info(self):
    """``_make_copy`` carries ``_cred_file_path`` over to the copy."""
    source_path = "/path/to/file"
    creds = self.make_credentials()
    creds._cred_file_path = source_path

    duplicate = creds._make_copy()

    assert duplicate._cred_file_path == "/path/to/file"
def test_token_usage_metrics(self):
    """``before_request`` adds both the bearer token and the credential-type metrics header."""
    creds = self.make_credentials()
    creds.token = "token"
    creds.expiry = None

    request_headers = {}
    creds.before_request(mock.Mock(), None, None, request_headers)

    expected_headers = (
        ("authorization", "Bearer token"),
        ("x-goog-api-client", "cred-type/u"),
    )
    for header_name, header_value in expected_headers:
        assert request_headers[header_name] == header_value
def test_refresh_handler_setter_and_getter(self):
    """The ``refresh_handler`` property returns whatever was last assigned, including ``None``."""
    first_handler = mock.Mock(return_value=("ACCESS_TOKEN_1", None))
    second_handler = mock.Mock(return_value=("ACCESS_TOKEN_2", None))

    creds = credentials.Credentials(
        token=None,
        refresh_token=None,
        token_uri=None,
        client_id=None,
        client_secret=None,
        rapt_token=None,
        scopes=["email", "profile"],
        default_scopes=None,
        refresh_handler=first_handler,
    )
    assert creds.refresh_handler is first_handler

    # Replace the handler, then clear it; the getter must follow each step.
    for replacement in (second_handler, None):
        creds.refresh_handler = replacement
        assert creds.refresh_handler is replacement
def test_invalid_refresh_handler(self):
    """A non-callable, non-``None`` ``refresh_handler`` is rejected with ``TypeError``."""
    init_kwargs = {
        "token": None,
        "refresh_token": None,
        "token_uri": None,
        "client_id": None,
        "client_secret": None,
        "rapt_token": None,
        "scopes": ["email", "profile"],
        "default_scopes": None,
        # A plain object is neither callable nor None.
        "refresh_handler": object(),
    }

    with pytest.raises(TypeError) as excinfo:
        credentials.Credentials(**init_kwargs)

    assert excinfo.match("The provided refresh_handler is not a callable or None.")
def test_refresh_with_non_default_universe_domain(self):
    """``refresh`` raises ``RefreshError`` for a non-default universe domain."""
    expected_message = (
        "refresh is only supported in the default googleapis.com universe domain"
    )
    creds = credentials.Credentials(
        token="token", universe_domain="dummy_universe.com"
    )

    with pytest.raises(exceptions.RefreshError) as excinfo:
        creds.refresh(mock.Mock())

    assert excinfo.match(expected_message)
@mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
@mock.patch(
    "google.auth._helpers.utcnow",
    return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD,
)
def test_refresh_success(self, unused_utcnow, refresh_grant):
    """A successful refresh stores the token, expiry, id token and new rapt token."""
    access_token = "token"
    refreshed_rapt = "new_rapt_token"
    expires_at = _helpers.utcnow() + datetime.timedelta(seconds=500)
    extra_data = {"id_token": mock.sentinel.id_token}
    # The patched grant returns:
    # (access token, new refresh token, expiry, extra data, rapt token).
    refresh_grant.return_value = (
        access_token,
        None,
        expires_at,
        extra_data,
        refreshed_rapt,
    )

    request = mock.create_autospec(transport.Request)
    creds = self.make_credentials()

    creds.refresh(request)

    # The refresh grant must be invoked with exactly these positional values.
    expected_grant_args = (
        request,
        self.TOKEN_URI,
        self.REFRESH_TOKEN,
        self.CLIENT_ID,
        self.CLIENT_SECRET,
        None,
        self.RAPT_TOKEN,
        True,
    )
    refresh_grant.assert_called_with(*expected_grant_args)

    # The values from the grant response end up on the credentials.
    assert creds.token == access_token
    assert creds.expiry == expires_at
    assert creds.id_token == mock.sentinel.id_token
    assert creds.rapt_token == refreshed_rapt

    # A token is present and not expired, so the credentials are valid.
    assert creds.valid
def test_refresh_no_refresh_token(self):
    """Refreshing without a refresh token raises and never calls the transport."""
    transport_request = mock.create_autospec(transport.Request)
    creds = credentials.Credentials(token=None, refresh_token=None)

    with pytest.raises(exceptions.RefreshError, match="necessary fields"):
        creds.refresh(transport_request)

    transport_request.assert_not_called()
@mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
@mock.patch(
    "google.auth._helpers.utcnow",
    return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD,
)
def test_refresh_with_refresh_token_and_refresh_handler(
    self, unused_utcnow, refresh_grant
):
    """With both a refresh token and a handler, the refresh grant is used and the handler is not."""
    access_token = "token"
    refreshed_rapt = "new_rapt_token"
    expires_at = _helpers.utcnow() + datetime.timedelta(seconds=500)
    extra_data = {"id_token": mock.sentinel.id_token}
    # The patched grant returns:
    # (access token, new refresh token, expiry, extra data, rapt token).
    refresh_grant.return_value = (
        access_token,
        None,
        expires_at,
        extra_data,
        refreshed_rapt,
    )

    handler = mock.Mock()
    request = mock.create_autospec(transport.Request)
    creds = credentials.Credentials(
        token=None,
        refresh_token=self.REFRESH_TOKEN,
        token_uri=self.TOKEN_URI,
        client_id=self.CLIENT_ID,
        client_secret=self.CLIENT_SECRET,
        rapt_token=self.RAPT_TOKEN,
        refresh_handler=handler,
    )

    creds.refresh(request)

    # enable_reauth_refresh was not passed above; the final argument is False.
    expected_grant_args = (
        request,
        self.TOKEN_URI,
        self.REFRESH_TOKEN,
        self.CLIENT_ID,
        self.CLIENT_SECRET,
        None,
        self.RAPT_TOKEN,
        False,
    )
    refresh_grant.assert_called_with(*expected_grant_args)

    # The values from the grant response end up on the credentials.
    assert creds.token == access_token
    assert creds.expiry == expires_at
    assert creds.id_token == mock.sentinel.id_token
    assert creds.rapt_token == refreshed_rapt

    # A token is present and not expired, so the credentials are valid.
    assert creds.valid

    # The refresh token takes priority, so the handler must stay untouched.
    handler.assert_not_called()
@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
def test_refresh_with_refresh_handler_success_scopes(self, unused_utcnow):
    """The refresh handler receives the explicit scopes and its result is stored."""
    requested_scopes = ["email", "profile"]
    fallback_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
    handler_expiry = datetime.datetime.min + datetime.timedelta(seconds=2800)
    handler = mock.Mock(return_value=("ACCESS_TOKEN", handler_expiry))
    request = mock.create_autospec(transport.Request)

    creds = credentials.Credentials(
        token=None,
        refresh_token=None,
        token_uri=None,
        client_id=None,
        client_secret=None,
        rapt_token=None,
        scopes=requested_scopes,
        default_scopes=fallback_scopes,
        refresh_handler=handler,
    )
    creds.refresh(request)

    assert creds.token == "ACCESS_TOKEN"
    assert creds.expiry == handler_expiry
    assert creds.valid
    assert not creds.expired

    # The explicit scopes, not the default ones, are handed to the handler.
    handler.assert_called_with(request, scopes=requested_scopes)
@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
def test_refresh_with_refresh_handler_success_default_scopes(self, unused_utcnow):
    """A handler assigned after construction is used and receives the default scopes."""
    fallback_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
    handler_expiry = datetime.datetime.min + datetime.timedelta(seconds=2800)
    initial_handler = mock.Mock(return_value=("UNUSED_TOKEN", handler_expiry))
    replacement_handler = mock.Mock(return_value=("ACCESS_TOKEN", handler_expiry))
    request = mock.create_autospec(transport.Request)

    creds = credentials.Credentials(
        token=None,
        refresh_token=None,
        token_uri=None,
        client_id=None,
        client_secret=None,
        rapt_token=None,
        scopes=None,
        default_scopes=fallback_scopes,
        refresh_handler=initial_handler,
    )

    # Swap the handler; the token asserted below comes from the replacement.
    creds.refresh_handler = replacement_handler
    creds.refresh(request)

    assert creds.token == "ACCESS_TOKEN"
    assert creds.expiry == handler_expiry
    assert creds.valid
    assert not creds.expired

    # No explicit scopes were supplied, so the default scopes are passed on.
    replacement_handler.assert_called_with(request, scopes=fallback_scopes)
@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
def test_refresh_with_refresh_handler_invalid_token(self, unused_utcnow):
    """A handler that returns a non-string token makes ``refresh`` raise ``RefreshError``."""
    requested_scopes = ["email", "profile"]
    fallback_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
    handler_expiry = datetime.datetime.min + datetime.timedelta(seconds=2800)
    # The handler hands back ``None`` where a token string is expected.
    handler = mock.Mock(return_value=(None, handler_expiry))
    request = mock.create_autospec(transport.Request)

    creds = credentials.Credentials(
        token=None,
        refresh_token=None,
        token_uri=None,
        client_id=None,
        client_secret=None,
        rapt_token=None,
        scopes=requested_scopes,
        default_scopes=fallback_scopes,
        refresh_handler=handler,
    )

    with pytest.raises(
        exceptions.RefreshError, match="returned token is not a string"
    ):
        creds.refresh(request)

    # Nothing from the failed refresh is stored on the credentials.
    assert creds.token is None
    assert creds.expiry is None
    assert not creds.valid

    # The handler was still invoked with the expected arguments.
    handler.assert_called_with(request, scopes=requested_scopes)
def test_refresh_with_refresh_handler_invalid_expiry(self):
    """A handler that returns a non-datetime expiry makes ``refresh`` raise ``RefreshError``."""
    requested_scopes = ["email", "profile"]
    fallback_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
    # The expiry is a bare integer rather than a datetime object.
    handler = mock.Mock(return_value=("TOKEN", 2800))
    request = mock.create_autospec(transport.Request)

    creds = credentials.Credentials(
        token=None,
        refresh_token=None,
        token_uri=None,
        client_id=None,
        client_secret=None,
        rapt_token=None,
        scopes=requested_scopes,
        default_scopes=fallback_scopes,
        refresh_handler=handler,
    )

    with pytest.raises(
        exceptions.RefreshError, match="returned expiry is not a datetime object"
    ):
        creds.refresh(request)

    # Nothing from the failed refresh is stored on the credentials.
    assert creds.token is None
    assert creds.expiry is None
    assert not creds.valid

    # The handler was still invoked with the expected arguments.
    handler.assert_called_with(request, scopes=requested_scopes)
@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
def test_refresh_with_refresh_handler_expired_token(self, unused_utcnow):
    """A handler that returns an already-expired token makes ``refresh`` raise ``RefreshError``."""
    requested_scopes = ["email", "profile"]
    fallback_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
    # The expiry sits exactly one refresh threshold after the patched "now".
    handler_expiry = datetime.datetime.min + _helpers.REFRESH_THRESHOLD
    handler = mock.Mock(return_value=("TOKEN", handler_expiry))
    request = mock.create_autospec(transport.Request)

    creds = credentials.Credentials(
        token=None,
        refresh_token=None,
        token_uri=None,
        client_id=None,
        client_secret=None,
        rapt_token=None,
        scopes=requested_scopes,
        default_scopes=fallback_scopes,
        refresh_handler=handler,
    )

    with pytest.raises(exceptions.RefreshError, match="already expired"):
        creds.refresh(request)

    # Nothing from the failed refresh is stored on the credentials.
    assert creds.token is None
    assert creds.expiry is None
    assert not creds.valid

    # The handler was still invoked with the expected arguments.
    handler.assert_called_with(request, scopes=requested_scopes)
@mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
@mock.patch(
    "google.auth._helpers.utcnow",
    return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD,
)
def test_credentials_with_scopes_requested_refresh_success(
    self, unused_utcnow, refresh_grant
):
    """Explicit scopes win over default scopes in the grant call and are reported as granted."""
    requested_scopes = ["email", "profile"]
    fallback_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
    access_token = "token"
    refreshed_rapt = "new_rapt_token"
    expires_at = _helpers.utcnow() + datetime.timedelta(seconds=500)
    extra_data = {"id_token": mock.sentinel.id_token, "scope": "email profile"}
    # The patched grant returns:
    # (access token, new refresh token, expiry, extra data, rapt token).
    refresh_grant.return_value = (
        access_token,
        None,
        expires_at,
        extra_data,
        refreshed_rapt,
    )

    request = mock.create_autospec(transport.Request)
    creds = credentials.Credentials(
        token=None,
        refresh_token=self.REFRESH_TOKEN,
        token_uri=self.TOKEN_URI,
        client_id=self.CLIENT_ID,
        client_secret=self.CLIENT_SECRET,
        scopes=requested_scopes,
        default_scopes=fallback_scopes,
        rapt_token=self.RAPT_TOKEN,
        enable_reauth_refresh=True,
    )

    creds.refresh(request)

    # The explicit scopes are what gets forwarded to the refresh grant.
    expected_grant_args = (
        request,
        self.TOKEN_URI,
        self.REFRESH_TOKEN,
        self.CLIENT_ID,
        self.CLIENT_SECRET,
        requested_scopes,
        self.RAPT_TOKEN,
        True,
    )
    refresh_grant.assert_called_with(*expected_grant_args)

    # The values from the grant response end up on the credentials.
    assert creds.token == access_token
    assert creds.expiry == expires_at
    assert creds.id_token == mock.sentinel.id_token
    assert creds.has_scopes(requested_scopes)
    assert creds.rapt_token == refreshed_rapt
    assert creds.granted_scopes == requested_scopes

    # A token is present and not expired, so the credentials are valid.
    assert creds.valid
@mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
@mock.patch(
    "google.auth._helpers.utcnow",
    return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD,
)
def test_credentials_with_only_default_scopes_requested(
    self, unused_utcnow, refresh_grant
):
    """With only default scopes set, those are sent to the grant and reported as granted."""
    fallback_scopes = ["email", "profile"]
    access_token = "token"
    refreshed_rapt = "new_rapt_token"
    expires_at = _helpers.utcnow() + datetime.timedelta(seconds=500)
    extra_data = {"id_token": mock.sentinel.id_token, "scope": "email profile"}
    # The patched grant returns:
    # (access token, new refresh token, expiry, extra data, rapt token).
    refresh_grant.return_value = (
        access_token,
        None,
        expires_at,
        extra_data,
        refreshed_rapt,
    )

    request = mock.create_autospec(transport.Request)
    creds = credentials.Credentials(
        token=None,
        refresh_token=self.REFRESH_TOKEN,
        token_uri=self.TOKEN_URI,
        client_id=self.CLIENT_ID,
        client_secret=self.CLIENT_SECRET,
        default_scopes=fallback_scopes,
        rapt_token=self.RAPT_TOKEN,
        enable_reauth_refresh=True,
    )

    creds.refresh(request)

    # The default scopes are what gets forwarded to the refresh grant.
    expected_grant_args = (
        request,
        self.TOKEN_URI,
        self.REFRESH_TOKEN,
        self.CLIENT_ID,
        self.CLIENT_SECRET,
        fallback_scopes,
        self.RAPT_TOKEN,
        True,
    )
    refresh_grant.assert_called_with(*expected_grant_args)

    # The values from the grant response end up on the credentials.
    assert creds.token == access_token
    assert creds.expiry == expires_at
    assert creds.id_token == mock.sentinel.id_token
    assert creds.has_scopes(fallback_scopes)
    assert creds.rapt_token == refreshed_rapt
    assert creds.granted_scopes == fallback_scopes

    # A token is present and not expired, so the credentials are valid.
    assert creds.valid
@mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
@mock.patch(
    "google.auth._helpers.utcnow",
    return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD,
)
def test_credentials_with_scopes_returned_refresh_success(
    self, unused_utcnow, refresh_grant
):
    """When the grant response returns the requested scopes, they are reported as granted."""
    requested_scopes = ["email", "profile"]
    access_token = "token"
    refreshed_rapt = "new_rapt_token"
    expires_at = _helpers.utcnow() + datetime.timedelta(seconds=500)
    # The response echoes the requested scopes as a space-separated string.
    extra_data = {
        "id_token": mock.sentinel.id_token,
        "scope": " ".join(requested_scopes),
    }
    # The patched grant returns:
    # (access token, new refresh token, expiry, extra data, rapt token).
    refresh_grant.return_value = (
        access_token,
        None,
        expires_at,
        extra_data,
        refreshed_rapt,
    )

    request = mock.create_autospec(transport.Request)
    creds = credentials.Credentials(
        token=None,
        refresh_token=self.REFRESH_TOKEN,
        token_uri=self.TOKEN_URI,
        client_id=self.CLIENT_ID,
        client_secret=self.CLIENT_SECRET,
        scopes=requested_scopes,
        rapt_token=self.RAPT_TOKEN,
        enable_reauth_refresh=True,
    )

    creds.refresh(request)

    # The refresh grant must be invoked with exactly these positional values.
    expected_grant_args = (
        request,
        self.TOKEN_URI,
        self.REFRESH_TOKEN,
        self.CLIENT_ID,
        self.CLIENT_SECRET,
        requested_scopes,
        self.RAPT_TOKEN,
        True,
    )
    refresh_grant.assert_called_with(*expected_grant_args)

    # The values from the grant response end up on the credentials.
    assert creds.token == access_token
    assert creds.expiry == expires_at
    assert creds.id_token == mock.sentinel.id_token
    assert creds.has_scopes(requested_scopes)
    assert creds.rapt_token == refreshed_rapt
    assert creds.granted_scopes == requested_scopes

    # A token is present and not expired, so the credentials are valid.
    assert creds.valid
@mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
@mock.patch(
    "google.auth._helpers.utcnow",
    return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD,
)
def test_credentials_with_only_default_scopes_requested_different_granted_scopes(
    self, unused_utcnow, refresh_grant
):
    """Granted scopes follow the grant response even when it returns fewer than the default scopes."""
    fallback_scopes = ["email", "profile"]
    access_token = "token"
    refreshed_rapt = "new_rapt_token"
    expires_at = _helpers.utcnow() + datetime.timedelta(seconds=500)
    # The response grants only one of the two default scopes.
    extra_data = {"id_token": mock.sentinel.id_token, "scope": "email"}
    # The patched grant returns:
    # (access token, new refresh token, expiry, extra data, rapt token).
    refresh_grant.return_value = (
        access_token,
        None,
        expires_at,
        extra_data,
        refreshed_rapt,
    )

    request = mock.create_autospec(transport.Request)
    creds = credentials.Credentials(
        token=None,
        refresh_token=self.REFRESH_TOKEN,
        token_uri=self.TOKEN_URI,
        client_id=self.CLIENT_ID,
        client_secret=self.CLIENT_SECRET,
        default_scopes=fallback_scopes,
        rapt_token=self.RAPT_TOKEN,
        enable_reauth_refresh=True,
    )

    creds.refresh(request)

    # The default scopes are what gets forwarded to the refresh grant.
    expected_grant_args = (
        request,
        self.TOKEN_URI,
        self.REFRESH_TOKEN,
        self.CLIENT_ID,
        self.CLIENT_SECRET,
        fallback_scopes,
        self.RAPT_TOKEN,
        True,
    )
    refresh_grant.assert_called_with(*expected_grant_args)

    # The values from the grant response end up on the credentials.
    assert creds.token == access_token
    assert creds.expiry == expires_at
    assert creds.id_token == mock.sentinel.id_token
    assert creds.has_scopes(fallback_scopes)
    assert creds.rapt_token == refreshed_rapt
    assert creds.granted_scopes == ["email"]

    # A token is present and not expired, so the credentials are valid.
    assert creds.valid
@mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
@mock.patch(
    "google.auth._helpers.utcnow",
    return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD,
)
def test_credentials_with_scopes_refresh_different_granted_scopes(
    self, unused_utcnow, refresh_grant
):
    """Granted scopes follow the grant response even when it returns fewer than the requested scopes."""
    requested_scopes = ["email", "profile"]
    returned_scopes = ["email"]
    access_token = "token"
    refreshed_rapt = "new_rapt_token"
    expires_at = _helpers.utcnow() + datetime.timedelta(seconds=500)
    extra_data = {
        "id_token": mock.sentinel.id_token,
        "scope": " ".join(returned_scopes),
    }
    # The patched grant returns:
    # (access token, new refresh token, expiry, extra data, rapt token).
    refresh_grant.return_value = (
        access_token,
        None,
        expires_at,
        extra_data,
        refreshed_rapt,
    )

    request = mock.create_autospec(transport.Request)
    creds = credentials.Credentials(
        token=None,
        refresh_token=self.REFRESH_TOKEN,
        token_uri=self.TOKEN_URI,
        client_id=self.CLIENT_ID,
        client_secret=self.CLIENT_SECRET,
        scopes=requested_scopes,
        rapt_token=self.RAPT_TOKEN,
        enable_reauth_refresh=True,
    )

    creds.refresh(request)

    # The requested scopes are what gets forwarded to the refresh grant.
    expected_grant_args = (
        request,
        self.TOKEN_URI,
        self.REFRESH_TOKEN,
        self.CLIENT_ID,
        self.CLIENT_SECRET,
        requested_scopes,
        self.RAPT_TOKEN,
        True,
    )
    refresh_grant.assert_called_with(*expected_grant_args)

    # The values from the grant response end up on the credentials.
    assert creds.token == access_token
    assert creds.expiry == expires_at
    assert creds.id_token == mock.sentinel.id_token
    assert creds.has_scopes(requested_scopes)
    assert creds.rapt_token == refreshed_rapt
    assert creds.granted_scopes == returned_scopes

    # A token is present and not expired, so the credentials are valid.
    assert creds.valid
def test_apply_with_quota_project_id(self):
    """``apply`` writes the quota project header alongside the authorization header."""
    init_kwargs = {
        "token": "token",
        "refresh_token": self.REFRESH_TOKEN,
        "token_uri": self.TOKEN_URI,
        "client_id": self.CLIENT_ID,
        "client_secret": self.CLIENT_SECRET,
        "quota_project_id": "quota-project-123",
    }
    creds = credentials.Credentials(**init_kwargs)

    applied_headers = {}
    creds.apply(applied_headers)

    assert applied_headers["x-goog-user-project"] == "quota-project-123"
    assert "token" in applied_headers["authorization"]
def test_apply_with_no_quota_project_id(self):
    """Without a quota project, ``apply`` leaves the quota project header out."""
    init_kwargs = {
        "token": "token",
        "refresh_token": self.REFRESH_TOKEN,
        "token_uri": self.TOKEN_URI,
        "client_id": self.CLIENT_ID,
        "client_secret": self.CLIENT_SECRET,
    }
    creds = credentials.Credentials(**init_kwargs)

    applied_headers = {}
    creds.apply(applied_headers)

    assert "x-goog-user-project" not in applied_headers
    assert "token" in applied_headers["authorization"]
def test_with_quota_project(self):
    """``with_quota_project`` returns credentials that carry the new quota project.

    The returned copy must both report and apply the new project ID, while
    the credentials it was derived from keep applying their own.
    """
    creds = credentials.Credentials(
        token="token",
        refresh_token=self.REFRESH_TOKEN,
        token_uri=self.TOKEN_URI,
        client_id=self.CLIENT_ID,
        client_secret=self.CLIENT_SECRET,
        quota_project_id="quota-project-123",
    )

    new_creds = creds.with_quota_project("new-project-456")
    assert new_creds.quota_project_id == "new-project-456"

    # Check the header produced by the copy itself; previously only the
    # original credentials were applied, so the copy's header went untested.
    new_headers = {}
    new_creds.apply(new_headers)
    assert new_headers["x-goog-user-project"] == "new-project-456"

    # The source credentials still apply their original quota project.
    headers = {}
    creds.apply(headers)
    assert "x-goog-user-project" in headers
    assert headers["x-goog-user-project"] == "quota-project-123"
def test_with_universe_domain(self):
    """``with_universe_domain`` yields credentials reporting the given universe domain."""
    original = credentials.Credentials(token="token")
    # Nothing was specified, so the default domain is reported.
    assert original.universe_domain == "googleapis.com"

    relocated = original.with_universe_domain("dummy_universe.com")
    assert relocated.universe_domain == "dummy_universe.com"
def test_with_account(self):
    """``with_account`` yields credentials reporting the given account."""
    original = credentials.Credentials(token="token")
    # Nothing was specified, so the account is the empty string.
    assert original.account == ""

    with_user = original.with_account("mock@example.com")
    assert with_user.account == "mock@example.com"
def test_with_token_uri(self):
    """``with_token_uri`` yields credentials pointing at the replacement token endpoint."""
    replacement_uri = "https://oauth2-eu.googleapis.com/token"
    creds = credentials.Credentials.from_authorized_user_info(AUTH_USER_INFO.copy())

    # The credentials start out on the default Google OAuth2 token endpoint.
    assert creds._token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT

    relocated = creds.with_token_uri(replacement_uri)
    assert relocated._token_uri == replacement_uri
def test_from_authorized_user_info(self):
    """``from_authorized_user_info`` maps client data, scopes and expiry from the info dict."""
    info = AUTH_USER_INFO.copy()
    build = credentials.Credentials.from_authorized_user_info

    def assert_client_fields(built):
        # Fields that must mirror the info dict whatever scopes are used.
        assert built.client_secret == info["client_secret"]
        assert built.client_id == info["client_id"]
        assert built.refresh_token == info["refresh_token"]
        assert built.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT

    # No scopes anywhere.
    creds = build(info)
    assert_client_fields(creds)
    assert creds.scopes is None

    # Scopes passed as an argument.
    scopes = ["email", "profile"]
    creds = build(info, scopes)
    assert_client_fields(creds)
    assert creds.scopes == scopes

    # A single non-array scope coming from the file is wrapped in a list.
    info["scopes"] = "email"
    creds = build(info)
    assert creds.scopes == [info["scopes"]]

    # An array of scopes coming from the file is used as is.
    info["scopes"] = ["email", "profile"]
    creds = build(info)
    assert creds.scopes == info["scopes"]

    # An ISO expiry with a trailing "Z" is parsed; this 2020 date is expired.
    expiry = datetime.datetime(2020, 8, 14, 15, 54, 1)
    info["expiry"] = expiry.isoformat() + "Z"
    creds = build(info)
    assert creds.expiry == expiry
    assert creds.expired
def test_from_authorized_user_file(self):
    """``from_authorized_user_file`` loads the fixture file, with and without explicit scopes."""
    info = AUTH_USER_INFO.copy()
    load = credentials.Credentials.from_authorized_user_file

    def assert_client_fields(built):
        # Fields that must mirror the fixture whatever scopes are used.
        assert built.client_secret == info["client_secret"]
        assert built.client_id == info["client_id"]
        assert built.refresh_token == info["refresh_token"]
        assert built.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT

    # Without scopes.
    creds = load(AUTH_USER_JSON_FILE)
    assert_client_fields(creds)
    assert creds.scopes is None
    assert creds.rapt_token is None

    # With scopes passed as an argument.
    scopes = ["email", "profile"]
    creds = load(AUTH_USER_JSON_FILE, scopes)
    assert_client_fields(creds)
    assert creds.scopes == scopes
def test_from_authorized_user_file_with_rapt_token(self):
    """A rapt token stored in the authorized-user file is picked up."""
    info = AUTH_USER_INFO.copy()
    rapt_fixture = os.path.join(DATA_DIR, "authorized_user_with_rapt_token.json")

    creds = credentials.Credentials.from_authorized_user_file(rapt_fixture)

    # The client fields are compared against the plain authorized-user info.
    for field in ("client_secret", "client_id", "refresh_token"):
        assert getattr(creds, field) == info[field]
    assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT
    assert creds.scopes is None
    assert creds.rapt_token == "rapt"
def test_to_json(self):
    """``to_json`` serialises the credential fields, honours ``strip`` and handles a missing expiry."""
    info = AUTH_USER_INFO.copy()
    expiry = datetime.datetime(2020, 8, 14, 15, 54, 1)
    info["expiry"] = expiry.isoformat() + "Z"
    creds = credentials.Credentials.from_authorized_user_info(info)
    assert creds.expiry == expiry

    common_fields = ("token", "refresh_token", "token_uri", "client_id", "scopes")

    # Without a `strip` argument every field is emitted.
    dumped = json.loads(creds.to_json())
    for field in common_fields:
        assert dumped.get(field) == getattr(creds, field)
    assert dumped.get("client_secret") == creds.client_secret
    assert dumped.get("expiry") == info["expiry"]
    assert dumped.get("universe_domain") == creds.universe_domain
    assert dumped.get("account") == creds.account

    # With a `strip` argument the named field is left out.
    dumped = json.loads(creds.to_json(strip=["client_secret"]))
    for field in common_fields:
        assert dumped.get(field) == getattr(creds, field)
    assert dumped.get("client_secret") is None

    # Without an expiry no expiry value is emitted.
    creds.expiry = None
    dumped = json.loads(creds.to_json())
    assert dumped.get("expiry") is None
def test_pickle_and_unpickle(self):
    """A pickle round trip keeps every attribute except the refresh worker."""
    creds = self.make_credentials()
    unpickled = pickle.loads(pickle.dumps(creds))

    # Make sure attributes aren't lost during pickling. ``sorted()`` is
    # required here: ``list.sort()`` sorts in place and returns None, so
    # comparing two ``.sort()`` results is always ``None == None``.
    assert sorted(creds.__dict__) == sorted(unpickled.__dict__)

    for attr in list(creds.__dict__):
        # Worker should always be None
        if attr == "_refresh_worker":
            assert getattr(unpickled, attr) is None
        else:
            assert getattr(creds, attr) == getattr(unpickled, attr)
def test_pickle_and_unpickle_universe_domain(self):
    """Unpickling credentials that lack ``_universe_domain`` falls back to the default domain."""
    creds = self.make_credentials()
    # Old versions of the auth library had no _universe_domain attribute, so
    # their pickles carry no such field; drop it to imitate one.
    del creds._universe_domain

    restored = pickle.loads(pickle.dumps(creds))

    # The restored credentials report the default universe domain.
    assert restored.universe_domain == "googleapis.com"
def test_pickle_and_unpickle_with_refresh_handler(self):
    """A pickle round trip keeps every attribute but resets the handler and worker to None."""
    expected_expiry = _helpers.utcnow() + datetime.timedelta(seconds=2800)
    refresh_handler = mock.Mock(return_value=("TOKEN", expected_expiry))

    creds = credentials.Credentials(
        token=None,
        refresh_token=None,
        token_uri=None,
        client_id=None,
        client_secret=None,
        rapt_token=None,
        refresh_handler=refresh_handler,
    )
    unpickled = pickle.loads(pickle.dumps(creds))

    # Make sure attributes aren't lost during pickling. ``sorted()`` is
    # required here: ``list.sort()`` sorts in place and returns None, so
    # comparing two ``.sort()`` results is always ``None == None``.
    assert sorted(creds.__dict__) == sorted(unpickled.__dict__)

    for attr in list(creds.__dict__):
        # For the _refresh_handler property, the unpickled creds should be
        # set to None; the same holds for the refresh worker.
        if attr in ("_refresh_handler", "_refresh_worker"):
            assert getattr(unpickled, attr) is None
        else:
            assert getattr(creds, attr) == getattr(unpickled, attr)
def test_pickle_with_missing_attribute(self):
    """Attributes missing from a pickle are initialised when it is loaded."""
    creds = self.make_credentials()

    # Drop optional attributes before pickling to mimic a pickle produced by
    # a previous class definition that had fewer attributes.
    for stale_attr in ("_quota_project_id", "_refresh_handler", "_refresh_worker"):
        del creds.__dict__[stale_attr]

    restored = pickle.loads(pickle.dumps(creds))

    # Attribute should be initialized by `__setstate__`
    assert restored.quota_project_id is None
# pickles are not compatible across versions
@pytest.mark.skipif(
    sys.version_info < (3, 5),
    reason="pickle file can only be loaded with Python >= 3.5",
)
def test_unpickle_old_credentials_pickle(self):
    """A pickle written by an older library release (google-auth==1.5.1) still loads."""
    legacy_pickle_path = os.path.join(DATA_DIR, "old_oauth_credentials_py3.pickle")

    with open(legacy_pickle_path, "rb") as pickle_file:
        restored = pickle.load(pickle_file)

    assert restored.quota_project_id is None
class TestUserAccessTokenCredentials(object):
    """Tests for ``UserAccessTokenCredentials``; each one expects its deprecation warning."""

    def test_instance(self):
        """A new instance has no account; ``with_account`` returns one that has it."""
        with pytest.warns(
            UserWarning, match="UserAccessTokenCredentials is deprecated"
        ):
            initial = credentials.UserAccessTokenCredentials()
            assert initial._account is None

            with_user = initial.with_account("account")
            assert with_user._account == "account"

    @mock.patch("google.auth._cloud_sdk.get_auth_access_token", autospec=True)
    def test_refresh(self, get_auth_access_token):
        """``refresh`` stores the token returned by the patched Cloud SDK helper."""
        get_auth_access_token.return_value = "access_token"

        with pytest.warns(
            UserWarning, match="UserAccessTokenCredentials is deprecated"
        ):
            cred = credentials.UserAccessTokenCredentials()
            cred.refresh(None)
            assert cred.token == "access_token"

    def test_with_quota_project(self):
        """``with_quota_project`` sets the quota project and keeps the account."""
        with pytest.warns(
            UserWarning, match="UserAccessTokenCredentials is deprecated"
        ):
            base_cred = credentials.UserAccessTokenCredentials()
            derived = base_cred.with_quota_project("project-foo")

            assert derived._quota_project_id == "project-foo"
            assert derived._account == base_cred._account

    @mock.patch(
        "google.oauth2.credentials.UserAccessTokenCredentials.apply", autospec=True
    )
    @mock.patch(
        "google.oauth2.credentials.UserAccessTokenCredentials.refresh", autospec=True
    )
    def test_before_request(self, refresh, apply):
        """``before_request`` calls both ``refresh`` and ``apply``."""
        with pytest.warns(
            UserWarning, match="UserAccessTokenCredentials is deprecated"
        ):
            cred = credentials.UserAccessTokenCredentials()
            request_headers = {}
            cred.before_request(
                mock.Mock(), "GET", "https://example.com", request_headers
            )

            for patched_method in (refresh, apply):
                patched_method.assert_called()
|